allenap_libtftp/
rrq.rs

1extern crate byteorder;
2extern crate slog;
3
4use std::fs;
5use std::net;
6use std::io;
7use std::time;
8
9use super::packet::{
10    BlockNum,
11    Data,
12    ErrorCode,
13    ErrorMessage,
14    Filename,
15    Packet,
16    TransferMode,
17};
18use super::options::Options;
19use super::make_socket;
20
21
22pub fn serve_file(
23    peer: net::SocketAddr,
24    filename: Filename,
25    txmode: TransferMode,
26    options: Options,
27    logger: &slog::Logger,
28) {
29    info!(logger, "Received RRQ: {:?} {:?} {:?}", filename, txmode, options);
30    let Filename(filename) = filename;
31    match make_socket(peer) {
32        Ok(socket) => match fs::File::open(&filename) {
33            Ok(mut file) => {
34                let len = file.metadata().ok().and_then(|m| Some(m.len()));
35                let logger = logger.new(o!(
36                    "peer" => format!("{}", peer),
37                    "filename" => filename,
38                ));
39                match send_to(
40                    &mut file, len, socket, peer, options, &logger) {
41                    Ok(_) => info!(
42                        logger, "Completed transfer to {:?}", peer),
43                    Err(error) => error!(
44                        logger, "Error transferring to {:?}: {}", peer, error),
45                };
46            },
47            Err(error) => {
48                error!(logger, "Problem with file {}: {}", &filename, error);
49                // TODO: Send error to peer.
50            },
51        },
52        Err(error) => {
53            error!(logger, "Could not open socket: {}", error);
54        },
55    };
56}
57
58
59const EMPTY_DATA: Data<'static> = Data(&[]);
60
61
62fn send_to(
63    data: &mut io::Read,
64    len: Option<u64>,
65    socket: net::UdpSocket,
66    peer: net::SocketAddr,
67    options: Options,
68    logger: &slog::Logger,
69)
70    -> io::Result<()>
71{
72    // First, connect the socket to the peer so that we're only sending
73    // and receiving traffic to/from the peer. TODO: Do this earlier?
74    socket.connect(peer)?;
75
76    let mut options_out = Options::new();
77
78    let blksize: usize = match options.blksize {
79        Some(blksize) if blksize >= 512 => {
80            options_out.blksize = Some(blksize);
81            blksize as usize
82        },
83        _ => 512,  // Default.
84    };
85
86    socket.set_read_timeout(
87        Some(match options.timeout {
88            Some(timeout) if timeout >= 1 => {
89                options_out.timeout = Some(timeout);
90                time::Duration::from_secs(timeout as u64)
91            },
92            _ => {
93                time::Duration::from_secs(8u64)  // Default.
94            },
95        })
96    )?;
97
98    match options.tsize {
99        Some(0) => {
100            options_out.tsize = len;
101        },
102        Some(tsize) => {
103            warn!(logger, "Option tsize should be zero, got: {}", tsize);
104        },
105        None => {
106            // Do nothing.
107        },
108    };
109
110    let mut bufout = vec![0u8; 4 + blksize];  // opcode + blkno + data
111    let mut bufin = vec![0u8; blksize];
112
113    if options_out.is_set() {
114        let packet = Packet::OAck(options_out);
115        let size = packet.write(&mut bufout)?;
116        socket.send(&bufout[..size])?;
117        info!(logger, "Sent OACK ({} bytes) to {}.", size, &peer);
118        // TODO: Wait for ACK(0).
119    }
120
121    fn timed_out(error: &io::Error) -> bool {
122        // See the comment in UdpSocket.set_{read,write}_timeout to
123        // understand why both errors are matched.
124        error.kind() == io::ErrorKind::WouldBlock ||
125            error.kind() == io::ErrorKind::TimedOut
126    }
127
128    'send: for blkno in (1 as u16).. {
129        let mut timeouts = 0u8;
130        match data.read(&mut bufout[4..]) {
131            Ok(size) => {
132                // To avoid an extra copy we cheat and use a Data packet
133                // to write headers only. We've already read the payload
134                // into the correct place in `bufout`.
135                let packet = Packet::Data(BlockNum(blkno), EMPTY_DATA);
136                packet.write(&mut bufout[..4])?;
137                socket.send(&bufout[..size + 4])?;
138                info!(logger, "Sent DATA ({} bytes) to {}.", size, &peer);
139
140                'recv: loop {
141                    match socket.recv(&mut bufin) {
142                        Ok(amt) => {
143                            match Packet::parse(&mut bufin[..amt]) {
144                                Ok(packet) => match packet {
145                                    Packet::Ack(BlockNum(blocknum)) => {
146                                        if blocknum == blkno {
147                                            break 'recv;
148                                        };
149                                    },
150                                    Packet::Error(code, message) => {
151                                        error!(logger, "{:?}: {:?}", code, message);
152                                        break 'send;
153                                    },
154                                    Packet::Data(..) => warn!(
155                                        logger, "Ignoring unexpected DATA packet."),
156                                    Packet::Read(..) => warn!(
157                                        logger, "Ignoring unexpected RRQ packet."),
158                                    Packet::Write(..) => warn!(
159                                        logger, "Ignoring unexpected WRQ packet."),
160                                    Packet::OAck(..) => warn!(
161                                        logger, "Ignoring unexpected OACK packet."),
162                                },
163                                Err(error) => {
164                                    warn!(
165                                        logger, "Ignoring mangled packet ({:?}).",
166                                        error);
167                                },
168                            };
169                        },
170                        Err(ref error) if timed_out(error) => {
171                            match timeouts {
172                                0...7 => {
173                                    timeouts += 1;
174                                    socket.send(&bufout[..size + 4])?;
175                                    info!(
176                                        logger,
177                                        "Sent DATA ({} bytes) to {} (attempt #{}).",
178                                        size, &peer, timeouts + 1);
179                                },
180                                _ => {
181                                    error!(logger, "Too many time-outs; aborting");
182                                    break 'send;
183                                },
184                            };
185                        },
186                        Err(error) => {
187                            error!(logger, "Error receiving packet: {}", error);
188                            break 'send;
189                        },
190                    }
191                }
192
193                if size < blksize {
194                    break;
195                }
196            },
197            Err(error) => {
198                let packet = Packet::Error(
199                    ErrorCode::NotDefined, ErrorMessage(format!(
200                        "Something broke: {}\0", error)));
201
202                match packet.write(&mut bufout) {
203                    Ok(length) => {
204                        socket.send(&bufout[..length])?;
205                    },
206                    Err(error) => {
207                        error!(
208                            logger, "Error preparing error packet: {:?}",
209                            error);
210                    },
211                };
212
213                break 'send;
214            },
215        }
216    };
217    Result::Ok(())
218}