1use crate::child_process::{
5 spawn_with_args_and_options, ChildProcess, KillSignal, SpawnArgs, SpawnOptions,
6};
7use crate::error::Error;
8use crate::result::Result;
9use borsh::{BorshDeserialize, BorshSerialize};
10use futures::{select, FutureExt};
11use node_sys::*;
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18use wasm_bindgen::prelude::*;
19use workflow_core::channel::{oneshot, Channel, Receiver, Sender};
20use workflow_core::task::*;
21use workflow_core::time::Instant;
22use workflow_log::*;
23use workflow_task::*;
24use workflow_wasm::callback::*;
25use workflow_wasm::jserror::*;
26
/// Three-component program version (`major.minor.patch`) with an explicit
/// "not available" marker.
///
/// Instances are produced by [`version`] from the output of `<program> --version`.
pub struct Version {
    /// Major component.
    pub major: u64,
    /// Minor component.
    pub minor: u64,
    /// Patch component.
    pub patch: u64,
    /// `true` when no version could be determined. `Version::none()` sets the
    /// numeric fields to zero in that case and `Display` renders `n/a`.
    pub none: bool,
}
34
35impl Version {
36 pub fn new(major: u64, minor: u64, patch: u64) -> Version {
37 Version {
38 major,
39 minor,
40 patch,
41 none: false,
42 }
43 }
44
45 pub fn none() -> Version {
46 Version {
47 major: 0,
48 minor: 0,
49 patch: 0,
50 none: true,
51 }
52 }
53}
54
55impl std::fmt::Display for Version {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 if self.none {
58 write!(f, "n/a")
59 } else {
60 write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
61 }
62 }
63}
64
/// Outcome of a one-shot child process execution performed by [`exec`].
pub struct ExecutionResult {
    /// How the child process terminated (exit code or spawn/runtime error).
    pub termination: Termination,
    /// Text collected from the child's stdout `data` events.
    pub stdout: String,
    /// Text collected from the child's stderr `data` events.
    pub stderr: String,
}
71
72impl ExecutionResult {
73 pub fn is_error(&self) -> bool {
74 matches!(self.termination, Termination::Error(_))
75 }
76}
77
/// Reason a child process stopped, as reported by its `exit` or `error` handler.
pub enum Termination {
    /// The child reported an `exit` event; the payload is the exit code.
    Exit(u32),
    /// The child reported an `error` event; the payload is the error text.
    Error(String),
}
82
/// Notifications published on the [`Options`] event channel while a
/// [`Process`] is being supervised.
#[derive(Debug, Clone, BorshDeserialize, BorshSerialize, Serialize, Deserialize)]
pub enum Event {
    /// A child process was spawned and its handlers were registered.
    Start,
    /// The child reported an `exit` event with this exit code.
    Exit(u32),
    /// The child reported an `error` event with this message.
    Error(String),
    /// Text received on the child's stdout (or a buffered line released on unmute).
    Stdout(String),
    /// Text received on the child's stderr (or a buffered line released on unmute).
    Stderr(String),
}
91
/// Configuration for a supervised [`Process`].
pub struct Options {
    /// Command line: the first element is the program, the remaining elements
    /// are its arguments.
    argv: Vec<String>,
    /// Working directory for the child; `None` leaves the spawn options' cwd unset.
    cwd: Option<PathBuf>,
    /// Respawn the child after it exits.
    restart: bool,
    /// Pause between the child's exit and the respawn.
    restart_delay: Duration,
    /// When stopping, follow `SIGTERM` with `SIGKILL` if the child does not
    /// terminate within `use_force_delay`.
    use_force: bool,
    /// Grace period between `SIGTERM` and `SIGKILL` (only used with `use_force`).
    use_force_delay: Duration,
    /// Channel on which [`Event`]s are published.
    events: Channel<Event>,
    /// Maximum number of lines retained per stream while muted; `None` or `0`
    /// means muted output is discarded.
    muted_buffer_capacity: Option<usize>,
    /// Initial mute state: when `true`, stdout/stderr are buffered instead of
    /// being published as events.
    mute: bool,
}
116
117#[allow(clippy::too_many_arguments)]
118impl Options {
119 pub fn new(
120 argv: &[&str],
121 cwd: Option<PathBuf>,
122 restart: bool,
123 restart_delay: Option<Duration>,
124 use_force: bool,
125 use_force_delay: Option<Duration>,
126 events: Channel<Event>,
127 muted_buffer_capacity: Option<usize>,
128 mute: bool,
129 ) -> Options {
130 let argv = argv.iter().map(|s| s.to_string()).collect::<Vec<_>>();
131
132 Options {
133 argv,
134 cwd,
135 restart,
136 restart_delay: restart_delay.unwrap_or_default(),
137 use_force,
138 use_force_delay: use_force_delay.unwrap_or(Duration::from_millis(10_000)),
139 events,
140 muted_buffer_capacity,
141 mute,
142 }
143 }
144}
145
146impl Default for Options {
147 fn default() -> Self {
148 Self {
149 argv: Vec::new(),
150 cwd: None,
151 restart: true,
152 restart_delay: Duration::from_millis(3_000),
153 use_force: false,
154 use_force_delay: Duration::from_millis(10_000),
155 events: Channel::unbounded(),
156 muted_buffer_capacity: None,
157 mute: false,
158 }
159 }
160}
161
/// Shared state behind a [`Process`] handle; owned through an `Arc` by the
/// handle, by the supervising task and by the callbacks registered on the child.
struct Inner {
    /// Command line: first element is the program, the rest are its arguments.
    argv: Mutex<Vec<String>>,
    /// Optional working directory for the child.
    cwd: Mutex<Option<PathBuf>>,
    /// Set once a child has been spawned by `run()`, cleared when `run()` finishes.
    running: AtomicBool,
    /// Respawn the child after it exits; cleared by stop/kill requests.
    restart: AtomicBool,
    /// Pause between the child's exit and the respawn.
    restart_delay: Mutex<Duration>,
    /// Escalate to `SIGKILL` when a stop request is not honoured in time.
    use_force: AtomicBool,
    /// Grace period between `SIGTERM` and `SIGKILL`.
    use_force_delay: Mutex<Duration>,
    /// Channel on which [`Event`]s are published.
    events: Channel<Event>,
    /// Handle of the currently spawned child, `None` when nothing is running.
    proc: Arc<Mutex<Option<Arc<ChildProcess>>>>,
    /// Keeps the callbacks registered on the child alive; cleared when `run()` finishes.
    callbacks: CallbackMap,
    /// Time of the most recent spawn, used by `uptime()`.
    start_time: Arc<Mutex<Option<Instant>>>,
    /// When set, output is buffered instead of being published as events.
    mute: Arc<AtomicBool>,
    /// Maximum number of lines kept per muted buffer; `None`/`0` disables buffering.
    muted_buffer_capacity: Option<usize>,
    /// Lines captured from stdout while muted.
    muted_buffer_stdout: Arc<Mutex<VecDeque<String>>>,
    /// Lines captured from stderr while muted.
    muted_buffer_stderr: Arc<Mutex<VecDeque<String>>>,
}
179
// SAFETY: NOTE(review) — not established by anything in this file. `Inner` holds
// handles coming from the Node bindings (`ChildProcess`, `CallbackMap`), and these
// impls presumably rely on the wasm/Node target being single-threaded, so the
// values never actually cross a thread boundary. Confirm before using `Inner`
// on a multi-threaded target.
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}
182
183impl Inner {
184 pub fn new(options: Options) -> Inner {
185 Inner {
186 argv: Mutex::new(options.argv),
187 cwd: Mutex::new(options.cwd),
188 running: AtomicBool::new(false),
189 restart: AtomicBool::new(options.restart),
190 restart_delay: Mutex::new(options.restart_delay),
191 use_force: AtomicBool::new(options.use_force),
192 use_force_delay: Mutex::new(options.use_force_delay),
193 events: options.events,
194 proc: Arc::new(Mutex::new(None)),
195 callbacks: CallbackMap::new(),
196 start_time: Arc::new(Mutex::new(None)),
197 mute: Arc::new(AtomicBool::new(options.mute)),
198 muted_buffer_capacity: options.muted_buffer_capacity,
199 muted_buffer_stdout: Arc::new(Mutex::new(VecDeque::default())),
200 muted_buffer_stderr: Arc::new(Mutex::new(VecDeque::default())),
201 }
202 }
203
204 fn program(&self) -> String {
205 self.argv.lock().unwrap().first().unwrap().clone()
206 }
207
208 fn args(&self) -> Vec<String> {
209 self.argv.lock().unwrap()[1..].to_vec()
210 }
211
212 fn cwd(&self) -> Option<PathBuf> {
213 self.cwd.lock().unwrap().clone()
214 }
215
216 pub fn uptime(&self) -> Option<Duration> {
217 if self.running.load(Ordering::SeqCst) {
218 self.start_time.lock().unwrap().map(|ts| ts.elapsed())
219 } else {
220 None
221 }
222 }
223
224 fn buffer_muted(&self, data: buffer::Buffer, muted_buffer: &Arc<Mutex<VecDeque<String>>>) {
225 let muted_buffer_capacity = self.muted_buffer_capacity.unwrap_or_default();
226 if muted_buffer_capacity > 0 {
227 let mut muted_buffer = muted_buffer.lock().unwrap();
228 let buffer = String::from(data.to_string(None, None, None));
229 let lines = buffer.split('\n').collect::<Vec<_>>();
230 for line in lines {
231 let line = line.trim();
232 if !line.is_empty() {
233 muted_buffer.push_back(trim(line.to_string()));
234 }
235 }
236 while muted_buffer.len() > muted_buffer_capacity {
237 muted_buffer.pop_front();
238 }
239 }
240 }
241
242 fn drain_muted(
243 &self,
244 acc: &Arc<Mutex<VecDeque<String>>>,
245 sender: &Sender<Event>,
246 stdout: bool,
247 ) -> Result<()> {
248 let mut acc = acc.lock().unwrap();
249 if stdout {
250 acc.drain(..).for_each(|line| {
251 sender.try_send(Event::Stdout(line)).unwrap();
252 });
253 } else {
254 acc.drain(..).for_each(|line| {
255 sender.try_send(Event::Stderr(line)).unwrap();
256 });
257 }
258 Ok(())
259 }
260
261 pub fn toggle_mute(&self) -> Result<bool> {
262 if self.mute.load(Ordering::SeqCst) {
263 self.mute.store(false, Ordering::SeqCst);
264 self.drain_muted(&self.muted_buffer_stdout, &self.events.sender, true)?;
265 self.drain_muted(&self.muted_buffer_stderr, &self.events.sender, false)?;
266 Ok(false)
267 } else {
268 self.mute.store(true, Ordering::SeqCst);
269 Ok(true)
270 }
271 }
272
273 pub fn mute(&self, mute: bool) -> Result<()> {
274 if mute != self.mute.load(Ordering::SeqCst) {
275 self.mute.store(mute, Ordering::SeqCst);
276 if !mute {
277 self.drain_muted(&self.muted_buffer_stdout, &self.events.sender, true)?;
278 self.drain_muted(&self.muted_buffer_stderr, &self.events.sender, false)?;
279 }
280 }
281
282 Ok(())
283 }
284
285 pub async fn run(self: &Arc<Self>, stop: Receiver<()>) -> Result<()> {
286 if self.running.load(Ordering::SeqCst) {
287 return Err(Error::AlreadyRunning);
288 }
289
290 'outer: loop {
291 let termination = Channel::<Termination>::oneshot();
292
293 self.start_time.lock().unwrap().replace(Instant::now());
294
295 let proc = {
296 let program = self.program();
297 let args = &self.args();
298
299 let args: SpawnArgs = args.as_slice().into();
300 let options = SpawnOptions::new();
301 if let Some(cwd) = &self.cwd() {
302 options.cwd(cwd.as_os_str().to_str().unwrap_or_else(|| {
303 panic!("Process::exec_with_args(): invalid path: {}", cwd.display())
304 }));
305 }
306
307 Arc::new(spawn_with_args_and_options(&program, &args, &options))
308 };
309
310 let this = self.clone();
311 let exit_sender = termination.sender.clone();
312 let exit = callback!(move |code: JsValue| {
313 let code = code.as_f64().unwrap_or_default() as u32;
314 this.events.sender.try_send(Event::Exit(code)).ok();
315 exit_sender
316 .try_send(Termination::Exit(code))
317 .expect("unable to send close notification");
318 });
319 proc.on("exit", exit.as_ref());
320 self.callbacks.retain(exit.clone())?;
321
322 let this = self.clone();
323 let error_sender = termination.sender.clone();
324 let error = callback!(move |err: JsValue| {
325 let msg = JsErrorData::from(err);
326 this.events
327 .sender
328 .try_send(Event::Error(msg.to_string()))
329 .ok();
330 error_sender
331 .try_send(Termination::Error(msg.to_string()))
332 .expect("unable to send close notification");
333 });
334 proc.on("error", error.as_ref());
335 self.callbacks.retain(error.clone())?;
336
337 let this = self.clone();
338 let stdout_cb = callback!(move |data: buffer::Buffer| {
339 if this.mute.load(Ordering::SeqCst) {
340 this.buffer_muted(data, &this.muted_buffer_stdout);
341 } else {
342 this.events
343 .sender
344 .try_send(Event::Stdout(String::from(
345 data.to_string(None, None, None),
346 )))
347 .unwrap();
348 }
349 });
350 proc.stdout().on("data", stdout_cb.as_ref());
351 self.callbacks.retain(stdout_cb)?;
352
353 let this = self.clone();
354 let stderr_cb = callback!(move |data: buffer::Buffer| {
355 if this.mute.load(Ordering::SeqCst) {
356 this.buffer_muted(data, &this.muted_buffer_stderr);
357 } else {
358 this.events
359 .sender
360 .try_send(Event::Stderr(String::from(
361 data.to_string(None, None, None),
362 )))
363 .unwrap();
364 }
365 });
366 proc.stderr().on("data", stderr_cb.as_ref());
367 self.callbacks.retain(stderr_cb)?;
368
369 *self.proc.lock().unwrap() = Some(proc.clone());
370 self.running.store(true, Ordering::SeqCst);
371
372 self.events.sender.try_send(Event::Start).unwrap();
373
374 let kill = select! {
375 e = termination.receiver.recv().fuse() => {
377
378 if matches!(e,Ok(Termination::Error(_))) {
380 break;
381 }
382
383 if !self.restart.load(Ordering::SeqCst) {
385 break;
386 } else {
387 let restart_delay = *self.restart_delay.lock().unwrap();
389 select! {
390 _ = sleep(restart_delay).fuse() => {
392 false
393 },
394 _ = stop.recv().fuse() => {
396 break;
397 }
398 }
399 }
400 },
401 _ = stop.recv().fuse() => {
403 true
404 }
405 };
406
407 if kill {
408 self.restart.store(false, Ordering::SeqCst);
410 proc.kill_with_signal(KillSignal::SIGTERM);
411 if !self.use_force.load(Ordering::SeqCst) {
413 termination.receiver.recv().await?;
414 break;
415 } else {
416 let use_force_delay = sleep(*self.use_force_delay.lock().unwrap());
418 select! {
419 _ = termination.receiver.recv().fuse() => {
421 break 'outer;
422 },
423 _ = use_force_delay.fuse() => {
425 proc.kill_with_signal(KillSignal::SIGKILL);
426 termination.receiver.recv().await?;
427 break 'outer;
428 },
429 }
430 }
431 }
432 }
433
434 self.callbacks.clear();
435 *self.proc.lock().unwrap() = None;
436 self.running.store(false, Ordering::SeqCst);
437
438 Ok(())
439 }
440}
441
/// Handle to a supervised child process.
///
/// Cloning is cheap: clones share the same state and the same supervising task.
#[derive(Clone)]
pub struct Process {
    /// State shared with the supervising task and the child's callbacks.
    inner: Arc<Inner>,
    /// Task that drives `Inner::run`.
    task: Arc<Task<Arc<Inner>, ()>>,
}
451
// SAFETY: NOTE(review) — not established by anything in this file. `Process`
// only wraps `Arc<Inner>` and the supervising task, so these impls are at most
// as sound as the matching impls on `Inner`; presumably they rely on the
// wasm/Node target being single-threaded. Confirm before using `Process` on a
// multi-threaded target.
unsafe impl Send for Process {}
unsafe impl Sync for Process {}
454
455impl Process {
456 pub fn new(options: Options) -> Process {
458 let inner = Arc::new(Inner::new(options));
459
460 let task = task!(|inner: Arc<Inner>, stop| async move {
461 inner.run(stop).await.ok();
462 });
463
464 Process {
465 inner,
466 task: Arc::new(task),
467 }
468 }
469
470 pub fn new_once(path: &str) -> Process {
471 let options = Options::new(
472 &[path],
473 None,
474 false,
475 None,
476 false,
477 None,
480 Channel::unbounded(),
481 None,
482 false,
483 );
484
485 Self::new(options)
486 }
487
488 pub async fn version(path: &str) -> Result<Version> {
489 version(path).await
490 }
491
492 pub fn is_running(&self) -> bool {
493 self.inner.running.load(Ordering::SeqCst)
494 }
495
496 pub fn mute(&self, mute: bool) -> Result<()> {
497 self.inner.mute(mute)
498 }
499
500 pub fn toggle_mute(&self) -> Result<bool> {
501 self.inner.toggle_mute()
502 }
503
504 pub fn uptime(&self) -> Option<Duration> {
505 self.inner.uptime()
506 }
507
508 pub fn events(&self) -> Receiver<Event> {
511 self.inner.events.receiver.clone()
512 }
513
514 pub fn replace_argv(&self, argv: Vec<String>) {
515 *self.inner.argv.lock().unwrap() = argv;
516 }
517
518 pub fn run(&self) -> Result<()> {
522 self.task.run(self.inner.clone())?;
523 Ok(())
524 }
525
526 pub fn kill(&self) -> Result<()> {
528 if !self.inner.running.load(Ordering::SeqCst) {
529 Err(Error::NotRunning)
530 } else if let Some(proc) = self.inner.proc.lock().unwrap().as_ref() {
531 self.inner.restart.store(false, Ordering::SeqCst);
532 proc.kill_with_signal(KillSignal::SIGKILL);
533 Ok(())
534 } else {
535 Err(Error::ProcIsAbsent)
536 }
537 }
538
539 pub fn restart(&self) -> Result<()> {
542 if !self.inner.running.load(Ordering::SeqCst) {
543 Err(Error::NotRunning)
544 } else if let Some(proc) = self.inner.proc.lock().unwrap().as_ref() {
545 proc.kill_with_signal(KillSignal::SIGTERM);
546 Ok(())
547 } else {
548 Err(Error::ProcIsAbsent)
549 }
550 }
551
552 pub fn stop(&self) -> Result<()> {
556 if self.inner.running.load(Ordering::SeqCst) {
557 self.inner.restart.store(false, Ordering::SeqCst);
558 self.task.stop()?;
559 }
560
561 Ok(())
562 }
563
564 pub async fn join(&self) -> Result<()> {
567 if self.task.is_running() {
568 self.task.join().await?;
569 }
570 Ok(())
571 }
572
573 pub async fn stop_and_join(&self) -> Result<()> {
575 self.stop()?;
576 self.join().await?;
577 Ok(())
578 }
579}
580
581pub async fn exec(
585 argv: &[&str],
587 cwd: Option<PathBuf>,
588) -> Result<ExecutionResult> {
589 let proc = *argv.first().unwrap();
590
591 let args: SpawnArgs = argv[1..].into();
592 let options = SpawnOptions::new();
593 if let Some(cwd) = cwd {
594 options.cwd(cwd.as_os_str().to_str().unwrap_or_else(|| {
595 panic!("Process::exec_with_args(): invalid path: {}", cwd.display())
596 }));
597 }
598
599 let termination = Channel::<Termination>::oneshot();
600 let (stdout_tx, stdout_rx) = oneshot();
601 let (stderr_tx, stderr_rx) = oneshot();
602
603 let cp = spawn_with_args_and_options(proc, &args, &options);
604
605 let exit = termination.sender.clone();
606 let exit = callback!(move |code: u32| {
607 exit.try_send(Termination::Exit(code))
608 .expect("unable to send close notification");
609 });
610 cp.on("exit", exit.as_ref());
611
612 let error = termination.sender.clone();
613 let error = callback!(move |err: JsValue| {
614 error
615 .try_send(Termination::Error(format!("{:?}", err)))
616 .expect("unable to send close notification");
617 });
618 cp.on("error", error.as_ref());
619
620 let stdout_cb = callback!(move |data: buffer::Buffer| {
621 stdout_tx
622 .try_send(String::from(data.to_string(None, None, None)))
623 .expect("unable to send stdout data");
624 });
625 cp.stdout().on("data", stdout_cb.as_ref());
626
627 let stderr_cb = callback!(move |data: buffer::Buffer| {
628 stderr_tx
629 .try_send(String::from(data.to_string(None, None, None)))
630 .expect("unable to send stderr data");
631 });
632 cp.stderr().on("data", stderr_cb.as_ref());
633
634 let termination = termination.recv().await?;
635
636 let mut stdout = String::new();
637 for _ in 0..stdout_rx.len() {
638 stdout.push_str(&stdout_rx.try_recv()?);
639 }
640
641 let mut stderr = String::new();
642 for _ in 0..stderr_rx.len() {
643 stderr.push_str(&stdout_rx.try_recv()?);
644 }
645
646 Ok(ExecutionResult {
647 termination,
648 stdout,
649 stderr,
650 })
651}
652
653pub async fn version(proc: &str) -> Result<Version> {
655 let text = exec([proc, "--version"].as_slice(), None).await?.stdout;
656 let vstr = if let Some(vstr) = text.split_whitespace().last() {
657 vstr
658 } else {
659 return Ok(Version::none());
660 };
661
662 let v = vstr
663 .split('.')
664 .flat_map(|v| v.parse::<u64>())
665 .collect::<Vec<_>>();
666
667 if v.len() != 3 {
668 return Ok(Version::none());
669 }
670
671 Ok(Version::new(v[0], v[1], v[2]))
672}
673
/// Removes a single trailing line ending (`\n` or `\r\n`) from `s`.
///
/// A trailing `\r` that is not followed by `\n` is left untouched, and at
/// most one line ending is removed.
pub fn trim(mut s: String) -> String {
    if let Some(without_lf) = s.strip_suffix('\n') {
        // Only look for `\r` once a `\n` has actually been removed.
        let keep = without_lf
            .strip_suffix('\r')
            .unwrap_or(without_lf)
            .len();
        s.truncate(keep);
    }
    s
}
684
685pub async fn test_child_process() {
687 log_info!("running rust test() fn");
688 workflow_wasm::panic::init_console_panic_hook();
689
690 let proc = Process::new(Options::new(
691 &["/Users/aspect/dev/kaspa-dev/kaspad/kaspad"],
692 None,
693 true,
694 Some(Duration::from_millis(3000)),
695 true,
696 Some(Duration::from_millis(100)),
697 Channel::unbounded(),
698 None,
699 false,
700 ));
701 let task = task!(|events: Receiver<Event>, stop: Receiver<()>| async move {
703 loop {
704 select! {
705 v = events.recv().fuse() => {
706 if let Ok(v) = v {
707 log_info!("| {:?}",v);
708 }
709 },
710 _ = stop.recv().fuse() => {
711 log_info!("stop...");
712 break;
713 }
714 }
715 log_info!("in loop");
716 }
717 });
718 task.run(proc.events()).expect("task.run()");
719
720 proc.run().expect("proc.run()");
721
722 sleep(Duration::from_millis(5_000)).await;
723
724 proc.stop_and_join()
725 .await
726 .expect("proc.stop_and_join() failure");
727 task.stop_and_join()
728 .await
729 .expect("task.stop_and_join() failure");
730}