1extern crate byteorder;
2extern crate slog;
3
4use std::fs;
5use std::net;
6use std::io;
7use std::time;
8
9use super::packet::{
10 BlockNum,
11 Data,
12 ErrorCode,
13 ErrorMessage,
14 Filename,
15 Packet,
16 TransferMode,
17};
18use super::options::Options;
19use super::make_socket;
20
21
22pub fn serve_file(
23 peer: net::SocketAddr,
24 filename: Filename,
25 txmode: TransferMode,
26 options: Options,
27 logger: &slog::Logger,
28) {
29 info!(logger, "Received RRQ: {:?} {:?} {:?}", filename, txmode, options);
30 let Filename(filename) = filename;
31 match make_socket(peer) {
32 Ok(socket) => match fs::File::open(&filename) {
33 Ok(mut file) => {
34 let len = file.metadata().ok().and_then(|m| Some(m.len()));
35 let logger = logger.new(o!(
36 "peer" => format!("{}", peer),
37 "filename" => filename,
38 ));
39 match send_to(
40 &mut file, len, socket, peer, options, &logger) {
41 Ok(_) => info!(
42 logger, "Completed transfer to {:?}", peer),
43 Err(error) => error!(
44 logger, "Error transferring to {:?}: {}", peer, error),
45 };
46 },
47 Err(error) => {
48 error!(logger, "Problem with file {}: {}", &filename, error);
49 },
51 },
52 Err(error) => {
53 error!(logger, "Could not open socket: {}", error);
54 },
55 };
56}
57
58
59const EMPTY_DATA: Data<'static> = Data(&[]);
60
61
62fn send_to(
63 data: &mut io::Read,
64 len: Option<u64>,
65 socket: net::UdpSocket,
66 peer: net::SocketAddr,
67 options: Options,
68 logger: &slog::Logger,
69)
70 -> io::Result<()>
71{
72 socket.connect(peer)?;
75
76 let mut options_out = Options::new();
77
78 let blksize: usize = match options.blksize {
79 Some(blksize) if blksize >= 512 => {
80 options_out.blksize = Some(blksize);
81 blksize as usize
82 },
83 _ => 512, };
85
86 socket.set_read_timeout(
87 Some(match options.timeout {
88 Some(timeout) if timeout >= 1 => {
89 options_out.timeout = Some(timeout);
90 time::Duration::from_secs(timeout as u64)
91 },
92 _ => {
93 time::Duration::from_secs(8u64) },
95 })
96 )?;
97
98 match options.tsize {
99 Some(0) => {
100 options_out.tsize = len;
101 },
102 Some(tsize) => {
103 warn!(logger, "Option tsize should be zero, got: {}", tsize);
104 },
105 None => {
106 },
108 };
109
110 let mut bufout = vec![0u8; 4 + blksize]; let mut bufin = vec![0u8; blksize];
112
113 if options_out.is_set() {
114 let packet = Packet::OAck(options_out);
115 let size = packet.write(&mut bufout)?;
116 socket.send(&bufout[..size])?;
117 info!(logger, "Sent OACK ({} bytes) to {}.", size, &peer);
118 }
120
121 fn timed_out(error: &io::Error) -> bool {
122 error.kind() == io::ErrorKind::WouldBlock ||
125 error.kind() == io::ErrorKind::TimedOut
126 }
127
128 'send: for blkno in (1 as u16).. {
129 let mut timeouts = 0u8;
130 match data.read(&mut bufout[4..]) {
131 Ok(size) => {
132 let packet = Packet::Data(BlockNum(blkno), EMPTY_DATA);
136 packet.write(&mut bufout[..4])?;
137 socket.send(&bufout[..size + 4])?;
138 info!(logger, "Sent DATA ({} bytes) to {}.", size, &peer);
139
140 'recv: loop {
141 match socket.recv(&mut bufin) {
142 Ok(amt) => {
143 match Packet::parse(&mut bufin[..amt]) {
144 Ok(packet) => match packet {
145 Packet::Ack(BlockNum(blocknum)) => {
146 if blocknum == blkno {
147 break 'recv;
148 };
149 },
150 Packet::Error(code, message) => {
151 error!(logger, "{:?}: {:?}", code, message);
152 break 'send;
153 },
154 Packet::Data(..) => warn!(
155 logger, "Ignoring unexpected DATA packet."),
156 Packet::Read(..) => warn!(
157 logger, "Ignoring unexpected RRQ packet."),
158 Packet::Write(..) => warn!(
159 logger, "Ignoring unexpected WRQ packet."),
160 Packet::OAck(..) => warn!(
161 logger, "Ignoring unexpected OACK packet."),
162 },
163 Err(error) => {
164 warn!(
165 logger, "Ignoring mangled packet ({:?}).",
166 error);
167 },
168 };
169 },
170 Err(ref error) if timed_out(error) => {
171 match timeouts {
172 0...7 => {
173 timeouts += 1;
174 socket.send(&bufout[..size + 4])?;
175 info!(
176 logger,
177 "Sent DATA ({} bytes) to {} (attempt #{}).",
178 size, &peer, timeouts + 1);
179 },
180 _ => {
181 error!(logger, "Too many time-outs; aborting");
182 break 'send;
183 },
184 };
185 },
186 Err(error) => {
187 error!(logger, "Error receiving packet: {}", error);
188 break 'send;
189 },
190 }
191 }
192
193 if size < blksize {
194 break;
195 }
196 },
197 Err(error) => {
198 let packet = Packet::Error(
199 ErrorCode::NotDefined, ErrorMessage(format!(
200 "Something broke: {}\0", error)));
201
202 match packet.write(&mut bufout) {
203 Ok(length) => {
204 socket.send(&bufout[..length])?;
205 },
206 Err(error) => {
207 error!(
208 logger, "Error preparing error packet: {:?}",
209 error);
210 },
211 };
212
213 break 'send;
214 },
215 }
216 };
217 Result::Ok(())
218}