workflow_node/
process.rs

1//!
2//! Module encapsulating [`Process`] API for running child process daemons under Node.js and NWJS
3//!
4use crate::child_process::{
5    spawn_with_args_and_options, ChildProcess, KillSignal, SpawnArgs, SpawnOptions,
6};
7use crate::error::Error;
8use crate::result::Result;
9use borsh::{BorshDeserialize, BorshSerialize};
10use futures::{select, FutureExt};
11use node_sys::*;
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18use wasm_bindgen::prelude::*;
19use workflow_core::channel::{oneshot, Channel, Receiver, Sender};
20use workflow_core::task::*;
21use workflow_core::time::Instant;
22use workflow_log::*;
23use workflow_task::*;
24use workflow_wasm::callback::*;
25use workflow_wasm::jserror::*;
26
/// Version struct for standard version extraction from executables via `--version` output.
///
/// A value created with [`Version::none`] represents "no version available": its
/// numeric components are zero, `none` is `true` and it is displayed as `n/a`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Version {
    /// Major version component
    pub major: u64,
    /// Minor version component
    pub minor: u64,
    /// Patch version component
    pub patch: u64,
    /// `true` when no version information is available
    pub none: bool,
}

impl Version {
    /// Creates a version from its three numeric components.
    pub fn new(major: u64, minor: u64, patch: u64) -> Version {
        Version {
            major,
            minor,
            patch,
            none: false,
        }
    }

    /// Creates the "no version available" marker value.
    pub fn none() -> Version {
        Version {
            major: 0,
            minor: 0,
            patch: 0,
            none: true,
        }
    }
}

impl std::fmt::Display for Version {
    /// Renders `major.minor.patch`, or `n/a` for the marker created by [`Version::none`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.none {
            write!(f, "n/a")
        } else {
            write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
        }
    }
}
64
/// Child process execution result
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// How the child process terminated
    pub termination: Termination,
    /// Captured `stdout` output
    pub stdout: String,
    /// Captured `stderr` output
    pub stderr: String,
}

impl ExecutionResult {
    /// Returns `true` when the execution ended with [`Termination::Error`].
    pub fn is_error(&self) -> bool {
        matches!(self.termination, Termination::Error(_))
    }
}

