1extern crate serde;
2#[macro_use] extern crate serde_derive;
3
4extern crate num;
5#[macro_use] extern crate num_derive;
6
7pub mod stream_read;
8pub mod stream_write;
9
10use std::fmt;
11use std::fs::File;
12use std::io::BufReader;
13use std::net::{TcpListener, TcpStream, UdpSocket, SocketAddrV4};
14use std::error::Error;
15use std::str::FromStr;
16
17use bytes::BytesMut;
18
19use crate::stream_write::*;
20use crate::stream_read::*;
21
22
23#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct StreamSettings {
26 #[serde(default)]
27 pub file: FileSettings,
28
29 #[serde(default)]
30 pub tcp_client: TcpClientSettings,
31
32 #[serde(default)]
33 pub tcp_server: TcpServerSettings,
34
35 #[serde(default)]
36 pub udp: UdpSettings,
37}
38
39impl StreamSettings {
40 pub fn open_input(&self, input_option: &StreamOption) -> Result<ReadStream, String> {
41 let result;
42
43 match input_option {
44 StreamOption::File => {
45 result = self.file.open_read_stream();
46 },
47
48 StreamOption::TcpClient => {
49 result = self.tcp_client.open_read_stream();
50 },
51
52 StreamOption::TcpServer => {
53 result = self.tcp_server.open_read_stream();
54 },
55
56 StreamOption::Udp => {
57 result = self.udp.open_read_stream();
58 },
59 }
60
61 result
62 }
63
64 pub fn open_output(&self, output_option: &StreamOption) -> Result<WriteStream, String> {
65 let result: Result<WriteStream, String>;
66
67 match output_option {
68 StreamOption::File => {
69 result = self.file.open_write_stream();
70 },
71
72 StreamOption::TcpClient => {
73 result = self.tcp_client.open_write_stream();
74 },
75
76 StreamOption::TcpServer => {
77 result = self.tcp_server.open_write_stream();
78 },
79
80 StreamOption::Udp => {
81 result = self.udp.open_write_stream();
82 },
83 }
84
85 result
86 }
87}
88
89
90#[derive(FromPrimitive, Debug, PartialEq, Eq, Copy, Clone, Serialize, Deserialize)]
92pub enum StreamOption {
93 File = 1,
95 TcpClient = 2,
97 TcpServer = 3,
99 Udp = 4,
101}
102
103impl Default for StreamOption {
104 fn default() -> StreamOption {
105 return StreamOption::File;
106 }
107}
108
109#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
113pub struct FileSettings {
114 pub file_name: String,
115}
116
117impl Default for FileSettings {
118 fn default() -> Self {
119 FileSettings { file_name: "data.bin".to_string() }
120 }
121}
122
123impl fmt::Display for FileSettings {
124 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
125 write!(f, "file:{}", self.file_name)
126 }
127}
128
129impl FromStr for FileSettings {
130 type Err = StreamSettingsParseError;
131 fn from_str(s: &str) -> Result<FileSettings, StreamSettingsParseError> {
132 let prefix = "file:";
133 if s.starts_with(prefix) {
134 Ok(FileSettings { file_name: s[prefix.len()..].to_string() })
135 } else {
136 Err(StreamSettingsParseError(()))
137 }
138 }
139}
140
141impl FileSettings {
142 pub fn open_read_stream(&self) -> Result<ReadStream, String> {
143 let result = File::open(self.file_name.clone())
144 .map(|file| ReadStream::File(BufReader::new(file)))
145 .map_err(|err| format!("File open error for reading: {}", err));
146
147 return result;
148 }
149
150 pub fn open_write_stream(&self) -> Result<WriteStream, String> {
151 let result = File::create(self.file_name.clone())
152 .map(|outfile| WriteStream::File(outfile))
153 .map_err(|err| format!("File open error for writing: {}", err));
154
155 return result;
156 }
157}
158
159#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
162pub struct TcpClientSettings {
163 pub port: u16,
164 pub ip: String,
165}
166
167impl Default for TcpClientSettings {
168 fn default() -> Self {
169 TcpClientSettings { port: 8000,
170 ip: "127.0.0.1".to_string()
171 }
172 }
173}
174
175impl fmt::Display for TcpClientSettings {
176 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177 write!(f, "tcp_client:{}:{}", self.ip, self.port)
178 }
179}
180
181impl FromStr for TcpClientSettings {
182 type Err = StreamSettingsParseError;
183 fn from_str(s: &str) -> Result<TcpClientSettings, StreamSettingsParseError> {
184 let prefix = "tcp_client:";
185 if s.starts_with(prefix) {
186 let mut parts = s[prefix.len()..].split(':');
187 let addr = parts.next().ok_or(StreamSettingsParseError(()))?;
188 let port_str = parts.next().ok_or(StreamSettingsParseError(()))?;
189 let port = port_str.parse::<u16>().map_err(|_| StreamSettingsParseError(()))?;
190 Ok(TcpClientSettings { ip: addr.to_string(), port: port })
191 } else {
192 Err(StreamSettingsParseError(()))
193 }
194 }
195}
196
197impl TcpClientSettings {
198 pub fn open_read_stream(&self) -> Result<ReadStream, String> {
199 let addr = SocketAddrV4::new(self.ip.parse().unwrap(),
200 self.port);
201 let result = TcpStream::connect(&addr)
202 .map(|sock| ReadStream::Tcp(sock))
203 .map_err(|err| format!("TCP Client Open Error: {}", err));
204
205 return result;
206 }
207
208 pub fn open_write_stream(&self) -> Result<WriteStream, String> {
209 let addr = SocketAddrV4::new(self.ip.parse().unwrap(),
210 self.port);
211
212 let result = TcpStream::connect(&addr)
213 .map(|sock| WriteStream::Tcp(sock))
214 .map_err(|err| format!("TCP Client Open Error: {}", err));
215
216 return result;
217 }
218}
219
220#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
223pub struct TcpServerSettings {
224 pub port: u16,
225 pub ip: String,
226}
227
228impl Default for TcpServerSettings {
229 fn default() -> Self {
230 TcpServerSettings { port: 8000,
231 ip: "127.0.0.1".to_string()
232 }
233 }
234}
235
236impl fmt::Display for TcpServerSettings {
237 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238 write!(f, "tcp_server:{}:{}", self.ip, self.port)
239 }
240}
241
242impl FromStr for TcpServerSettings {
243 type Err = StreamSettingsParseError;
244 fn from_str(s: &str) -> Result<TcpServerSettings, StreamSettingsParseError> {
245 let prefix = "tcp_client:";
246 if s.starts_with(prefix) {
247 let mut parts = s[prefix.len()..].split(':');
248 let addr = parts.next().ok_or(StreamSettingsParseError(()))?;
249 let port_str = parts.next().ok_or(StreamSettingsParseError(()))?;
250 let port = port_str.parse::<u16>().map_err(|_| StreamSettingsParseError(()))?;
251 Ok(TcpServerSettings { ip: addr.to_string(), port: port })
252 } else {
253 Err(StreamSettingsParseError(()))
254 }
255 }
256}
257
258impl TcpServerSettings {
259 pub fn open_read_stream(&self) -> Result<ReadStream, String> {
260 let addr = SocketAddrV4::new(self.ip.parse().unwrap(), self.port);
261 let listener = TcpListener::bind(&addr).unwrap();
262 let (sock, _) = listener.accept().map_err(|err| format!("TCP Server Open Error: {}", err))?;
263 return Ok(ReadStream::Tcp(sock));
264 }
265
266 pub fn open_write_stream(&self) -> Result<WriteStream, String> {
267 let addr = SocketAddrV4::new(self.ip.parse().unwrap(), self.port);
268 let listener = TcpListener::bind(&addr).unwrap();
269
270 let result = listener.accept()
271 .map(|(sock, _)| WriteStream::Tcp(sock))
272 .map_err(|err| format!("TCP Server Open Error: {}", err));
273
274 return result;
275 }
276}
277
278#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
281pub struct UdpSettings {
282 pub port: u16,
283 pub ip: String,
284}
285
286impl Default for UdpSettings {
287 fn default() -> Self {
288 UdpSettings { port: 8001,
289 ip: "127.0.0.1".to_string()
290 }
291 }
292}
293
294impl fmt::Display for UdpSettings {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 write!(f, "udp:{}:{}", self.ip, self.port)
297 }
298}
299
300impl FromStr for UdpSettings {
301 type Err = StreamSettingsParseError;
302 fn from_str(s: &str) -> Result<UdpSettings, StreamSettingsParseError> {
303 let prefix = "tcp_client:";
304 if s.starts_with(prefix) {
305 let mut parts = s[prefix.len()..].split(':');
306 let addr = parts.next().ok_or(StreamSettingsParseError(()))?;
307 let port_str = parts.next().ok_or(StreamSettingsParseError(()))?;
308 let port = port_str.parse::<u16>().map_err(|_| StreamSettingsParseError(()))?;
309 Ok(UdpSettings { ip: addr.to_string(), port: port })
310 } else {
311 Err(StreamSettingsParseError(()))
312 }
313 }
314}
315
316
317impl UdpSettings {
318 pub fn open_read_stream(&self) -> Result<ReadStream, String> {
319 let sock = UdpSocket::bind("0.0.0.0:0").map_err(|_err| "Couldn't bind to udp address/port")?;
320 return Ok(ReadStream::Udp(sock));
321 }
322
323 pub fn open_write_stream(&self) -> Result<WriteStream, String> {
324 let result;
325
326 match self.ip.parse() {
327 Ok(ip_addr) => {
328 let addr = SocketAddrV4::new(ip_addr, self.port);
329
330 result = UdpSocket::bind("0.0.0.0:0")
331 .map(|udp_sock| WriteStream::Udp((udp_sock, addr)))
332 .map_err(|err| format!("Could not open UDP socket for writing: {}", err));
333 },
334
335 Err(e) => {
336 result = Err(format!("Could not parse ip ({}): {}", self.ip, e));
337 },
338 }
339
340 return result;
341 }
342}
343
344
345#[derive(Debug)]
351pub enum ReadStream {
352 File(BufReader<File>),
353 Udp(UdpSocket),
354 Tcp(TcpStream),
355 Null,
356}
357
358impl Default for ReadStream {
359 fn default() -> ReadStream {
360 return ReadStream::Null;
361 }
362}
363
364impl FromStr for ReadStream {
365 type Err = String;
366 fn from_str(read_stream_desc: &str) -> Result<ReadStream, String> {
367 let result;
368
369 if let Ok(file_settings) = FileSettings::from_str(read_stream_desc) {
370 result = file_settings.open_read_stream();
371 } else if let Ok(udp_settings) = UdpSettings::from_str(read_stream_desc) {
372 result = udp_settings.open_read_stream();
373 } else if let Ok(tcp_server_settings) = TcpServerSettings::from_str(read_stream_desc) {
374 result = tcp_server_settings.open_read_stream();
375 } else if let Ok(tcp_client_settings) = TcpClientSettings::from_str(read_stream_desc) {
376 result = tcp_client_settings.open_read_stream();
377 } else {
378 result = Err("No matching stream settings!".to_string());
379 }
380
381 return result;
382 }
383}
384
385impl ReadStream {
386 pub fn stream_read(&mut self,
387 bytes: &mut BytesMut,
388 num_bytes: usize) -> StreamReadResult {
389
390 let result: StreamReadResult;
391
392 match self {
393 ReadStream::File(ref mut file) => {
394 result = file.read_bytes(bytes, num_bytes);
395 },
396
397 ReadStream::Udp(udp_sock) => {
398 result = udp_sock.read_bytes(bytes, num_bytes);
400 },
401
402 ReadStream::Tcp(tcp_stream) => {
403 result = tcp_stream.read_bytes(bytes, num_bytes);
404 },
405
406 ReadStream::Null => {
407 result = StreamReadResult::Error("Reading a Null Stream! This should not happen!".to_string());
409 },
410 }
411
412 return result;
413 }
414}
415
416
417#[derive(Debug)]
424pub enum WriteStream {
425 File(File),
426 Udp((UdpSocket, SocketAddrV4)),
427 Tcp(TcpStream),
428 Null,
429}
430
431impl FromStr for WriteStream {
432 type Err = String;
433 fn from_str(write_stream_desc: &str) -> Result<WriteStream, String> {
434 let result;
435
436 if let Ok(file_settings) = FileSettings::from_str(write_stream_desc) {
437 result = file_settings.open_write_stream();
438 } else if let Ok(udp_settings) = UdpSettings::from_str(write_stream_desc) {
439 result = udp_settings.open_write_stream();
440 } else if let Ok(tcp_server_settings) = TcpServerSettings::from_str(write_stream_desc) {
441 result = tcp_server_settings.open_write_stream();
442 } else if let Ok(tcp_client_settings) = TcpClientSettings::from_str(write_stream_desc) {
443 result = tcp_client_settings.open_write_stream();
444 } else {
445 result = Err("No matching stream settings!".to_string());
446 }
447
448 return result;
449 }
450}
451
452impl WriteStream {
453 pub fn stream_send(&mut self, packet: &Vec<u8>) -> Result<usize, String> {
454 let result;
455
456 match self {
457 WriteStream::File(file) => {
458 result = file.write_bytes(&packet);
459 },
460
461 WriteStream::Udp(udp_stream) => {
462 result = udp_stream.write_bytes(&packet);
463 },
464
465 WriteStream::Tcp(tcp_stream) => {
466 result = tcp_stream.write_bytes(&packet);
467 },
468
469 WriteStream::Null => {
470 result = Ok(0);
473 },
474 }
475
476 return result;
477 }
478}
479
480impl Default for WriteStream {
481 fn default() -> WriteStream {
482 return WriteStream::Null;
483 }
484}
485
486#[derive(Debug, Clone, PartialEq, Eq)]
487pub struct StreamSettingsParseError(());
488
489impl fmt::Display for StreamSettingsParseError {
490 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
491 fmt.write_str(self.description())
492 }
493}
494
495impl Error for StreamSettingsParseError {
496 fn description(&self) -> &str {
497 "error parsing stream settings"
498 }
499}
500