backplane/
stream_read.rs

1use std::fs::File;
2use std::io::{Read, BufReader};
3use std::net::{TcpStream, UdpSocket};
4use std::borrow::BorrowMut;
5
6use bytes::BytesMut;
7
8
// TODO this API does not make blocking vs non-blocking calls apparent.
// Ideally there would be a timeout provided, which could be 0 (non-blocking),
// a finite timeout, or infinite (block until data is available). This would cover the
// case of files which are being written as well as read.
// TODO this error return of String should be replaced with an error handling strategy
14// TODO this should include serial reading
15// TODO this might include stdin reading
16
/// Outcome of a single [`StreamRead::read_bytes`] call.
///
/// The standard derives make the result printable in diagnostics, cloneable,
/// and directly comparable in tests and caller logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamReadResult {
    /// The read succeeded; the payload is the number of bytes obtained.
    BytesRead(usize),
    /// The stream has ended and no further data is expected.
    Finished,
    /// The read failed; the payload is a human-readable description.
    Error(String),
}
22
23pub trait StreamRead {
24    fn read_bytes(&mut self, bytes: &mut BytesMut, num_bytes: usize) -> StreamReadResult;
25}
26
27impl StreamRead for TcpStream {
28    fn read_bytes(&mut self, bytes: &mut BytesMut, num_bytes: usize) -> StreamReadResult {
29        read_bytes_from_reader(self, bytes, num_bytes)
30    }
31}
32
33impl StreamRead for BufReader<File> {
34    fn read_bytes(&mut self, bytes: &mut BytesMut, num_bytes: usize) -> StreamReadResult {
35        let result = read_bytes_from_reader(self, bytes, num_bytes);
36
37        if let StreamReadResult::BytesRead(0) = result {
38            // NOTE assumes that the end of the file is the end of the stream, and no new data is
39            // possible!
40            return StreamReadResult::Finished;
41        } else {
42            return result;
43        }
44    }
45}
46
47impl StreamRead for UdpSocket {
48    fn read_bytes(&mut self, bytes: &mut BytesMut, _num_bytes: usize) -> StreamReadResult {
49        // for UDP we just read a message
50        bytes.clear();
51        match self.recv(bytes).map_err(|err| format!("Udp Socket Read Error: {}", err)) {
52            Ok(bytes_read) => {
53                return StreamReadResult::BytesRead(bytes_read);
54            },
55
56            Err(string) => {
57                return StreamReadResult::Error(string);
58            }
59        }
60    }
61}
62
63
64fn read_bytes_from_reader<R: Read>(reader: &mut R, bytes: &mut BytesMut, num_bytes: usize) -> StreamReadResult {
65    let old_len = bytes.len();
66    let new_len = old_len + num_bytes;
67
68    // ensure that there is room for the request number of bytes
69    bytes.reserve(num_bytes);
70
71    // NOTE this zeroing of memory could be avoided with a use of the unsafe function
72    // set_len. This has not been done to avoid use of 'unsafe'.
73    bytes.resize(new_len, 0);
74
75    // retrieve the underlying byte buffer
76    let mut_bytes: &mut [u8] = bytes.borrow_mut();
77
78    // read up to num_bytes bytes from the reader
79    let result = reader.read(&mut mut_bytes[old_len..(old_len + num_bytes)])
80                       .map_err(|err| format!("Stream Read Error: {}", err));
81
82    match result {
83        Ok(bytes_read) => {
84            // if byte were read, set the BytesMut length to reflect the new data available
85            bytes.truncate(old_len + bytes_read);
86            return StreamReadResult::BytesRead(bytes_read);
87        },
88
89        Err(string) => {
90            return StreamReadResult::Error(string);
91        }
92    }
93}
94