/// The way a child process terminated
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Termination {
    /// The process exited with the given exit code
    Exit(u32),
    /// The process reported an error, carried here as text
    Error(String),
}
82
/// Events relayed from a supervised child process through the `events` channel.
#[derive(Debug, Clone, BorshDeserialize, BorshSerialize, Serialize, Deserialize)]
pub enum Event {
    /// Emitted right after the child process has been spawned
    Start,
    /// The child emitted `exit`; carries the exit code (`0` when the reported
    /// code is not a number)
    Exit(u32),
    /// The child emitted `error`; carries the stringified error data
    Error(String),
    /// Output received on the child's `stdout`
    Stdout(String),
    /// Output received on the child's `stderr`
    Stderr(String),
}
91
/// Options for [`Process`] daemon runner
pub struct Options {
    /// Process arguments (the first element is the process binary file name / executable)
    argv: Vec<String>,
    /// Current working directory
    cwd: Option<PathBuf>,
    /// Automatic restart on exit
    restart: bool,
    /// Delay between automatic restarts
    restart_delay: Duration,
    /// This flag triggers forceful process termination after a given period of time.
    /// At the termination, the process is issued a `SIGTERM` signal. If the process fails
    /// to exit after a given period of time and `use_force` is enabled, the process
    /// will be issued a `SIGKILL` signal, triggering its immediate termination.
    use_force: bool,
    /// Delay period after which to issue a `SIGKILL` signal.
    use_force_delay: Duration,
    /// Events relay [`Event`] enum that carries events emitted by the child process
    /// this includes stdout and stderr output, [`Event::Exit`] in case of a graceful
    /// termination and [`Event::Error`] in case of an error.
    events: Channel<Event>,
    /// Maximum number of output lines retained per stream while the process is muted.
    /// `None` (or `0`) means muted output is not retained at all.
    muted_buffer_capacity: Option<usize>,
    /// Initial mute state: while muted, output is buffered instead of being relayed
    /// as [`Event::Stdout`] / [`Event::Stderr`].
    mute: bool,
}
116
117#[allow(clippy::too_many_arguments)]
118impl Options {
119    pub fn new(
120        argv: &[&str],
121        cwd: Option<PathBuf>,
122        restart: bool,
123        restart_delay: Option<Duration>,
124        use_force: bool,
125        use_force_delay: Option<Duration>,
126        events: Channel<Event>,
127        muted_buffer_capacity: Option<usize>,
128        mute: bool,
129    ) -> Options {
130        let argv = argv.iter().map(|s| s.to_string()).collect::<Vec<_>>();
131
132        Options {
133            argv,
134            cwd,
135            restart,
136            restart_delay: restart_delay.unwrap_or_default(),
137            use_force,
138            use_force_delay: use_force_delay.unwrap_or(Duration::from_millis(10_000)),
139            events,
140            muted_buffer_capacity,
141            mute,
142        }
143    }
144}
145
146impl Default for Options {
147    fn default() -> Self {
148        Self {
149            argv: Vec::new(),
150            cwd: None,
151            restart: true,
152            restart_delay: Duration::from_millis(3_000),
153            use_force: false,
154            use_force_delay: Duration::from_millis(10_000),
155            events: Channel::unbounded(),
156            muted_buffer_capacity: None,
157            mute: false,
158        }
159    }
160}
161
/// Shared runtime state of a supervised child process.
struct Inner {
    /// Command line: the first element is the executable, the rest are its arguments
    argv: Mutex<Vec<String>>,
    /// Optional working directory for the child process
    cwd: Mutex<Option<PathBuf>>,
    /// Set once the child has been spawned, cleared when supervision ends
    running: AtomicBool,
    /// Whether the child is restarted after it exits
    restart: AtomicBool,
    /// Delay before an automatic restart
    restart_delay: Mutex<Duration>,
    /// Whether `SIGKILL` is sent if the child does not exit after `SIGTERM`
    use_force: AtomicBool,
    /// Delay between `SIGTERM` and the follow-up `SIGKILL`
    use_force_delay: Mutex<Duration>,
    /// Channel on which [`Event`]s are published
    events: Channel<Event>,
    /// Handle of the most recently spawned child; `None` when supervision is not active
    proc: Arc<Mutex<Option<Arc<ChildProcess>>>>,
    /// Keeps the callbacks registered on the child alive until supervision ends
    callbacks: CallbackMap,
    /// Time of the most recent spawn
    start_time: Arc<Mutex<Option<Instant>>>,
    /// While `true`, output is buffered instead of being published as events
    mute: Arc<AtomicBool>,
    /// Maximum number of lines kept per muted buffer (`None` or `0`: keep nothing)
    muted_buffer_capacity: Option<usize>,
    /// Lines received on `stdout` while muted
    muted_buffer_stdout: Arc<Mutex<VecDeque<String>>>,
    /// Lines received on `stderr` while muted
    muted_buffer_stderr: Arc<Mutex<VecDeque<String>>>,
}
179
// NOTE(review): `Send`/`Sync` are asserted manually here; presumably this relies on
// the Node.js / NWJS wasm environment being single-threaded, so the contained
// handles are never really accessed from another thread — TODO confirm.
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}
182
183impl Inner {
184    pub fn new(options: Options) -> Inner {
185        Inner {
186            argv: Mutex::new(options.argv),
187            cwd: Mutex::new(options.cwd),
188            running: AtomicBool::new(false),
189            restart: AtomicBool::new(options.restart),
190            restart_delay: Mutex::new(options.restart_delay),
191            use_force: AtomicBool::new(options.use_force),
192            use_force_delay: Mutex::new(options.use_force_delay),
193            events: options.events,
194            proc: Arc::new(Mutex::new(None)),
195            callbacks: CallbackMap::new(),
196            start_time: Arc::new(Mutex::new(None)),
197            mute: Arc::new(AtomicBool::new(options.mute)),
198            muted_buffer_capacity: options.muted_buffer_capacity,
199            muted_buffer_stdout: Arc::new(Mutex::new(VecDeque::default())),
200            muted_buffer_stderr: Arc::new(Mutex::new(VecDeque::default())),
201        }
202    }
203
204    fn program(&self) -> String {
205        self.argv.lock().unwrap().first().unwrap().clone()
206    }
207
208    fn args(&self) -> Vec<String> {
209        self.argv.lock().unwrap()[1..].to_vec()
210    }
211
212    fn cwd(&self) -> Option<PathBuf> {
213        self.cwd.lock().unwrap().clone()
214    }
215
216    pub fn uptime(&self) -> Option<Duration> {
217        if self.running.load(Ordering::SeqCst) {
218            self.start_time.lock().unwrap().map(|ts| ts.elapsed())
219        } else {
220            None
221        }
222    }
223
224    fn buffer_muted(&self, data: buffer::Buffer, muted_buffer: &Arc<Mutex<VecDeque<String>>>) {
225        let muted_buffer_capacity = self.muted_buffer_capacity.unwrap_or_default();
226        if muted_buffer_capacity > 0 {
227            let mut muted_buffer = muted_buffer.lock().unwrap();
228            let buffer = String::from(data.to_string(None, None, None));
229            let lines = buffer.split('\n').collect::<Vec<_>>();
230            for line in lines {
231                let line = line.trim();
232                if !line.is_empty() {
233                    muted_buffer.push_back(trim(line.to_string()));
234                }
235            }
236            while muted_buffer.len() > muted_buffer_capacity {
237                muted_buffer.pop_front();
238            }
239        }
240    }
241
242    fn drain_muted(
243        &self,
244        acc: &Arc<Mutex<VecDeque<String>>>,
245        sender: &Sender<Event>,
246        stdout: bool,
247    ) -> Result<()> {
248        let mut acc = acc.lock().unwrap();
249        if stdout {
250            acc.drain(..).for_each(|line| {
251                sender.try_send(Event::Stdout(line)).unwrap();
252            });
253        } else {
254            acc.drain(..).for_each(|line| {
255                sender.try_send(Event::Stderr(line)).unwrap();
256            });
257        }
258        Ok(())
259    }
260
261    pub fn toggle_mute(&self) -> Result<bool> {
262        if self.mute.load(Ordering::SeqCst) {
263            self.mute.store(false, Ordering::SeqCst);
264            self.drain_muted(&self.muted_buffer_stdout, &self.events.sender, true)?;
265            self.drain_muted(&self.muted_buffer_stderr, &self.events.sender, false)?;
266            Ok(false)
267        } else {
268            self.mute.store(true, Ordering::SeqCst);
269            Ok(true)
270        }
271    }
272
273    pub fn mute(&self, mute: bool) -> Result<()> {
274        if mute != self.mute.load(Ordering::SeqCst) {
275            self.mute.store(mute, Ordering::SeqCst);
276            if !mute {
277                self.drain_muted(&self.muted_buffer_stdout, &self.events.sender, true)?;
278                self.drain_muted(&self.muted_buffer_stderr, &self.events.sender, false)?;
279            }
280        }
281
282        Ok(())
283    }
284
285    pub async fn run(self: &Arc<Self>, stop: Receiver<()>) -> Result<()> {
286        if self.running.load(Ordering::SeqCst) {
287            return Err(Error::AlreadyRunning);
288        }
289
290        'outer: loop {
291            let termination = Channel::<Termination>::oneshot();
292
293            self.start_time.lock().unwrap().replace(Instant::now());
294
295            let proc = {
296                let program = self.program();
297                let args = &self.args();
298
299                let args: SpawnArgs = args.as_slice().into();
300                let options = SpawnOptions::new();
301                if let Some(cwd) = &self.cwd() {
302                    options.cwd(cwd.as_os_str().to_str().unwrap_or_else(|| {
303                        panic!("Process::exec_with_args(): invalid path: {}", cwd.display())
304                    }));
305                }
306
307                Arc::new(spawn_with_args_and_options(&program, &args, &options))
308            };
309
310            let this = self.clone();
311            let exit_sender = termination.sender.clone();
312            let exit = callback!(move |code: JsValue| {
313                let code = code.as_f64().unwrap_or_default() as u32;
314                this.events.sender.try_send(Event::Exit(code)).ok();
315                exit_sender
316                    .try_send(Termination::Exit(code))
317                    .expect("unable to send close notification");
318            });
319            proc.on("exit", exit.as_ref());
320            self.callbacks.retain(exit.clone())?;
321
322            let this = self.clone();
323            let error_sender = termination.sender.clone();
324            let error = callback!(move |err: JsValue| {
325                let msg = JsErrorData::from(err);
326                this.events
327                    .sender
328                    .try_send(Event::Error(msg.to_string()))
329                    .ok();
330                error_sender
331                    .try_send(Termination::Error(msg.to_string()))
332                    .expect("unable to send close notification");
333            });
334            proc.on("error", error.as_ref());
335            self.callbacks.retain(error.clone())?;
336
337            let this = self.clone();
338            let stdout_cb = callback!(move |data: buffer::Buffer| {
339                if this.mute.load(Ordering::SeqCst) {
340                    this.buffer_muted(data, &this.muted_buffer_stdout);
341                } else {
342                    this.events
343                        .sender
344                        .try_send(Event::Stdout(String::from(
345                            data.to_string(None, None, None),
346                        )))
347                        .unwrap();
348                }
349            });
350            proc.stdout().on("data", stdout_cb.as_ref());
351            self.callbacks.retain(stdout_cb)?;
352
353            let this = self.clone();
354            let stderr_cb = callback!(move |data: buffer::Buffer| {
355                if this.mute.load(Ordering::SeqCst) {
356                    this.buffer_muted(data, &this.muted_buffer_stderr);
357                } else {
358                    this.events
359                        .sender
360                        .try_send(Event::Stderr(String::from(
361                            data.to_string(None, None, None),
362                        )))
363                        .unwrap();
364                }
365            });
366            proc.stderr().on("data", stderr_cb.as_ref());
367            self.callbacks.retain(stderr_cb)?;
368
369            *self.proc.lock().unwrap() = Some(proc.clone());
370            self.running.store(true, Ordering::SeqCst);
371
372            self.events.sender.try_send(Event::Start).unwrap();
373
374            let kill = select! {
375                // process exited
376                e = termination.receiver.recv().fuse() => {
377
378                    // if exited with error, abort...
379                    if matches!(e,Ok(Termination::Error(_))) {
380                        break;
381                    }
382
383                    // if restart is not required, break
384                    if !self.restart.load(Ordering::SeqCst) {
385                        break;
386                    } else {
387                        // sleep and then restart
388                        let restart_delay = *self.restart_delay.lock().unwrap();
389                        select! {
390                            // slept well, aim to restart
391                            _ = sleep(restart_delay).fuse() => {
392                                false
393                            },
394                            // stop received while sleeping, break
395                            _ = stop.recv().fuse() => {
396                                break;
397                            }
398                        }
399                    }
400                },
401                // manual shutdown while the process is running
402                _ = stop.recv().fuse() => {
403                    true
404                }
405            };
406
407            if kill {
408                // start process termination
409                self.restart.store(false, Ordering::SeqCst);
410                proc.kill_with_signal(KillSignal::SIGTERM);
411                // if not using force, wait for process termination on SIGTERM
412                if !self.use_force.load(Ordering::SeqCst) {
413                    termination.receiver.recv().await?;
414                    break;
415                } else {
416                    // if using force, sleep and kill with SIGKILL
417                    let use_force_delay = sleep(*self.use_force_delay.lock().unwrap());
418                    select! {
419                        // process exited normally, break
420                        _ = termination.receiver.recv().fuse() => {
421                            break 'outer;
422                        },
423                        // post SIGKILL and wait for exit
424                        _ = use_force_delay.fuse() => {
425                            proc.kill_with_signal(KillSignal::SIGKILL);
426                            termination.receiver.recv().await?;
427                            break 'outer;
428                        },
429                    }
430                }
431            }
432        }
433
434        self.callbacks.clear();
435        *self.proc.lock().unwrap() = None;
436        self.running.store(false, Ordering::SeqCst);
437
438        Ok(())
439    }
440}
441
/// The [`Process`] class facilitating execution of a Child Process in Node.js or NWJS
/// environments. This wrapper runs the child process as a daemon, restarting it
/// after it exits when automatic restart is enabled. The process lifecycle and its
/// `stdout` / `stderr` output are published as [`Event`]s on a channel whose
/// [`Receiver`] is available via [`Process::events`], allowing for a passive capture
/// of the process console output.
#[derive(Clone)]
pub struct Process {
    /// State shared with the monitoring task
    inner: Arc<Inner>,
    /// Background task that spawns and monitors the child process
    task: Arc<Task<Arc<Inner>, ()>>,
}
451
// NOTE(review): `Send`/`Sync` are asserted manually here; presumably this relies on
// the Node.js / NWJS wasm environment being single-threaded — TODO confirm.
unsafe impl Send for Process {}
unsafe impl Sync for Process {}
454
455impl Process {
456    /// Create new process instance
457    pub fn new(options: Options) -> Process {
458        let inner = Arc::new(Inner::new(options));
459
460        let task = task!(|inner: Arc<Inner>, stop| async move {
461            inner.run(stop).await.ok();
462        });
463
464        Process {
465            inner,
466            task: Arc::new(task),
467        }
468    }
469
470    pub fn new_once(path: &str) -> Process {
471        let options = Options::new(
472            &[path],
473            None,
474            false,
475            None,
476            false,
477            // None,
478            // None,
479            None,
480            Channel::unbounded(),
481            None,
482            false,
483        );
484
485        Self::new(options)
486    }
487
488    pub async fn version(path: &str) -> Result<Version> {
489        version(path).await
490    }
491
492    pub fn is_running(&self) -> bool {
493        self.inner.running.load(Ordering::SeqCst)
494    }
495
496    pub fn mute(&self, mute: bool) -> Result<()> {
497        self.inner.mute(mute)
498    }
499
500    pub fn toggle_mute(&self) -> Result<bool> {
501        self.inner.toggle_mute()
502    }
503
504    pub fn uptime(&self) -> Option<Duration> {
505        self.inner.uptime()
506    }
507
508    /// Obtain a clone of the channel [`Receiver`] that captures
509    /// [`Event`] of the underlying process.
510    pub fn events(&self) -> Receiver<Event> {
511        self.inner.events.receiver.clone()
512    }
513
514    pub fn replace_argv(&self, argv: Vec<String>) {
515        *self.inner.argv.lock().unwrap() = argv;
516    }
517
518    /// Run the process in the background.  Spawns an async task that
519    /// monitors the process, capturing its output and restarting
520    /// the process if it exits prematurely.
521    pub fn run(&self) -> Result<()> {
522        self.task.run(self.inner.clone())?;
523        Ok(())
524    }
525
526    /// Issue a `SIGKILL` signal, terminating the process immediately.
527    pub fn kill(&self) -> Result<()> {
528        if !self.inner.running.load(Ordering::SeqCst) {
529            Err(Error::NotRunning)
530        } else if let Some(proc) = self.inner.proc.lock().unwrap().as_ref() {
531            self.inner.restart.store(false, Ordering::SeqCst);
532            proc.kill_with_signal(KillSignal::SIGKILL);
533            Ok(())
534        } else {
535            Err(Error::ProcIsAbsent)
536        }
537    }
538
539    /// Issue a `SIGTERM` signal causing the process to exit. The process
540    /// will be restarted by the monitoring task.
541    pub fn restart(&self) -> Result<()> {
542        if !self.inner.running.load(Ordering::SeqCst) {
543            Err(Error::NotRunning)
544        } else if let Some(proc) = self.inner.proc.lock().unwrap().as_ref() {
545            proc.kill_with_signal(KillSignal::SIGTERM);
546            Ok(())
547        } else {
548            Err(Error::ProcIsAbsent)
549        }
550    }
551
552    /// Stop the process by disabling auto-restart and issuing
553    /// a `SIGTERM` signal. Returns `Ok(())` if the process
554    /// is not running.
555    pub fn stop(&self) -> Result<()> {
556        if self.inner.running.load(Ordering::SeqCst) {
557            self.inner.restart.store(false, Ordering::SeqCst);
558            self.task.stop()?;
559        }
560
561        Ok(())
562    }
563
564    /// Join the process like you would a thread - this async
565    /// function blocks until the process exits.
566    pub async fn join(&self) -> Result<()> {
567        if self.task.is_running() {
568            self.task.join().await?;
569        }
570        Ok(())
571    }
572
573    /// Stop the process and block until it exits.
574    pub async fn stop_and_join(&self) -> Result<()> {
575        self.stop()?;
576        self.join().await?;
577        Ok(())
578    }
579}
580
581/// Execute the process single time with custom command-line arguments.
582/// Useful to obtain a version via `--version` or perform single-task
583/// executions - not as a daemon.
584pub async fn exec(
585    // &self,
586    argv: &[&str],
587    cwd: Option<PathBuf>,
588) -> Result<ExecutionResult> {
589    let proc = *argv.first().unwrap();
590
591    let args: SpawnArgs = argv[1..].into();
592    let options = SpawnOptions::new();
593    if let Some(cwd) = cwd {
594        options.cwd(cwd.as_os_str().to_str().unwrap_or_else(|| {
595            panic!("Process::exec_with_args(): invalid path: {}", cwd.display())
596        }));
597    }
598
599    let termination = Channel::<Termination>::oneshot();
600    let (stdout_tx, stdout_rx) = oneshot();
601    let (stderr_tx, stderr_rx) = oneshot();
602
603    let cp = spawn_with_args_and_options(proc, &args, &options);
604
605    let exit = termination.sender.clone();
606    let exit = callback!(move |code: u32| {
607        exit.try_send(Termination::Exit(code))
608            .expect("unable to send close notification");
609    });
610    cp.on("exit", exit.as_ref());
611
612    let error = termination.sender.clone();
613    let error = callback!(move |err: JsValue| {
614        error
615            .try_send(Termination::Error(format!("{:?}", err)))
616            .expect("unable to send close notification");
617    });
618    cp.on("error", error.as_ref());
619
620    let stdout_cb = callback!(move |data: buffer::Buffer| {
621        stdout_tx
622            .try_send(String::from(data.to_string(None, None, None)))
623            .expect("unable to send stdout data");
624    });
625    cp.stdout().on("data", stdout_cb.as_ref());
626
627    let stderr_cb = callback!(move |data: buffer::Buffer| {
628        stderr_tx
629            .try_send(String::from(data.to_string(None, None, None)))
630            .expect("unable to send stderr data");
631    });
632    cp.stderr().on("data", stderr_cb.as_ref());
633
634    let termination = termination.recv().await?;
635
636    let mut stdout = String::new();
637    for _ in 0..stdout_rx.len() {
638        stdout.push_str(&stdout_rx.try_recv()?);
639    }
640
641    let mut stderr = String::new();
642    for _ in 0..stderr_rx.len() {
643        stderr.push_str(&stdout_rx.try_recv()?);
644    }
645
646    Ok(ExecutionResult {
647        termination,
648        stdout,
649        stderr,
650    })
651}
652
653/// Obtain the process version information by running it with `--version` argument.
654pub async fn version(proc: &str) -> Result<Version> {
655    let text = exec([proc, "--version"].as_slice(), None).await?.stdout;
656    let vstr = if let Some(vstr) = text.split_whitespace().last() {
657        vstr
658    } else {
659        return Ok(Version::none());
660    };
661
662    let v = vstr
663        .split('.')
664        .flat_map(|v| v.parse::<u64>())
665        .collect::<Vec<_>>();
666
667    if v.len() != 3 {
668        return Ok(Version::none());
669    }
670
671    Ok(Version::new(v[0], v[1], v[2]))
672}
673
/// Removes a single trailing line terminator (`\n` or `\r\n`) from `s`.
/// Strings that do not end with `\n` are returned unchanged.
pub fn trim(mut s: String) -> String {
    let terminator_len = if s.ends_with("\r\n") {
        2
    } else if s.ends_with('\n') {
        1
    } else {
        0
    };
    s.truncate(s.len() - terminator_len);
    s
}
684
// #[wasm_bindgen]
/// Manual smoke test for [`Process`]: runs a daemon for five seconds while
/// logging every [`Event`] it produces, then stops the daemon and the logger.
///
/// NOTE(review): the executable path below is hard-coded to a developer
/// machine, so this only works where that binary exists.
pub async fn test_child_process() {
    log_info!("running rust test() fn");
    workflow_wasm::panic::init_console_panic_hook();

    // restart after 3 seconds; force-kill 100 ms after SIGTERM
    let proc = Process::new(Options::new(
        &["/Users/aspect/dev/kaspa-dev/kaspad/kaspad"],
        None,
        true,
        Some(Duration::from_millis(3000)),
        true,
        Some(Duration::from_millis(100)),
        Channel::unbounded(),
        None,
        false,
    ));
    // futures::task
    // background task that logs process events until it is asked to stop
    let task = task!(|events: Receiver<Event>, stop: Receiver<()>| async move {
        loop {
            select! {
                v = events.recv().fuse() => {
                    if let Ok(v) = v {
                        log_info!("| {:?}",v);
                    }
                },
                _ = stop.recv().fuse() => {
                    log_info!("stop...");
                    break;
                }
            }
            log_info!("in loop");
        }
    });
    task.run(proc.events()).expect("task.run()");

    proc.run().expect("proc.run()");

    // let the process run for a while before shutting everything down
    sleep(Duration::from_millis(5_000)).await;

    proc.stop_and_join()
        .await
        .expect("proc.stop_and_join() failure");
    task.stop_and_join()
        .await
        .expect("task.stop_and_join() failure");
}