backplane/
lib.rs

1extern crate serde;
2#[macro_use] extern crate serde_derive;
3
4extern crate num;
5#[macro_use] extern crate num_derive;
6
7pub mod stream_read;
8pub mod stream_write;
9
10use std::fmt;
11use std::fs::File;
12use std::io::BufReader;
13use std::net::{TcpListener, TcpStream, UdpSocket, SocketAddrV4};
14use std::error::Error;
15use std::str::FromStr;
16
17use bytes::BytesMut;
18
19use crate::stream_write::*;
20use crate::stream_read::*;
21
22
23/// The stream settings are all the settings for all stream types
24#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct StreamSettings {
26    #[serde(default)]
27    pub file: FileSettings,
28
29    #[serde(default)]
30    pub tcp_client: TcpClientSettings,
31
32    #[serde(default)]
33    pub tcp_server: TcpServerSettings,
34
35    #[serde(default)]
36    pub udp: UdpSettings,
37}
38
39impl StreamSettings {
40    pub fn open_input(&self, input_option: &StreamOption) -> Result<ReadStream, String> {
41        let result;
42
43        match input_option {
44            StreamOption::File => {
45                result = self.file.open_read_stream();
46            },
47
48            StreamOption::TcpClient => {
49                result = self.tcp_client.open_read_stream();
50            },
51
52            StreamOption::TcpServer => {
53                result = self.tcp_server.open_read_stream();
54            },
55
56            StreamOption::Udp => {
57                result = self.udp.open_read_stream();
58            },
59        }
60
61        result
62    }
63
64    pub fn open_output(&self, output_option: &StreamOption) -> Result<WriteStream, String> {
65        let result: Result<WriteStream, String>;
66
67        match output_option {
68            StreamOption::File => {
69                result = self.file.open_write_stream();
70            },
71
72            StreamOption::TcpClient => {
73                result = self.tcp_client.open_write_stream();
74            },
75
76            StreamOption::TcpServer => {
77                result = self.tcp_server.open_write_stream();
78            },
79
80            StreamOption::Udp => {
81                result = self.udp.open_write_stream();
82            },
83        }
84
85        result
86    }
87}
88
89
90/// The stream option identifies the desired stream type for reading or writing
91#[derive(FromPrimitive, Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
92pub enum StreamOption {
93    /// The stream is a file
94    File = 1,
95    /// The stream is a TCP client with a given port
96    TcpClient = 2,
97    /// The stream is a TCP server with a given port
98    TcpServer = 3,
99    /// The stream is a UDP socket with a given port
100    Udp = 4,
101}
102
103impl Default for StreamOption {
104    fn default() -> StreamOption {
105        return StreamOption::File;
106    }
107}
108
109/* Input Streams */
110/// The file settings are everything needed to open and read from a file as an input or output
111/// stream
112#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
113pub struct FileSettings {
114    pub file_name: String,
115}
116
117impl Default for FileSettings {
118    fn default() -> Self {
119        FileSettings { file_name: "data.bin".to_string() }
120    }
121}
122
123impl fmt::Display for FileSettings {
124    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
125        write!(f, "file:{}", self.file_name)
126    }
127}
128
129impl FromStr for FileSettings {
130    type Err = StreamSettingsParseError;
131    fn from_str(s: &str) -> Result<FileSettings, StreamSettingsParseError> {
132        let prefix = "file:";
133        if s.starts_with(prefix) {
134            Ok(FileSettings { file_name: s[prefix.len()..].to_string() })
135        } else {
136            Err(StreamSettingsParseError(()))
137        }
138    }
139}
140
141impl FileSettings {
142    pub fn open_read_stream(&self) -> Result<ReadStream, String> {
143        let result = File::open(self.file_name.clone())
144                       .map(|file| ReadStream::File(BufReader::new(file)))
145                       .map_err(|err| format!("File open error for reading: {}", err));
146
147        return result;
148    }
149
150    pub fn open_write_stream(&self) -> Result<WriteStream, String> {
151        let result = File::create(self.file_name.clone())
152                        .map(|outfile| WriteStream::File(outfile))
153                        .map_err(|err| format!("File open error for writing: {}", err));
154
155        return result;
156    }
157}
158
159/// The tcp client settings are everything needed to open and read from a tcp socket as an input or output
160/// stream as a tcp client
161#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
162pub struct TcpClientSettings {
163    pub port: u16,
164    pub ip: String,
165}
166
167impl Default for TcpClientSettings {
168    fn default() -> Self {
169        TcpClientSettings { port: 8000,
170                            ip: "127.0.0.1".to_string()
171        }
172    }
173}
174
175impl fmt::Display for TcpClientSettings {
176    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177        write!(f, "tcp_client:{}:{}", self.ip, self.port)
178    }
179}
180
181impl FromStr for TcpClientSettings {
182    type Err = StreamSettingsParseError;
183    fn from_str(s: &str) -> Result<TcpClientSettings, StreamSettingsParseError> {
184        let prefix = "tcp_client:";
185        if s.starts_with(prefix) {
186            let mut parts = s[prefix.len()..].split(':');
187            let addr = parts.next().ok_or(StreamSettingsParseError(()))?;
188            let port_str = parts.next().ok_or(StreamSettingsParseError(()))?;
189            let port = port_str.parse::<u16>().map_err(|_| StreamSettingsParseError(()))?;
190            Ok(TcpClientSettings { ip: addr.to_string(), port: port })
191        } else {
192            Err(StreamSettingsParseError(()))
193        }
194    }
195}
196
197impl TcpClientSettings {
198    pub fn open_read_stream(&self) -> Result<ReadStream, String> {
199        let addr = SocketAddrV4::new(self.ip.parse().unwrap(),
200                                     self.port);
201        let result = TcpStream::connect(&addr)
202                       .map(|sock| ReadStream::Tcp(sock))
203                       .map_err(|err| format!("TCP Client Open Error: {}", err));
204
205        return result;
206    }
207
208    pub fn open_write_stream(&self) -> Result<WriteStream, String> {
209        let addr = SocketAddrV4::new(self.ip.parse().unwrap(),
210                                     self.port);
211
212        let result = TcpStream::connect(&addr)
213                       .map(|sock| WriteStream::Tcp(sock))
214                       .map_err(|err| format!("TCP Client Open Error: {}", err));
215
216        return result;
217    }
218}
219
220/// The tcp server settings are everything needed to open and read from a tcp socket as an input or output
221/// stream as a tcp server
222#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
223pub struct TcpServerSettings {
224    pub port: u16,
225    pub ip: String,
226}
227
228impl Default for TcpServerSettings {
229    fn default() -> Self {
230        TcpServerSettings { port: 8000,
231                            ip: "127.0.0.1".to_string()
232        }
233    }
234}
235
236impl fmt::Display for TcpServerSettings {
237    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238        write!(f, "tcp_server:{}:{}", self.ip, self.port)
239    }
240}
241
242impl FromStr for TcpServerSettings {
243    type Err = StreamSettingsParseError;
244    fn from_str(s: &str) -> Result<TcpServerSettings, StreamSettingsParseError> {
245        let prefix = "tcp_client:";
246        if s.starts_with(prefix) {
247            let mut parts = s[prefix.len()..].split(':');
248            let addr = parts.next().ok_or(StreamSettingsParseError(()))?;
249            let port_str = parts.next().ok_or(StreamSettingsParseError(()))?;
250            let port = port_str.parse::<u16>().map_err(|_| StreamSettingsParseError(()))?;
251            Ok(TcpServerSettings { ip: addr.to_string(), port: port })
252        } else {
253            Err(StreamSettingsParseError(()))
254        }
255    }
256}
257
258impl TcpServerSettings {
259    pub fn open_read_stream(&self) -> Result<ReadStream, String> {
260        let addr = SocketAddrV4::new(self.ip.parse().unwrap(), self.port);
261        let listener = TcpListener::bind(&addr).unwrap();
262        let (sock, _) = listener.accept().map_err(|err| format!("TCP Server Open Error: {}", err))?;
263        return Ok(ReadStream::Tcp(sock));
264    }
265
266    pub fn open_write_stream(&self) -> Result<WriteStream, String> {
267        let addr = SocketAddrV4::new(self.ip.parse().unwrap(), self.port);
268        let listener = TcpListener::bind(&addr).unwrap();
269
270        let result = listener.accept()
271                             .map(|(sock, _)| WriteStream::Tcp(sock))
272                             .map_err(|err| format!("TCP Server Open Error: {}", err));
273
274        return result;
275    }
276}
277
278/// The udp settings are everything needed to open a UDP socket and use it as an input or output
279/// stream
280#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
281pub struct UdpSettings {
282    pub port: u16,
283    pub ip: String,
284}
285
286impl Default for UdpSettings {
287    fn default() -> Self {
288        UdpSettings { port: 8001,
289                      ip: "127.0.0.1".to_string()
290        }
291    }
292}
293
294impl fmt::Display for UdpSettings {
295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296        write!(f, "udp:{}:{}", self.ip, self.port)
297    }
298}
299
300impl FromStr for UdpSettings {
301    type Err = StreamSettingsParseError;
302    fn from_str(s: &str) -> Result<UdpSettings, StreamSettingsParseError> {
303        let prefix = "tcp_client:";
304        if s.starts_with(prefix) {
305            let mut parts = s[prefix.len()..].split(':');
306            let addr = parts.next().ok_or(StreamSettingsParseError(()))?;
307            let port_str = parts.next().ok_or(StreamSettingsParseError(()))?;
308            let port = port_str.parse::<u16>().map_err(|_| StreamSettingsParseError(()))?;
309            Ok(UdpSettings { ip: addr.to_string(), port: port })
310        } else {
311            Err(StreamSettingsParseError(()))
312        }
313    }
314}
315
316
317impl UdpSettings {
318    pub fn open_read_stream(&self) -> Result<ReadStream, String> {
319        let sock = UdpSocket::bind("0.0.0.0:0").map_err(|_err| "Couldn't bind to udp address/port")?;
320        return Ok(ReadStream::Udp(sock));
321    }
322
323    pub fn open_write_stream(&self) -> Result<WriteStream, String> {
324        let result;
325
326        match self.ip.parse() {
327            Ok(ip_addr) => {
328                let addr = SocketAddrV4::new(ip_addr, self.port);
329
330                result = UdpSocket::bind("0.0.0.0:0")
331                         .map(|udp_sock| WriteStream::Udp((udp_sock, addr)))
332                         .map_err(|err| format!("Could not open UDP socket for writing: {}", err));
333            },
334
335            Err(e) => {
336                result = Err(format!("Could not parse ip ({}): {}", self.ip, e));
337            },
338        }
339
340        return result;
341    }
342}
343
344
345/* Input/Output Streams */
/// A source of bytes.
///
/// Wrapping the concrete stream types in an enum lets a function return
/// "some read stream" without resorting to trait objects.
#[derive(Debug)]
pub enum ReadStream {
    /// A buffered file reader.
    File(BufReader<File>),
    /// A UDP socket.
    Udp(UdpSocket),
    /// A TCP connection.
    Tcp(TcpStream),
    /// No stream; this is the `Default` value.
    Null,
}
357
358impl Default for ReadStream {
359    fn default() -> ReadStream {
360        return ReadStream::Null;
361    }
362}
363
364impl FromStr for ReadStream {
365    type Err = String;
366    fn from_str(read_stream_desc: &str) -> Result<ReadStream, String> {
367        let result;
368
369        if let Ok(file_settings) = FileSettings::from_str(read_stream_desc) {
370            result = file_settings.open_read_stream();
371        } else if let Ok(udp_settings) = UdpSettings::from_str(read_stream_desc) {
372            result = udp_settings.open_read_stream();
373        } else if let Ok(tcp_server_settings) = TcpServerSettings::from_str(read_stream_desc) {
374            result = tcp_server_settings.open_read_stream();
375        } else if let Ok(tcp_client_settings) = TcpClientSettings::from_str(read_stream_desc) {
376            result = tcp_client_settings.open_read_stream();
377        } else {
378            result = Err("No matching stream settings!".to_string());
379        }
380
381        return result;
382    }
383}
384
385impl ReadStream {
386    pub fn stream_read(&mut self,
387                       bytes: &mut BytesMut,
388                       num_bytes: usize) -> StreamReadResult {
389
390        let result: StreamReadResult;
391
392        match self {
393            ReadStream::File(ref mut file) => {
394                result = file.read_bytes(bytes, num_bytes);
395            },
396
397            ReadStream::Udp(udp_sock) => {
398                // for UDP we just read a message
399                result = udp_sock.read_bytes(bytes, num_bytes);
400            },
401
402            ReadStream::Tcp(tcp_stream) => {
403                result = tcp_stream.read_bytes(bytes, num_bytes);
404            },
405
406            ReadStream::Null => {
407                // TODO is this an error, or should it just always return no bytes?
408                result = StreamReadResult::Error("Reading a Null Stream! This should not happen!".to_string());
409            },
410        }
411
412        return result;
413    }
414}
415
416
/// A sink for bytes.
///
/// Wrapping the concrete stream types in an enum lets a function return one
/// of several write streams while the caller defers the choice of stream.
/// This is the closed, static approach; trait objects would be the open,
/// dynamic one.
#[derive(Debug)]
pub enum WriteStream {
    /// A file.
    File(File),
    /// A UDP socket paired with a socket address.
    Udp((UdpSocket, SocketAddrV4)),
    /// A TCP connection.
    Tcp(TcpStream),
    /// No stream; this is the `Default` value.
    Null,
}
430
431impl FromStr for WriteStream {
432    type Err = String;
433    fn from_str(write_stream_desc: &str) -> Result<WriteStream, String> {
434        let result;
435
436        if let Ok(file_settings) = FileSettings::from_str(write_stream_desc) {
437            result = file_settings.open_write_stream();
438        } else if let Ok(udp_settings) = UdpSettings::from_str(write_stream_desc) {
439            result = udp_settings.open_write_stream();
440        } else if let Ok(tcp_server_settings) = TcpServerSettings::from_str(write_stream_desc) {
441            result = tcp_server_settings.open_write_stream();
442        } else if let Ok(tcp_client_settings) = TcpClientSettings::from_str(write_stream_desc) {
443            result = tcp_client_settings.open_write_stream();
444        } else {
445            result = Err("No matching stream settings!".to_string());
446        }
447
448        return result;
449    }
450}
451
452impl WriteStream {
453    pub fn stream_send(&mut self, packet: &Vec<u8>) -> Result<usize, String> {
454        let result;
455
456        match self {
457            WriteStream::File(file) => {
458                result = file.write_bytes(&packet);
459            },
460
461            WriteStream::Udp(udp_stream) => {
462                result = udp_stream.write_bytes(&packet);
463            },
464
465            WriteStream::Tcp(tcp_stream) => {
466                result = tcp_stream.write_bytes(&packet);
467            },
468
469            WriteStream::Null => {
470                // TODO should this be a sink like /dev/null, and 'write' all bytes, or
471                // should it write 0 bytes?
472                result = Ok(0);
473            },
474        }
475
476        return result;
477    }
478}
479
480impl Default for WriteStream {
481    fn default() -> WriteStream {
482        return WriteStream::Null;
483    }
484}
485
/// Error returned when a stream settings description cannot be parsed.
///
/// The private unit field keeps the type from being constructed outside this
/// module.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct StreamSettingsParseError(());
488
489impl fmt::Display for StreamSettingsParseError {
490    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
491        fmt.write_str(self.description())
492    }
493}
494
495impl Error for StreamSettingsParseError {
496    fn description(&self) -> &str {
497        "error parsing stream settings"
498    }
499}
500