1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
extern crate carboxyl; use std::io::{ Read, Write }; use carboxyl::Stream; use drivers::{ ReadDriver, WriteDriver }; mod drivers; #[cfg(test)] mod sync; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct Quit; #[derive(Clone, Debug, PartialEq, Eq)] pub enum Input { Line(String), End, } impl Input { pub fn line(self) -> Option<String> { if let Input::Line(text) = self { Some(text) } else { None } } pub fn end(self) -> Option<Quit> { if let Input::End = self { Some(Quit) } else { None } } } pub fn run<R, W, P>(reader: R, writer: W, program: P) where R: 'static + Send + Read, W: 'static + Send + Write, P: Fn(Stream<Input>) -> (Stream<String>, Stream<Quit>) { let read_driver = ReadDriver::new(reader); let write_driver = WriteDriver::new(writer); let (outputs, quit) = program(read_driver.stream()); let mut quit_stream = quit.events(); write_driver.drive(outputs); read_driver.drive(); quit_stream.next(); } #[cfg(test)] mod test { use std::thread; use std::io::Cursor; use std::sync::{ Arc, Mutex }; use std::time::Duration; use carboxyl::Stream; use super::*; use ::sync::{ SyncWriter, check_timeout }; #[test] fn runs_echo_application() { let sample = b"abc\n"; let writer = SyncWriter::new(); run( Cursor::new(sample), writer.clone(), |inputs| (inputs.filter_map(Input::line), inputs.filter_map(Input::end)) ); check_timeout(|| &(*writer.contents().unwrap())[..] == sample, 100); } #[test] fn runs_forever_without_end_of_input() { let flag = Arc::new(Mutex::new(false)); thread::spawn({ let flag = flag.clone(); move || { run( Cursor::new(b""), SyncWriter::new(), |_| (Stream::never(), Stream::never()) ); *flag.lock().unwrap() = true; } }); thread::sleep(Duration::from_millis(5)); assert!(!*flag.lock().unwrap()); } }