cog_task/action/core/
process.rs

1use crate::action::{Action, ActionSignal, Props, StatefulAction, DEFAULT, INFINITE};
2use crate::comm::{QWriter, Signal, SignalId};
3use crate::resource::{IoManager, LoggerSignal, ResourceAddr, ResourceManager, ResourceValue};
4use crate::server::{AsyncSignal, Config, State, SyncSignal};
5use eyre::{eyre, Context, Error, Result};
6use serde::{Deserialize, Serialize};
7use serde_cbor::Value;
8use std::collections::{BTreeMap, BTreeSet};
9use std::io::{BufRead, BufReader, Write};
10use std::path::PathBuf;
11use std::process::{Child, ChildStdin, Command, Stdio};
12use std::sync::mpsc::{self, Receiver, RecvError, TryRecvError};
13use std::sync::{Arc, Mutex};
14use std::thread;
15use std::time::Instant;
16
17#[derive(Debug, Deserialize, Serialize)]
18pub struct Process {
19    #[serde(default)]
20    name: String,
21    src: PathBuf,
22    #[serde(default)]
23    args: Vec<String>,
24    #[serde(default)]
25    passive: bool,
26    #[serde(default)]
27    response_type: ResponseType,
28    #[serde(default)]
29    vars: BTreeMap<String, Value>,
30    #[serde(default = "defaults::on_start")]
31    on_start: bool,
32    #[serde(default = "defaults::on_change")]
33    on_change: bool,
34    #[serde(default)]
35    once: bool,
36    #[serde(default = "defaults::blocking")]
37    blocking: bool,
38    #[serde(default)]
39    drop_early: bool,
40    #[serde(default)]
41    in_mapping: BTreeMap<SignalId, String>,
42    #[serde(default)]
43    in_update: SignalId,
44    lo_incoming: SignalId,
45    #[serde(default)]
46    out_result: SignalId,
47}
48
49stateful!(Process {
50    name: String,
51    passive: bool,
52    vars: BTreeMap<String, Value>,
53    on_start: bool,
54    on_change: bool,
55    once: bool,
56    blocking: bool,
57    in_mapping: BTreeMap<SignalId, String>,
58    in_update: SignalId,
59    lo_incoming: SignalId,
60    out_result: SignalId,
61    child: Child,
62    stdin: ChildStdin,
63    link: Receiver<Response>,
64    started: Arc<Mutex<bool>>,
65});
66
67mod defaults {
68    pub fn on_start() -> bool {
69        true
70    }
71
72    pub fn on_change() -> bool {
73        true
74    }
75
76    pub fn blocking() -> bool {
77        true
78    }
79}
80
81enum Response {
82    Result(Value),
83    Error(Error),
84    End,
85}
86
87#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
88#[serde(rename_all = "snake_case")]
89enum ResponseType {
90    Value,
91    Raw,
92    RawAll,
93}
94
95impl Default for ResponseType {
96    fn default() -> Self {
97        Self::Value
98    }
99}
100
101impl Action for Process {
102    fn init(mut self) -> Result<Box<dyn Action>>
103    where
104        Self: 'static + Sized,
105    {
106        if matches!(self.response_type, ResponseType::RawAll) {
107            self.once = true;
108        }
109
110        if self.lo_incoming == 0 {
111            return Err(eyre!("`lo_incoming`for Process cannot be zero."));
112        }
113
114        if self.passive && !self.in_mapping.is_empty() {
115            return Err(eyre!("Setting `in_mapping`for passive Process is useless."));
116        }
117
118        if self.passive && !self.vars.is_empty() {
119            return Err(eyre!("Setting `vars`for passive Process is useless."));
120        }
121
122        if self.drop_early && matches!(self.response_type, ResponseType::RawAll) {
123            return Err(eyre!(
124                "Process cannot have drop_early=True and response_type=raw_all simultaneously."
125            ));
126        }
127
128        Ok(Box::new(self))
129    }
130
131    fn in_signals(&self) -> BTreeSet<SignalId> {
132        let mut signals: BTreeSet<_> = self.in_mapping.keys().cloned().collect();
133        signals.extend([self.in_update, self.lo_incoming]);
134        signals
135    }
136
137    fn out_signals(&self) -> BTreeSet<SignalId> {
138        BTreeSet::from([self.lo_incoming, self.out_result])
139    }
140
141    fn resources(&self, _config: &Config) -> Vec<ResourceAddr> {
142        vec![ResourceAddr::Ref(self.src.clone())]
143    }
144
145    fn stateful(
146        &self,
147        _io: &IoManager,
148        res: &ResourceManager,
149        _config: &Config,
150        sync_writer: &QWriter<SyncSignal>,
151        _async_writer: &QWriter<AsyncSignal>,
152    ) -> Result<Box<dyn StatefulAction>> {
153        let src = match res.fetch(&ResourceAddr::Ref(self.src.clone()))? {
154            ResourceValue::Ref(src) => src,
155            _ => return Err(eyre!("Resource address and value types don't match.")),
156        };
157
158        let mut child = Command::new(src)
159            .args(&self.args)
160            .stdin(Stdio::piped())
161            .stdout(Stdio::piped())
162            .spawn()
163            .wrap_err("Failed to spawn child process.")?;
164
165        let stdin = child
166            .stdin
167            .take()
168            .ok_or(eyre!("Failed to open stdin of child process."))?;
169
170        let stdout = child
171            .stdout
172            .take()
173            .ok_or(eyre!("Failed to open stdout of child process."))?;
174
175        let (tx, rx) = mpsc::channel();
176
177        let started = Arc::new(Mutex::new(false));
178        let drop_early = self.drop_early;
179        let lo_incoming = self.lo_incoming;
180        let response_type = self.response_type;
181        let mut sync_writer = sync_writer.clone();
182        let started_clone = started.clone();
183        thread::spawn(move || {
184            let mut reader = BufReader::new(stdout);
185
186            loop {
187                let response = match response_type {
188                    ResponseType::Value => {
189                        let mut response = String::with_capacity(1024);
190                        if let Err(e) = reader.read_line(&mut response) {
191                            sync_writer.push(SyncSignal::Error(eyre!(
192                                "Failed to receive response from child process:\n{e:#?}"
193                            )));
194                            break;
195                        }
196
197                        let response = response.strip_suffix('\n').unwrap();
198                        let (typ, value) = match response.split_once(' ') {
199                            Some(pair) => pair,
200                            None => (response, ""),
201                        };
202
203                        match typ {
204                            "nil" => Response::Result(Value::Null),
205                            "true" => Response::Result(Value::Bool(true)),
206                            "false" => Response::Result(Value::Bool(false)),
207                            "i64" => value.parse::<i128>().map_or_else(
208                                |e| {
209                                    Response::Error(eyre!(
210                                "Failed to parse (claimed) i64 response from child process:\n{e:?}"
211                            ))
212                                },
213                                |v| Response::Result(Value::Integer(v)),
214                            ),
215                            "f64" => value.parse::<f64>().map_or_else(
216                                |e| {
217                                    Response::Error(eyre!(
218                                "Failed to parse (claimed) f64 response from child process:\n{e:?}"
219                            ))
220                                },
221                                |v| Response::Result(Value::Float(v)),
222                            ),
223                            "str" => Response::Result(Value::Text(value.replace("\\n", "\n"))),
224                            "err" => Response::Error(eyre!(value.replace("\\n", "\n"))),
225                            "end" => Response::End,
226                            _ => Response::Error(eyre!(
227                                "Unknown response type ({typ}) from child process."
228                            )),
229                        }
230                    }
231                    ResponseType::Raw => {
232                        let mut response = String::with_capacity(1024);
233                        if reader.read_line(&mut response).is_err() {
234                            Response::End
235                        } else {
236                            let response = response.strip_suffix('\n').unwrap();
237                            Response::Result(Value::Text(response.to_owned()))
238                        }
239                    }
240                    ResponseType::RawAll => {
241                        let mut response = String::with_capacity(1024);
242                        while let Ok(i) = reader.read_line(&mut response) {
243                            if i == 0 {
244                                break;
245                            }
246                        }
247                        Response::Result(Value::Text(response))
248                    }
249                };
250
251                let end = matches!(response, Response::End | Response::Error(_))
252                    || matches!(response_type, ResponseType::RawAll);
253
254                if !end && drop_early && !*started_clone.lock().unwrap() {
255                    continue;
256                }
257
258                if tx.send(response).is_err() {
259                    break;
260                }
261                sync_writer.push(SyncSignal::Emit(
262                    Instant::now(),
263                    Signal::from(vec![(lo_incoming, Value::Null)]),
264                ));
265                if end {
266                    break;
267                }
268            }
269        });
270
271        Ok(Box::new(StatefulProcess {
272            done: false,
273            name: self.name.clone(),
274            passive: self.passive,
275            vars: self.vars.clone(),
276            on_start: self.on_start,
277            on_change: self.on_change,
278            once: self.once,
279            blocking: self.blocking,
280            in_mapping: BTreeMap::new(),
281            in_update: self.in_update,
282            lo_incoming: self.lo_incoming,
283            out_result: self.out_result,
284            child,
285            stdin,
286            link: rx,
287            started,
288        }))
289    }
290}
291
292impl StatefulAction for StatefulProcess {
293    impl_stateful!();
294
295    fn props(&self) -> Props {
296        if self.once { DEFAULT } else { INFINITE }.into()
297    }
298
299    fn start(
300        &mut self,
301        sync_writer: &mut QWriter<SyncSignal>,
302        async_writer: &mut QWriter<AsyncSignal>,
303        state: &State,
304    ) -> Result<Signal> {
305        for (id, var) in self.in_mapping.iter() {
306            if let Some(entry) = self.vars.get_mut(var) {
307                if let Some(value) = state.get(id) {
308                    *entry = value.clone();
309                }
310            }
311        }
312
313        *self.started.lock().unwrap() = true;
314
315        let mut news = if self.on_start {
316            if self.once && self.blocking {
317                self.done = true;
318                sync_writer.push(SyncSignal::UpdateGraph);
319            }
320
321            self.run(sync_writer, async_writer)
322                .wrap_err("Failed to evaluate function.")?
323                .into_iter()
324                .collect()
325        } else {
326            vec![]
327        };
328
329        loop {
330            let result = match self.link.try_recv() {
331                Ok(Response::Result(v)) => v,
332                Ok(Response::Error(e)) => {
333                    return Err(eyre!("Child process returned error:\n{e:#?}"));
334                }
335                Ok(Response::End) => {
336                    self.done = true;
337                    sync_writer.push(SyncSignal::UpdateGraph);
338                    break;
339                }
340                Err(TryRecvError::Empty) => break,
341                Err(TryRecvError::Disconnected) => {
342                    if self.done {
343                        break;
344                    } else {
345                        return Err(eyre!("Child process died without informing about it."));
346                    }
347                }
348            };
349
350            if !self.name.is_empty() {
351                async_writer.push(LoggerSignal::Append(
352                    "process".to_owned(),
353                    (self.name.clone(), result.clone()),
354                ));
355            }
356
357            if self.out_result > 0 {
358                news.push((self.out_result, result.clone()));
359            }
360
361            if self.once {
362                self.done = true;
363                sync_writer.push(SyncSignal::UpdateGraph);
364            }
365        }
366
367        Ok(news.into())
368    }
369
370    fn update(
371        &mut self,
372        signal: &ActionSignal,
373        sync_writer: &mut QWriter<SyncSignal>,
374        async_writer: &mut QWriter<AsyncSignal>,
375        state: &State,
376    ) -> Result<Signal> {
377        let mut news: Vec<(SignalId, Value)> = vec![];
378        let mut changed = false;
379        let mut updated = false;
380        if let ActionSignal::StateChanged(_, signal) = signal {
381            for id in signal {
382                if let Some(var) = self.in_mapping.get(id) {
383                    if let Some(entry) = self.vars.get_mut(var) {
384                        *entry = state.get(id).unwrap().clone();
385                    }
386                    changed = true;
387                }
388
389                if *id == self.lo_incoming {
390                    let result = match self.link.try_recv() {
391                        Ok(Response::Result(v)) => v,
392                        Ok(Response::Error(e)) => {
393                            return Err(eyre!("Child process returned error:\n{e:#?}"));
394                        }
395                        Ok(Response::End) => {
396                            self.done = true;
397                            sync_writer.push(SyncSignal::UpdateGraph);
398                            return Ok(Signal::none());
399                        }
400                        Err(TryRecvError::Empty) => continue,
401                        Err(TryRecvError::Disconnected) => {
402                            return Err(eyre!("Child process died without informing about it."));
403                        }
404                    };
405
406                    if !self.name.is_empty() {
407                        async_writer.push(LoggerSignal::Append(
408                            "process".to_owned(),
409                            (self.name.clone(), result.clone()),
410                        ));
411                    }
412
413                    if self.out_result > 0 {
414                        news.push((self.out_result, result.clone()));
415                    }
416
417                    if self.once {
418                        self.done = true;
419                        sync_writer.push(SyncSignal::UpdateGraph);
420                    }
421                }
422            }
423
424            if signal.contains(&self.in_update) {
425                updated = true;
426            }
427        }
428
429        if (changed && self.on_change) || updated {
430            news.extend(
431                self.run(sync_writer, async_writer)
432                    .wrap_err("Failed to run process.")?,
433            );
434        }
435
436        Ok(news.into())
437    }
438
439    fn stop(
440        &mut self,
441        _sync_writer: &mut QWriter<SyncSignal>,
442        _async_writer: &mut QWriter<AsyncSignal>,
443        _state: &State,
444    ) -> Result<Signal> {
445        // self.stdin
446        //     .write_all("stop".as_bytes())
447        //     .wrap_err("Failed to stop child process.")?;
448        let _ = self.child.kill();
449        Ok(Signal::none())
450    }
451}
452
453impl StatefulProcess {
454    #[inline(always)]
455    fn run(
456        &mut self,
457        sync_writer: &mut QWriter<SyncSignal>,
458        async_writer: &mut QWriter<AsyncSignal>,
459    ) -> Result<Signal> {
460        if !self.passive {
461            let mut inputs = String::new();
462            if !self.vars.is_empty() {
463                inputs.push_str(&format!("with {}\n", self.vars.len()));
464                for (name, value) in self.vars.iter() {
465                    let value = match value {
466                        Value::Null => "nil".to_owned(),
467                        Value::Bool(true) => "true".to_owned(),
468                        Value::Bool(false) => "false".to_owned(),
469                        Value::Integer(i) => format!("i64 {i}"),
470                        Value::Float(f) => format!("f64 {f}"),
471                        Value::Text(s) => format!("str {}", s.replace('\n', "\\n")),
472                        v => return Err(eyre!("Cannot send value ({v:?}) to child process.")),
473                    };
474
475                    inputs.push_str(&format!("{name} {value}\n"));
476                }
477            }
478            inputs.push_str("go\n");
479
480            self.stdin
481                .write_all(inputs.as_bytes())
482                .wrap_err("Failed to run child process step.")?;
483        }
484
485        let mut news = vec![];
486        if self.blocking {
487            let result = match self.link.recv() {
488                Ok(Response::Result(v)) => v,
489                Ok(Response::Error(e)) => {
490                    return Err(eyre!("Child process returned error:\n{e:#?}"));
491                }
492                Ok(Response::End) => {
493                    self.done = true;
494                    sync_writer.push(SyncSignal::UpdateGraph);
495                    return Ok(Signal::none());
496                }
497                Err(RecvError) => {
498                    return Err(eyre!("Child process died without informing about it."))
499                }
500            };
501
502            if !self.name.is_empty() {
503                async_writer.push(LoggerSignal::Append(
504                    "process".to_owned(),
505                    (self.name.clone(), result.clone()),
506                ));
507            }
508
509            if self.out_result > 0 {
510                news.push((self.out_result, result));
511            }
512
513            if self.once {
514                self.done = true;
515                sync_writer.push(SyncSignal::UpdateGraph);
516            }
517        }
518
519        Ok(news.into())
520    }
521}