1use crate::action::{Action, ActionSignal, Props, StatefulAction, DEFAULT, INFINITE};
2use crate::comm::{QWriter, Signal, SignalId};
3use crate::resource::{IoManager, LoggerSignal, ResourceAddr, ResourceManager, ResourceValue};
4use crate::server::{AsyncSignal, Config, State, SyncSignal};
5use eyre::{eyre, Context, Error, Result};
6use serde::{Deserialize, Serialize};
7use serde_cbor::Value;
8use std::collections::{BTreeMap, BTreeSet};
9use std::io::{BufRead, BufReader, Write};
10use std::path::PathBuf;
11use std::process::{Child, ChildStdin, Command, Stdio};
12use std::sync::mpsc::{self, Receiver, RecvError, TryRecvError};
13use std::sync::{Arc, Mutex};
14use std::thread;
15use std::time::Instant;
16
17#[derive(Debug, Deserialize, Serialize)]
18pub struct Process {
19 #[serde(default)]
20 name: String,
21 src: PathBuf,
22 #[serde(default)]
23 args: Vec<String>,
24 #[serde(default)]
25 passive: bool,
26 #[serde(default)]
27 response_type: ResponseType,
28 #[serde(default)]
29 vars: BTreeMap<String, Value>,
30 #[serde(default = "defaults::on_start")]
31 on_start: bool,
32 #[serde(default = "defaults::on_change")]
33 on_change: bool,
34 #[serde(default)]
35 once: bool,
36 #[serde(default = "defaults::blocking")]
37 blocking: bool,
38 #[serde(default)]
39 drop_early: bool,
40 #[serde(default)]
41 in_mapping: BTreeMap<SignalId, String>,
42 #[serde(default)]
43 in_update: SignalId,
44 lo_incoming: SignalId,
45 #[serde(default)]
46 out_result: SignalId,
47}
48
49stateful!(Process {
50 name: String,
51 passive: bool,
52 vars: BTreeMap<String, Value>,
53 on_start: bool,
54 on_change: bool,
55 once: bool,
56 blocking: bool,
57 in_mapping: BTreeMap<SignalId, String>,
58 in_update: SignalId,
59 lo_incoming: SignalId,
60 out_result: SignalId,
61 child: Child,
62 stdin: ChildStdin,
63 link: Receiver<Response>,
64 started: Arc<Mutex<bool>>,
65});
66
/// Serde default providers for the `Process` flags that are enabled unless
/// the configuration says otherwise.
mod defaults {
    /// Shared value for every opt-out flag below.
    const ENABLED: bool = true;

    /// Default for `Process::on_start`.
    pub fn on_start() -> bool {
        ENABLED
    }

    /// Default for `Process::on_change`.
    pub fn on_change() -> bool {
        ENABLED
    }

