pipeawesome2/
buffer.rs

1use crate::motion::PullJourney;
2use crate::connectable::Breakable;
3use crate::motion::Journey;
4use crate::connectable::OutputPort;
5use crate::connectable::ConnectableAddOutputError;
6use crate::connectable::ConnectableAddInputError;
7use crate::connectable::Connectable;
8use async_std::{channel::SendError, prelude::*};
9
10use async_std::channel::{bounded, unbounded, Receiver, Sender };
11use crate::motion::{MotionNotifications};
12
13use super::motion::{ motion, MotionError, MonitorMessage, MotionResult, Pull, Push, };
14use crate::back_off::BackOff;
15
16use crate::startable_control::StartableControl;
17use async_trait::async_trait;
18
/// Notification carrying the number of messages currently held inside a
/// [`Buffer`]; the wrapped `usize` is that count.
///
/// It is a plain value type, so it is cheap to copy, compare and print.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BufferSizeMessage(pub usize);
21
22pub struct Buffer {
23    stdout_size: usize,
24    stdout: Option<Push>,
25    stdin: Option<Pull>,
26    buffer_size_monitor: Option<Sender<BufferSizeMessage>>,
27}
28
29
30impl Connectable for Buffer {
31
32    fn add_output(&mut self, port: OutputPort, breakable: Breakable, src_id: usize, dst_id: usize) -> std::result::Result<Pull, ConnectableAddOutputError> {
33        if self.stdout.is_some() { return Err(ConnectableAddOutputError::AlreadyAllocated(port)); }
34        let (child_stdout_push_channel, stdout_io_reciever_channel) = bounded(self.stdout_size);
35        self.stdout = Some(Push::IoSender(Journey { src: src_id, dst: dst_id, breakable }, child_stdout_push_channel));
36        Ok(Pull::Receiver(PullJourney { src: src_id, dst: dst_id }, stdout_io_reciever_channel))
37    }
38
39    fn add_input(&mut self, pull: Pull, unused_priority: isize) -> std::result::Result<(), ConnectableAddInputError> {
40        if unused_priority != 0 {
41            return Err(ConnectableAddInputError::UnsupportedPriority(unused_priority));
42        }
43        if self.stdin.is_some() {
44            return Err(ConnectableAddInputError::AlreadyAllocated);
45        }
46        self.stdin = Some(pull);
47        Ok(())
48    }
49
50}
51
52
53#[allow(clippy::new_without_default)]
54impl Buffer {
55    pub fn new() -> Buffer {
56        Buffer {
57            stdout_size: 8,
58            stdin: None,
59            stdout: None,
60            buffer_size_monitor: None,
61        }
62    }
63
64    pub fn add_buffer_size_monitor(&mut self) -> Receiver<BufferSizeMessage> {
65        assert!(self.buffer_size_monitor.is_none(), "Each buffer can only be monitored once");
66        let (tx, rx) = bounded(self.stdout_size);
67        self.buffer_size_monitor = Some(tx);
68        rx
69    }
70
71    pub fn set_stdout_size(&mut self, size: usize) {
72        self.stdout_size = size;
73    }
74
75}
76
77
78#[async_trait]
79impl StartableControl for Buffer {
80    async fn start(&mut self) -> MotionResult<usize> {
81
82        let (unbounded_snd, unbounded_rcv) = unbounded();
83        let (monitor_i_snd, monitor_i_rcv): (Sender<MonitorMessage>, Receiver<MonitorMessage>) = bounded(8);
84        let (monitor_o_snd, monitor_o_rcv): (Sender<MonitorMessage>, Receiver<MonitorMessage>) = bounded(8);
85
86        let stdin = std::mem::take(&mut self.stdin).ok_or(MotionError::NoneError)?;
87        let stdout = std::mem::take(&mut self.stdout).ok_or(MotionError::NoneError)?;
88
89        let push_a = Push::Sender(
90            Journey { src: 0, dst: 0, breakable: stdout.journey().ok_or(MotionError::NoneError)?.breakable },
91            unbounded_snd
92        );
93        let pull_b = Pull::Receiver(
94            PullJourney { src: 0, dst: 0 },
95            unbounded_rcv
96        );
97
98        let r_a = motion(
99            stdin,
100            MotionNotifications::written(monitor_i_snd),
101            push_a
102        );
103        let r_b = motion(
104            pull_b,
105            MotionNotifications::read(monitor_o_snd),
106            stdout,
107        );
108
109        async fn total_in_buffer(sender: Option<Sender<BufferSizeMessage>>, m_in: Receiver<MonitorMessage>, m_out: Receiver<MonitorMessage>) -> Result<usize, SendError<BufferSizeMessage>> {
110            let mut size: usize = 0;
111            let mut back_off = BackOff::new();
112            let mut last = 0;
113            loop {
114                let mut buffer_movement = false;
115                match m_in.try_recv() {
116                    Err(async_std::channel::TryRecvError::Empty) => {
117                    },
118                    Err(async_std::channel::TryRecvError::Closed) => (),
119                    Ok(MonitorMessage::Wrote(_)) => {
120                        // println!("+BUF");
121                        buffer_movement = true;
122                        size += 1;
123                    }
124                    Ok(MonitorMessage::Read(_)) => {
125                        panic!("SHOULD NOT BE HERE");
126                    }
127                }
128                match m_out.try_recv() {
129                    Err(async_std::channel::TryRecvError::Empty) => {
130                    },
131                    Err(async_std::channel::TryRecvError::Closed) => (),
132                    Ok(MonitorMessage::Read(_)) => {
133                        // println!("-BUF");
134                        buffer_movement = true;
135                        size -= 1;
136                    }
137                    Ok(MonitorMessage::Wrote(_)) => {
138                        panic!("SHOULD NOT BE HERE");
139                    }
140                }
141                match (last != size, &sender) {
142                    (true, Some(s)) => {
143                        last = size;
144                        s.send(BufferSizeMessage(size)).await
145                    },
146                    _ => Ok(()),
147                }?;
148                if m_in.is_empty() && m_in.is_closed() && m_out.is_empty() && m_out.is_closed() {
149                    return Ok(size as usize);
150                }
151                match buffer_movement {
152                    false => {
153                        back_off.wait().await;
154                    },
155                    true => {
156                        back_off.reset();
157                    },
158                };
159            }
160        }
161
162        let r_out_prep = r_a.join(r_b).join(
163            total_in_buffer(std::mem::take(&mut self.buffer_size_monitor), monitor_i_rcv, monitor_o_rcv)
164        ).await;
165
166        fn structure_motion_result(input: ((MotionResult<usize>, MotionResult<usize>), Result<usize, SendError<BufferSizeMessage>>)) -> MotionResult<usize> {
167            match input {
168                ((MotionResult::Ok(stdin_count), MotionResult::Ok(_)), _x) => Ok(stdin_count),
169                _ => Err(MotionError::NoneError),
170            }
171        }
172
173        match structure_motion_result(r_out_prep) {
174            Ok(x) => Ok(x),
175            Err(x) => Err(x)
176        }
177
178    }
179}
180
181
/// End-to-end check of `Buffer`: four mock messages are fed in, the output is
/// drained after a short delay, and the test asserts that every message
/// arrives in order and that the last reported buffer size is zero.
#[test]
fn do_stuff() {

    use crate::motion::IOData;
    use crate::connectable::Breakable;
    use async_std::task;

    pub async fn test_buffer_impl() -> MotionResult<usize>  {
        use std::collections::VecDeque;

        /// Collects everything readable from `output` until it reports
        /// `Finished`; any other outcome yields an empty vector.
        async fn read_data(mut output: Pull) -> Vec<IOData> {
            let mut collected: Vec<IOData> = Vec::new();
            // Delay the first read by 100ms before draining the output.
            async_std::task::sleep(std::time::Duration::from_millis(100)).await;
            loop {
                let step: MotionResult<crate::motion::IODataWrapper> =
                    crate::motion::motion_read(&mut output, false).await;
                match step {
                    Ok(crate::motion::IODataWrapper::IOData(chunk)) => collected.push(chunk),
                    Ok(crate::motion::IODataWrapper::Finished) => return collected,
                    _ => return vec![],
                }
            }
        }

        /// Collects every message from `output` until the channel closes.
        async fn read_monitoring<X>(output: Receiver<X>) -> Vec<X> {
            let mut seen: Vec<X> = Vec::new();
            while let Ok(message) = output.recv().await {
                seen.push(message);
            }
            seen
        }

        /// Four 255-byte payloads, front to back: 65, 66, 67, 68.
        fn get_input() -> VecDeque<IOData> {
            vec![65, 66, 67, 68]
                .into_iter()
                .map(|byte| IOData(vec![byte; 255]))
                .collect()
        }

        let mut buffer = Buffer::new();
        buffer.set_stdout_size(1);
        buffer
            .add_input(Pull::Mock(PullJourney { src: 0, dst: 0 }, get_input()), 0)
            .unwrap();
        let output = buffer.add_output(OutputPort::Out, Breakable::Terminate, 0, 0).unwrap();
        let monitoring = buffer.add_buffer_size_monitor();

        let ((outcome, received), monitoring_msg) = buffer
            .start()
            .join(read_data(output))
            .join(read_monitoring(monitoring))
            .await;

        let proc_count = match outcome {
            Ok(count) => count,
            Err(_) => panic!("should have succeeded"),
        };

        let expected = vec![
            IOData(vec![65; 255]),
            IOData(vec![66; 255]),
            IOData(vec![67; 255]),
            IOData(vec![68; 255]),
        ];
        assert_eq!(expected, received);

        assert_eq!(monitoring_msg.last(), Some(&BufferSizeMessage(0)));

        Ok(proc_count)
    }

    println!("R: {:?}", task::block_on(test_buffer_impl()));
}
265
266// struct BufferReturn {
267//     stdout: Push, // Pull::IoReceiver || Pull::None
268//     stderr: Push, // Pull::IoReceiver || Pull::None
269//     future: Future<Output = ((MotionResult<usize>, MotionResult<usize>), MotionResult<usize>)>,
270// }
271// 
272// 
273// impl <E: IntoIterator<Item = (K, V)>,
274//           A: IntoIterator<Item = R>,
275//           R: AsRef<OsStr>,
276//           O: AsRef<OsStr>,
277//           K: AsRef<OsStr>,
278//           V: AsRef<OsStr>,
279//           P: AsRef<Path>> Buffer {
280//     fn new(
281//         stdin: Option<PullConfiguration>,
282//         launch_spec: Buffer<E, P, O, A, K, V, R>,
283//     ) -> Buffer {
284//         Buffer {
285//             stdout: None,
286//             stderr: None,
287//             stdin,
288//             launch_spec,
289//         }
290//     }
291// }
292// 
293
294// async fn get_command<E: IntoIterator<Item = (K, V)>,
295//           A: IntoIterator<Item = R>,
296//           R: AsRef<OsStr>,
297//           O: AsRef<OsStr>,
298//           K: AsRef<OsStr>,
299//           V: AsRef<OsStr>,
300//           P: AsRef<Path>>(stdin: Option<PullConfiguration>, launch_spec: Buffer<E, P, O, A, K, V, R>, outputs: BufferOutputs, monitoring: Sender<MonitorMessage>) -> BufferReturn
301// {
302// 
303//     let outputs: (bool, bool) = match outputs {
304//         BufferOutputs::STDOUT => (true, false),
305//         BufferOutputs::STDOUT_AND_STDERR => (true, true),
306//         BufferOutputs::STDERR => (false, true),
307//     };
308// 
309//     let current_path: &Path = std::env::current_dir().expect("Unable to identify current $PATH").as_path();
310//     let cmd = &launch_spec.command;
311// 
312//     let mut child_builder = aip::Command::new(cmd);
313// 
314//     child_builder.stdin(if stdin.is_some() { std::process::Stdio::piped() } else { std::process::Stdio::null() } );
315//     child_builder.stderr(if outputs.1 { std::process::Stdio::piped() } else { std::process::Stdio::null() });
316//     child_builder.stdout(if outputs.0 { std::process::Stdio::piped() } else { std::process::Stdio::null() });
317// 
318//     match launch_spec.path {
319//         Some(p) => { child_builder.current_dir(p); },
320//         None => ()
321//     };
322// 
323//     match launch_spec.env {
324//         Some(env) => { child_builder.envs(env.into_iter()); }
325//         None => { child_builder.envs(std::env::vars_os()); }
326//     }
327// 
328//     match launch_spec.args {
329//         Some(args) => { child_builder.args(args); },
330//         None => ()
331//     };
332// 
333//     let child = child_builder.spawn().unwrap();
334// 
335// 
336//     let mut child_stdin_pull = [match stdin {
337//         Some(stdin) => { stdin },
338//         None => PullConfiguration { priority: 0, id: 0, pull: Pull::None }
339//     }];
340// 
341//     let mut child_stdin_push = [match child.stdin {
342//         Some(stdin) => Push::CmdStdin(stdin),
343//         None => Push::None,
344//     }];
345// 
346//     // let mut io_sender = [];
347//     let r1 = motion(&mut child_stdin_pull, monitoring.clone(), &mut child_stdin_push);
348// 
349//     let mut child_stdout_pull = [match child.stdout {
350//         Some(stdout) => PullConfiguration { priority: 2, id: 2, pull: Pull::CmdStdout(stdout) },
351//         None => PullConfiguration { priority: 2, id: 2, pull: Pull::None },
352//     }];
353// 
354//     let mut child_stderr_pull = [match child.stderr {
355//         Some(stderr) => PullConfiguration { priority: 2, id: 2, pull: Pull::CmdStderr(stderr) },
356//         None => PullConfiguration { priority: 2, id: 2, pull: Pull::None },
357//     }];
358// 
359//     let (child_stdout_push_channel, stdout_io_reciever_channel) = bounded(1);
360//     let (child_stderr_push_channel, stderr_io_reciever_channel) = bounded(1);
361// 
362//     let mut child_stdout_push = [Push::IoSender(child_stdout_push_channel)];
363//     let mut child_stderr_push = [Push::IoSender(child_stderr_push_channel)];
364// 
365//     let r2 = motion(&mut child_stdout_pull, monitoring.clone(), &mut child_stdout_push);
366//     let r3 = motion(&mut child_stderr_pull, monitoring.clone(), &mut child_stderr_push);
367// 
368//     fn structure_motion_result(input: ((MotionResult<usize>, MotionResult<usize>), MotionResult<usize>)) -> MotionResult<usize> {
369//         match input {
370//             ((MotionResult::Ok(stdin_count), MotionResult::Ok(_)), MotionResult::Ok(_)) => Ok(stdin_count),
371//             _ => Err(MotionError::NoneError),
372//         }
373//     }
374//     // let f = structure_motion_result(r1.join(r2).join(r3).await);
375//     let f: Future<Output = ((MotionResult<usize>, MotionResult<usize>), MotionResult<usize>)> = r1.join(r2).join(r3);
376// 
377//     // BufferReturn {
378//     //     stdout: Push::IoReceiver(stdout_io_reciever_channel),
379//     //     stderr: Push::IoReceiver(stderr_io_reciever_channel),
380//     //     future: f,
381//     // }
382//     // struct CommandStats {
383//     // }
384//     // let mut cmd_stdin = Push::CmdStdin(cmd.stdin.unwrap());
385//     // let mut cmd_stdin = Pull::CmdStderr(child.stderr.unwrap());
386//     // let mut cmd_stdout = Pull::CmdStdout(child.stdout.unwrap());
387//     f
388// }