pipeawesome2/buffer.rs
1use crate::motion::PullJourney;
2use crate::connectable::Breakable;
3use crate::motion::Journey;
4use crate::connectable::OutputPort;
5use crate::connectable::ConnectableAddOutputError;
6use crate::connectable::ConnectableAddInputError;
7use crate::connectable::Connectable;
8use async_std::{channel::SendError, prelude::*};
9
10use async_std::channel::{bounded, unbounded, Receiver, Sender };
11use crate::motion::{MotionNotifications};
12
13use super::motion::{ motion, MotionError, MonitorMessage, MotionResult, Pull, Push, };
14use crate::back_off::BackOff;
15
16use crate::startable_control::StartableControl;
17use async_trait::async_trait;
18
19#[derive(PartialEq,Debug)]
20pub struct BufferSizeMessage(pub usize);
21
22pub struct Buffer {
23 stdout_size: usize,
24 stdout: Option<Push>,
25 stdin: Option<Pull>,
26 buffer_size_monitor: Option<Sender<BufferSizeMessage>>,
27}
28
29
30impl Connectable for Buffer {
31
32 fn add_output(&mut self, port: OutputPort, breakable: Breakable, src_id: usize, dst_id: usize) -> std::result::Result<Pull, ConnectableAddOutputError> {
33 if self.stdout.is_some() { return Err(ConnectableAddOutputError::AlreadyAllocated(port)); }
34 let (child_stdout_push_channel, stdout_io_reciever_channel) = bounded(self.stdout_size);
35 self.stdout = Some(Push::IoSender(Journey { src: src_id, dst: dst_id, breakable }, child_stdout_push_channel));
36 Ok(Pull::Receiver(PullJourney { src: src_id, dst: dst_id }, stdout_io_reciever_channel))
37 }
38
39 fn add_input(&mut self, pull: Pull, unused_priority: isize) -> std::result::Result<(), ConnectableAddInputError> {
40 if unused_priority != 0 {
41 return Err(ConnectableAddInputError::UnsupportedPriority(unused_priority));
42 }
43 if self.stdin.is_some() {
44 return Err(ConnectableAddInputError::AlreadyAllocated);
45 }
46 self.stdin = Some(pull);
47 Ok(())
48 }
49
50}
51
52
53#[allow(clippy::new_without_default)]
54impl Buffer {
55 pub fn new() -> Buffer {
56 Buffer {
57 stdout_size: 8,
58 stdin: None,
59 stdout: None,
60 buffer_size_monitor: None,
61 }
62 }
63
64 pub fn add_buffer_size_monitor(&mut self) -> Receiver<BufferSizeMessage> {
65 assert!(self.buffer_size_monitor.is_none(), "Each buffer can only be monitored once");
66 let (tx, rx) = bounded(self.stdout_size);
67 self.buffer_size_monitor = Some(tx);
68 rx
69 }
70
71 pub fn set_stdout_size(&mut self, size: usize) {
72 self.stdout_size = size;
73 }
74
75}
76
77
78#[async_trait]
79impl StartableControl for Buffer {
80 async fn start(&mut self) -> MotionResult<usize> {
81
82 let (unbounded_snd, unbounded_rcv) = unbounded();
83 let (monitor_i_snd, monitor_i_rcv): (Sender<MonitorMessage>, Receiver<MonitorMessage>) = bounded(8);
84 let (monitor_o_snd, monitor_o_rcv): (Sender<MonitorMessage>, Receiver<MonitorMessage>) = bounded(8);
85
86 let stdin = std::mem::take(&mut self.stdin).ok_or(MotionError::NoneError)?;
87 let stdout = std::mem::take(&mut self.stdout).ok_or(MotionError::NoneError)?;
88
89 let push_a = Push::Sender(
90 Journey { src: 0, dst: 0, breakable: stdout.journey().ok_or(MotionError::NoneError)?.breakable },
91 unbounded_snd
92 );
93 let pull_b = Pull::Receiver(
94 PullJourney { src: 0, dst: 0 },
95 unbounded_rcv
96 );
97
98 let r_a = motion(
99 stdin,
100 MotionNotifications::written(monitor_i_snd),
101 push_a
102 );
103 let r_b = motion(
104 pull_b,
105 MotionNotifications::read(monitor_o_snd),
106 stdout,
107 );
108
109 async fn total_in_buffer(sender: Option<Sender<BufferSizeMessage>>, m_in: Receiver<MonitorMessage>, m_out: Receiver<MonitorMessage>) -> Result<usize, SendError<BufferSizeMessage>> {
110 let mut size: usize = 0;
111 let mut back_off = BackOff::new();
112 let mut last = 0;
113 loop {
114 let mut buffer_movement = false;
115 match m_in.try_recv() {
116 Err(async_std::channel::TryRecvError::Empty) => {
117 },
118 Err(async_std::channel::TryRecvError::Closed) => (),
119 Ok(MonitorMessage::Wrote(_)) => {
120 // println!("+BUF");
121 buffer_movement = true;
122 size += 1;
123 }
124 Ok(MonitorMessage::Read(_)) => {
125 panic!("SHOULD NOT BE HERE");
126 }
127 }
128 match m_out.try_recv() {
129 Err(async_std::channel::TryRecvError::Empty) => {
130 },
131 Err(async_std::channel::TryRecvError::Closed) => (),
132 Ok(MonitorMessage::Read(_)) => {
133 // println!("-BUF");
134 buffer_movement = true;
135 size -= 1;
136 }
137 Ok(MonitorMessage::Wrote(_)) => {
138 panic!("SHOULD NOT BE HERE");
139 }
140 }
141 match (last != size, &sender) {
142 (true, Some(s)) => {
143 last = size;
144 s.send(BufferSizeMessage(size)).await
145 },
146 _ => Ok(()),
147 }?;
148 if m_in.is_empty() && m_in.is_closed() && m_out.is_empty() && m_out.is_closed() {
149 return Ok(size as usize);
150 }
151 match buffer_movement {
152 false => {
153 back_off.wait().await;
154 },
155 true => {
156 back_off.reset();
157 },
158 };
159 }
160 }
161
162 let r_out_prep = r_a.join(r_b).join(
163 total_in_buffer(std::mem::take(&mut self.buffer_size_monitor), monitor_i_rcv, monitor_o_rcv)
164 ).await;
165
166 fn structure_motion_result(input: ((MotionResult<usize>, MotionResult<usize>), Result<usize, SendError<BufferSizeMessage>>)) -> MotionResult<usize> {
167 match input {
168 ((MotionResult::Ok(stdin_count), MotionResult::Ok(_)), _x) => Ok(stdin_count),
169 _ => Err(MotionError::NoneError),
170 }
171 }
172
173 match structure_motion_result(r_out_prep) {
174 Ok(x) => Ok(x),
175 Err(x) => Err(x)
176 }
177
178 }
179}
180
181
182#[test]
183fn do_stuff() {
184
185 use crate::motion::IOData;
186 use crate::connectable::Breakable;
187
188 pub async fn test_buffer_impl() -> MotionResult<usize> {
189 use std::collections::VecDeque;
190
191 async fn read_data(mut output: Pull) -> Vec<IOData> {
192 let mut v: Vec<IOData> = vec![];
193 async_std::task::sleep(std::time::Duration::from_millis(100)).await;
194 loop {
195 let x: MotionResult<crate::motion::IODataWrapper> = crate::motion::motion_read(&mut output, false).await;
196 match x {
197 Ok(crate::motion::IODataWrapper::Finished) => {
198 return v;
199 }
200 Ok(crate::motion::IODataWrapper::IOData(x)) => {
201 v.push(x)
202 }
203 _ => {
204 return vec![];
205 }
206 }
207 }
208 }
209
210 async fn read_monitoring<X>(output: Receiver<X>) -> Vec<X> {
211 let mut v: Vec<X> = vec![];
212 loop {
213 match output.recv().await {
214 Ok(x) => {
215 v.push(x);
216 },
217 Err(async_std::channel::RecvError) => {
218 return v;
219 }
220 }
221 }
222 }
223
224 fn get_input() -> VecDeque<IOData> {
225 let mut vdq: VecDeque<IOData> = VecDeque::new();
226 vdq.push_front(IOData(vec![68; 255]));
227 vdq.push_front(IOData(vec![67; 255]));
228 vdq.push_front(IOData(vec![66; 255]));
229 vdq.push_front(IOData(vec![65; 255]));
230 vdq
231 }
232
233 let input = Pull::Mock(PullJourney { src: 0, dst: 0 }, get_input());
234 let mut buffer = Buffer::new();
235 buffer.set_stdout_size(1);
236 buffer.add_input(input, 0).unwrap();
237 let output = buffer.add_output(OutputPort::Out, Breakable::Terminate, 0, 0).unwrap();
238 let monitoring = buffer.add_buffer_size_monitor();
239 let buffer_motion = buffer.start();
240 match buffer_motion.join(read_data(output)).join(read_monitoring(monitoring)).await {
241 ((Ok(proc_count), v), monitoring_msg) => {
242 assert_eq!(
243 vec![
244 IOData(vec![65; 255]),
245 IOData(vec![66; 255]),
246 IOData(vec![67; 255]),
247 IOData(vec![68; 255]),
248 ],
249 v
250 );
251
252 assert_eq!(monitoring_msg.last(), Some(&BufferSizeMessage(0)));
253
254 Ok(proc_count)
255 },
256 _ => {
257 panic!("should have succeeded");
258 }
259 }
260 }
261
262 use async_std::task;
263 println!("R: {:?}", task::block_on(test_buffer_impl()));
264}
265
266// struct BufferReturn {
267// stdout: Push, // Pull::IoReceiver || Pull::None
268// stderr: Push, // Pull::IoReceiver || Pull::None
269// future: Future<Output = ((MotionResult<usize>, MotionResult<usize>), MotionResult<usize>)>,
270// }
271//
272//
273// impl <E: IntoIterator<Item = (K, V)>,
274// A: IntoIterator<Item = R>,
275// R: AsRef<OsStr>,
276// O: AsRef<OsStr>,
277// K: AsRef<OsStr>,
278// V: AsRef<OsStr>,
279// P: AsRef<Path>> Buffer {
280// fn new(
281// stdin: Option<PullConfiguration>,
282// launch_spec: Buffer<E, P, O, A, K, V, R>,
283// ) -> Buffer {
284// Buffer {
285// stdout: None,
286// stderr: None,
287// stdin,
288// launch_spec,
289// }
290// }
291// }
292//
293
294// async fn get_command<E: IntoIterator<Item = (K, V)>,
295// A: IntoIterator<Item = R>,
296// R: AsRef<OsStr>,
297// O: AsRef<OsStr>,
298// K: AsRef<OsStr>,
299// V: AsRef<OsStr>,
300// P: AsRef<Path>>(stdin: Option<PullConfiguration>, launch_spec: Buffer<E, P, O, A, K, V, R>, outputs: BufferOutputs, monitoring: Sender<MonitorMessage>) -> BufferReturn
301// {
302//
303// let outputs: (bool, bool) = match outputs {
304// BufferOutputs::STDOUT => (true, false),
305// BufferOutputs::STDOUT_AND_STDERR => (true, true),
306// BufferOutputs::STDERR => (false, true),
307// };
308//
309// let current_path: &Path = std::env::current_dir().expect("Unable to identify current $PATH").as_path();
310// let cmd = &launch_spec.command;
311//
312// let mut child_builder = aip::Command::new(cmd);
313//
314// child_builder.stdin(if stdin.is_some() { std::process::Stdio::piped() } else { std::process::Stdio::null() } );
315// child_builder.stderr(if outputs.1 { std::process::Stdio::piped() } else { std::process::Stdio::null() });
316// child_builder.stdout(if outputs.0 { std::process::Stdio::piped() } else { std::process::Stdio::null() });
317//
318// match launch_spec.path {
319// Some(p) => { child_builder.current_dir(p); },
320// None => ()
321// };
322//
323// match launch_spec.env {
324// Some(env) => { child_builder.envs(env.into_iter()); }
325// None => { child_builder.envs(std::env::vars_os()); }
326// }
327//
328// match launch_spec.args {
329// Some(args) => { child_builder.args(args); },
330// None => ()
331// };
332//
333// let child = child_builder.spawn().unwrap();
334//
335//
336// let mut child_stdin_pull = [match stdin {
337// Some(stdin) => { stdin },
338// None => PullConfiguration { priority: 0, id: 0, pull: Pull::None }
339// }];
340//
341// let mut child_stdin_push = [match child.stdin {
342// Some(stdin) => Push::CmdStdin(stdin),
343// None => Push::None,
344// }];
345//
346// // let mut io_sender = [];
347// let r1 = motion(&mut child_stdin_pull, monitoring.clone(), &mut child_stdin_push);
348//
349// let mut child_stdout_pull = [match child.stdout {
350// Some(stdout) => PullConfiguration { priority: 2, id: 2, pull: Pull::CmdStdout(stdout) },
351// None => PullConfiguration { priority: 2, id: 2, pull: Pull::None },
352// }];
353//
354// let mut child_stderr_pull = [match child.stderr {
355// Some(stderr) => PullConfiguration { priority: 2, id: 2, pull: Pull::CmdStderr(stderr) },
356// None => PullConfiguration { priority: 2, id: 2, pull: Pull::None },
357// }];
358//
359// let (child_stdout_push_channel, stdout_io_reciever_channel) = bounded(1);
360// let (child_stderr_push_channel, stderr_io_reciever_channel) = bounded(1);
361//
362// let mut child_stdout_push = [Push::IoSender(child_stdout_push_channel)];
363// let mut child_stderr_push = [Push::IoSender(child_stderr_push_channel)];
364//
365// let r2 = motion(&mut child_stdout_pull, monitoring.clone(), &mut child_stdout_push);
366// let r3 = motion(&mut child_stderr_pull, monitoring.clone(), &mut child_stderr_push);
367//
368// fn structure_motion_result(input: ((MotionResult<usize>, MotionResult<usize>), MotionResult<usize>)) -> MotionResult<usize> {
369// match input {
370// ((MotionResult::Ok(stdin_count), MotionResult::Ok(_)), MotionResult::Ok(_)) => Ok(stdin_count),
371// _ => Err(MotionError::NoneError),
372// }
373// }
374// // let f = structure_motion_result(r1.join(r2).join(r3).await);
375// let f: Future<Output = ((MotionResult<usize>, MotionResult<usize>), MotionResult<usize>)> = r1.join(r2).join(r3);
376//
377// // BufferReturn {
378// // stdout: Push::IoReceiver(stdout_io_reciever_channel),
379// // stderr: Push::IoReceiver(stderr_io_reciever_channel),
380// // future: f,
381// // }
382// // struct CommandStats {
383// // }
384// // let mut cmd_stdin = Push::CmdStdin(cmd.stdin.unwrap());
385// // let mut cmd_stdin = Pull::CmdStderr(child.stderr.unwrap());
386// // let mut cmd_stdout = Pull::CmdStdout(child.stdout.unwrap());
387// f
388// }