    /// Default for `Process::blocking`.
    pub fn blocking() -> bool {
        ENABLED
    }
}
80
81enum Response {
82 Result(Value),
83 Error(Error),
84 End,
85}
86
87#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
88#[serde(rename_all = "snake_case")]
89enum ResponseType {
90 Value,
91 Raw,
92 RawAll,
93}
94
95impl Default for ResponseType {
96 fn default() -> Self {
97 Self::Value
98 }
99}
100
101impl Action for Process {
102 fn init(mut self) -> Result<Box<dyn Action>>
103 where
104 Self: 'static + Sized,
105 {
106 if matches!(self.response_type, ResponseType::RawAll) {
107 self.once = true;
108 }
109
110 if self.lo_incoming == 0 {
111 return Err(eyre!("`lo_incoming`for Process cannot be zero."));
112 }
113
114 if self.passive && !self.in_mapping.is_empty() {
115 return Err(eyre!("Setting `in_mapping`for passive Process is useless."));
116 }
117
118 if self.passive && !self.vars.is_empty() {
119 return Err(eyre!("Setting `vars`for passive Process is useless."));
120 }
121
122 if self.drop_early && matches!(self.response_type, ResponseType::RawAll) {
123 return Err(eyre!(
124 "Process cannot have drop_early=True and response_type=raw_all simultaneously."
125 ));
126 }
127
128 Ok(Box::new(self))
129 }
130
131 fn in_signals(&self) -> BTreeSet<SignalId> {
132 let mut signals: BTreeSet<_> = self.in_mapping.keys().cloned().collect();
133 signals.extend([self.in_update, self.lo_incoming]);
134 signals
135 }
136
137 fn out_signals(&self) -> BTreeSet<SignalId> {
138 BTreeSet::from([self.lo_incoming, self.out_result])
139 }
140
141 fn resources(&self, _config: &Config) -> Vec<ResourceAddr> {
142 vec![ResourceAddr::Ref(self.src.clone())]
143 }
144
145 fn stateful(
146 &self,
147 _io: &IoManager,
148 res: &ResourceManager,
149 _config: &Config,
150 sync_writer: &QWriter<SyncSignal>,
151 _async_writer: &QWriter<AsyncSignal>,
152 ) -> Result<Box<dyn StatefulAction>> {
153 let src = match res.fetch(&ResourceAddr::Ref(self.src.clone()))? {
154 ResourceValue::Ref(src) => src,
155 _ => return Err(eyre!("Resource address and value types don't match.")),
156 };
157
158 let mut child = Command::new(src)
159 .args(&self.args)
160 .stdin(Stdio::piped())
161 .stdout(Stdio::piped())
162 .spawn()
163 .wrap_err("Failed to spawn child process.")?;
164
165 let stdin = child
166 .stdin
167 .take()
168 .ok_or(eyre!("Failed to open stdin of child process."))?;
169
170 let stdout = child
171 .stdout
172 .take()
173 .ok_or(eyre!("Failed to open stdout of child process."))?;
174
175 let (tx, rx) = mpsc::channel();
176
177 let started = Arc::new(Mutex::new(false));
178 let drop_early = self.drop_early;
179 let lo_incoming = self.lo_incoming;
180 let response_type = self.response_type;
181 let mut sync_writer = sync_writer.clone();
182 let started_clone = started.clone();
183 thread::spawn(move || {
184 let mut reader = BufReader::new(stdout);
185
186 loop {
187 let response = match response_type {
188 ResponseType::Value => {
189 let mut response = String::with_capacity(1024);
190 if let Err(e) = reader.read_line(&mut response) {
191 sync_writer.push(SyncSignal::Error(eyre!(
192 "Failed to receive response from child process:\n{e:#?}"
193 )));
194 break;
195 }
196
197 let response = response.strip_suffix('\n').unwrap();
198 let (typ, value) = match response.split_once(' ') {
199 Some(pair) => pair,
200 None => (response, ""),
201 };
202
203 match typ {
204 "nil" => Response::Result(Value::Null),
205 "true" => Response::Result(Value::Bool(true)),
206 "false" => Response::Result(Value::Bool(false)),
207 "i64" => value.parse::<i128>().map_or_else(
208 |e| {
209 Response::Error(eyre!(
210 "Failed to parse (claimed) i64 response from child process:\n{e:?}"
211 ))
212 },
213 |v| Response::Result(Value::Integer(v)),
214 ),
215 "f64" => value.parse::<f64>().map_or_else(
216 |e| {
217 Response::Error(eyre!(
218 "Failed to parse (claimed) f64 response from child process:\n{e:?}"
219 ))
220 },
221 |v| Response::Result(Value::Float(v)),
222 ),
223 "str" => Response::Result(Value::Text(value.replace("\\n", "\n"))),
224 "err" => Response::Error(eyre!(value.replace("\\n", "\n"))),
225 "end" => Response::End,
226 _ => Response::Error(eyre!(
227 "Unknown response type ({typ}) from child process."
228 )),
229 }
230 }
231 ResponseType::Raw => {
232 let mut response = String::with_capacity(1024);
233 if reader.read_line(&mut response).is_err() {
234 Response::End
235 } else {
236 let response = response.strip_suffix('\n').unwrap();
237 Response::Result(Value::Text(response.to_owned()))
238 }
239 }
240 ResponseType::RawAll => {
241 let mut response = String::with_capacity(1024);
242 while let Ok(i) = reader.read_line(&mut response) {
243 if i == 0 {
244 break;
245 }
246 }
247 Response::Result(Value::Text(response))
248 }
249 };
250
251 let end = matches!(response, Response::End | Response::Error(_))
252 || matches!(response_type, ResponseType::RawAll);
253
254 if !end && drop_early && !*started_clone.lock().unwrap() {
255 continue;
256 }
257
258 if tx.send(response).is_err() {
259 break;
260 }
261 sync_writer.push(SyncSignal::Emit(
262 Instant::now(),
263 Signal::from(vec![(lo_incoming, Value::Null)]),
264 ));
265 if end {
266 break;
267 }
268 }
269 });
270
271 Ok(Box::new(StatefulProcess {
272 done: false,
273 name: self.name.clone(),
274 passive: self.passive,
275 vars: self.vars.clone(),
276 on_start: self.on_start,
277 on_change: self.on_change,
278 once: self.once,
279 blocking: self.blocking,
280 in_mapping: BTreeMap::new(),
281 in_update: self.in_update,
282 lo_incoming: self.lo_incoming,
283 out_result: self.out_result,
284 child,
285 stdin,
286 link: rx,
287 started,
288 }))
289 }
290}
291
292impl StatefulAction for StatefulProcess {
293 impl_stateful!();
294
295 fn props(&self) -> Props {
296 if self.once { DEFAULT } else { INFINITE }.into()
297 }
298
299 fn start(
300 &mut self,
301 sync_writer: &mut QWriter<SyncSignal>,
302 async_writer: &mut QWriter<AsyncSignal>,
303 state: &State,
304 ) -> Result<Signal> {
305 for (id, var) in self.in_mapping.iter() {
306 if let Some(entry) = self.vars.get_mut(var) {
307 if let Some(value) = state.get(id) {
308 *entry = value.clone();
309 }
310 }
311 }
312
313 *self.started.lock().unwrap() = true;
314
315 let mut news = if self.on_start {
316 if self.once && self.blocking {
317 self.done = true;
318 sync_writer.push(SyncSignal::UpdateGraph);
319 }
320
321 self.run(sync_writer, async_writer)
322 .wrap_err("Failed to evaluate function.")?
323 .into_iter()
324 .collect()
325 } else {
326 vec![]
327 };
328
329 loop {
330 let result = match self.link.try_recv() {
331 Ok(Response::Result(v)) => v,
332 Ok(Response::Error(e)) => {
333 return Err(eyre!("Child process returned error:\n{e:#?}"));
334 }
335 Ok(Response::End) => {
336 self.done = true;
337 sync_writer.push(SyncSignal::UpdateGraph);
338 break;
339 }
340 Err(TryRecvError::Empty) => break,
341 Err(TryRecvError::Disconnected) => {
342 if self.done {
343 break;
344 } else {
345 return Err(eyre!("Child process died without informing about it."));
346 }
347 }
348 };
349
350 if !self.name.is_empty() {
351 async_writer.push(LoggerSignal::Append(
352 "process".to_owned(),
353 (self.name.clone(), result.clone()),
354 ));
355 }
356
357 if self.out_result > 0 {
358 news.push((self.out_result, result.clone()));
359 }
360
361 if self.once {
362 self.done = true;
363 sync_writer.push(SyncSignal::UpdateGraph);
364 }
365 }
366
367 Ok(news.into())
368 }
369
370 fn update(
371 &mut self,
372 signal: &ActionSignal,
373 sync_writer: &mut QWriter<SyncSignal>,
374 async_writer: &mut QWriter<AsyncSignal>,
375 state: &State,
376 ) -> Result<Signal> {
377 let mut news: Vec<(SignalId, Value)> = vec![];
378 let mut changed = false;
379 let mut updated = false;
380 if let ActionSignal::StateChanged(_, signal) = signal {
381 for id in signal {
382 if let Some(var) = self.in_mapping.get(id) {
383 if let Some(entry) = self.vars.get_mut(var) {
384 *entry = state.get(id).unwrap().clone();
385 }
386 changed = true;
387 }
388
389 if *id == self.lo_incoming {
390 let result = match self.link.try_recv() {
391 Ok(Response::Result(v)) => v,
392 Ok(Response::Error(e)) => {
393 return Err(eyre!("Child process returned error:\n{e:#?}"));
394 }
395 Ok(Response::End) => {
396 self.done = true;
397 sync_writer.push(SyncSignal::UpdateGraph);
398 return Ok(Signal::none());
399 }
400 Err(TryRecvError::Empty) => continue,
401 Err(TryRecvError::Disconnected) => {
402 return Err(eyre!("Child process died without informing about it."));
403 }
404 };
405
406 if !self.name.is_empty() {
407 async_writer.push(LoggerSignal::Append(
408 "process".to_owned(),
409 (self.name.clone(), result.clone()),
410 ));
411 }
412
413 if self.out_result > 0 {
414 news.push((self.out_result, result.clone()));
415 }
416
417 if self.once {
418 self.done = true;
419 sync_writer.push(SyncSignal::UpdateGraph);
420 }
421 }
422 }
423
424 if signal.contains(&self.in_update) {
425 updated = true;
426 }
427 }
428
429 if (changed && self.on_change) || updated {
430 news.extend(
431 self.run(sync_writer, async_writer)
432 .wrap_err("Failed to run process.")?,
433 );
434 }
435
436 Ok(news.into())
437 }
438
439 fn stop(
440 &mut self,
441 _sync_writer: &mut QWriter<SyncSignal>,
442 _async_writer: &mut QWriter<AsyncSignal>,
443 _state: &State,
444 ) -> Result<Signal> {
445 let _ = self.child.kill();
449 Ok(Signal::none())
450 }
451}
452
453impl StatefulProcess {
454 #[inline(always)]
455 fn run(
456 &mut self,
457 sync_writer: &mut QWriter<SyncSignal>,
458 async_writer: &mut QWriter<AsyncSignal>,
459 ) -> Result<Signal> {
460 if !self.passive {
461 let mut inputs = String::new();
462 if !self.vars.is_empty() {
463 inputs.push_str(&format!("with {}\n", self.vars.len()));
464 for (name, value) in self.vars.iter() {
465 let value = match value {
466 Value::Null => "nil".to_owned(),
467 Value::Bool(true) => "true".to_owned(),
468 Value::Bool(false) => "false".to_owned(),
469 Value::Integer(i) => format!("i64 {i}"),
470 Value::Float(f) => format!("f64 {f}"),
471 Value::Text(s) => format!("str {}", s.replace('\n', "\\n")),
472 v => return Err(eyre!("Cannot send value ({v:?}) to child process.")),
473 };
474
475 inputs.push_str(&format!("{name} {value}\n"));
476 }
477 }
478 inputs.push_str("go\n");
479
480 self.stdin
481 .write_all(inputs.as_bytes())
482 .wrap_err("Failed to run child process step.")?;
483 }
484
485 let mut news = vec![];
486 if self.blocking {
487 let result = match self.link.recv() {
488 Ok(Response::Result(v)) => v,
489 Ok(Response::Error(e)) => {
490 return Err(eyre!("Child process returned error:\n{e:#?}"));
491 }
492 Ok(Response::End) => {
493 self.done = true;
494 sync_writer.push(SyncSignal::UpdateGraph);
495 return Ok(Signal::none());
496 }
497 Err(RecvError) => {
498 return Err(eyre!("Child process died without informing about it."))
499 }
500 };
501
502 if !self.name.is_empty() {
503 async_writer.push(LoggerSignal::Append(
504 "process".to_owned(),
505 (self.name.clone(), result.clone()),
506 ));
507 }
508
509 if self.out_result > 0 {
510 news.push((self.out_result, result));
511 }
512
513 if self.once {
514 self.done = true;
515 sync_writer.push(SyncSignal::UpdateGraph);
516 }
517 }
518
519 Ok(news.into())
520 }
521}