use crate::child_process::{
spawn_with_args_and_options, ChildProcess, KillSignal, SpawnArgs, SpawnOptions,
};
use crate::error::Error;
use crate::result::Result;
use borsh::{BorshDeserialize, BorshSerialize};
use futures::{select, FutureExt};
use node_sys::*;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use wasm_bindgen::prelude::*;
use workflow_core::channel::{oneshot, Channel, Receiver, Sender};
use workflow_core::task::*;
use workflow_core::time::Instant;
use workflow_log::*;
use workflow_task::*;
use workflow_wasm::callback::*;
use workflow_wasm::jserror::*;
pub struct Version {
pub major: u64,
pub minor: u64,
pub patch: u64,
pub none: bool,
}
impl Version {
pub fn new(major: u64, minor: u64, patch: u64) -> Version {
Version {
major,
minor,
patch,
none: false,
}
}
pub fn none() -> Version {
Version {
major: 0,
minor: 0,
patch: 0,
none: true,
}
}
}
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.none {
write!(f, "n/a")
} else {
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
}
}
}
pub struct ExecutionResult {
pub termination: Termination,
pub stdout: String,
pub stderr: String,
}
impl ExecutionResult {
pub fn is_error(&self) -> bool {
matches!(self.termination, Termination::Error(_))
}
}
pub enum Termination {
Exit(u32),
Error(String),
}
#[derive(Debug, Clone, BorshDeserialize, BorshSerialize, Serialize, Deserialize)]
pub enum Event {
Start,
Exit(u32),
Error(String),
Stdout(String),
Stderr(String),
}
pub struct Options {
argv: Vec<String>,
cwd: Option<PathBuf>,
restart: bool,
restart_delay: Duration,
use_force: bool,
use_force_delay: Duration,
events: Channel<Event>,
muted_buffer_capacity: Option<usize>,
mute: bool,
}
#[allow(clippy::too_many_arguments)]
impl Options {
pub fn new(
argv: &[&str],
cwd: Option<PathBuf>,
restart: bool,
restart_delay: Option<Duration>,
use_force: bool,
use_force_delay: Option<Duration>,
events: Channel<Event>,
muted_buffer_capacity: Option<usize>,
mute: bool,
) -> Options {
let argv = argv.iter().map(|s| s.to_string()).collect::<Vec<_>>();
Options {
argv,
cwd,
restart,
restart_delay: restart_delay.unwrap_or_default(),
use_force,
use_force_delay: use_force_delay.unwrap_or(Duration::from_millis(10_000)),
events,
muted_buffer_capacity,
mute,
}
}
}
impl Default for Options {
fn default() -> Self {
Self {
argv: Vec::new(),
cwd: None,
restart: true,
restart_delay: Duration::from_millis(3_000),
use_force: false,
use_force_delay: Duration::from_millis(10_000),
events: Channel::unbounded(),
muted_buffer_capacity: None,
mute: false,
}
}
}
struct Inner {
argv: Mutex<Vec<String>>,
cwd: Mutex<Option<PathBuf>>,
running: AtomicBool,
restart: AtomicBool,
restart_delay: Mutex<Duration>,
use_force: AtomicBool,
use_force_delay: Mutex<Duration>,
events: Channel<Event>,
proc: Arc<Mutex<Option<Arc<ChildProcess>>>>,
callbacks: CallbackMap,
start_time: Arc<Mutex<Option<Instant>>>,
mute: Arc<AtomicBool>,
muted_buffer_capacity: Option<usize>,
muted_buffer_stdout: Arc<Mutex<VecDeque<String>>>,
muted_buffer_stderr: Arc<Mutex<VecDeque<String>>>,
}
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}
impl Inner {
pub fn new(options: Options) -> Inner {
Inner {
argv: Mutex::new(options.argv),
cwd: Mutex::new(options.cwd),
running: AtomicBool::new(false),
restart: AtomicBool::new(options.restart),
restart_delay: Mutex::new(options.restart_delay),
use_force: AtomicBool::new(options.use_force),
use_force_delay: Mutex::new(options.use_force_delay),
events: options.events,
proc: Arc::new(Mutex::new(None)),
callbacks: CallbackMap::new(),
start_time: Arc::new(Mutex::new(None)),
mute: Arc::new(AtomicBool::new(options.mute)),
muted_buffer_capacity: options.muted_buffer_capacity,
muted_buffer_stdout: Arc::new(Mutex::new(VecDeque::default())),
muted_buffer_stderr: Arc::new(Mutex::new(VecDeque::default())),
}
}
fn program(&self) -> String {
self.argv.lock().unwrap().get(0).unwrap().clone()
}
fn args(&self) -> Vec<String> {
self.argv.lock().unwrap()[1..].to_vec()
}
fn cwd(&self) -> Option<PathBuf> {
self.cwd.lock().unwrap().clone()
}
pub fn uptime(&self) -> Option<Duration> {
if self.running.load(Ordering::SeqCst) {
self.start_time.lock().unwrap().map(|ts| ts.elapsed())
} else {
None
}
}
fn buffer_muted(&self, data: buffer::Buffer, muted_buffer: &Arc<Mutex<VecDeque<String>>>) {
let muted_buffer_capacity = self.muted_buffer_capacity.unwrap_or_default();
if muted_buffer_capacity > 0 {
let mut muted_buffer = muted_buffer.lock().unwrap();
let buffer = String::from(data.to_string(None, None, None));
let lines = buffer.split('\n').collect::<Vec<_>>();
for line in lines {
let line = line.trim();
if !line.is_empty() {
muted_buffer.push_back(trim(line.to_string()));
}
}
while muted_buffer.len() > muted_buffer_capacity {
muted_buffer.pop_front();
}
}
}
fn drain_muted(
&self,
acc: &Arc<Mutex<VecDeque<String>>>,
sender: &Sender<Event>,
stdout: bool,
) -> Result<()> {
let mut acc = acc.lock().unwrap();
if stdout {
acc.drain(..).for_each(|line| {
sender.try_send(Event::Stdout(line)).unwrap();
});
} else {
acc.drain(..).for_each(|line| {
sender.try_send(Event::Stderr(line)).unwrap();
});
}
Ok(())
}
pub fn toggle_mute(&self) -> Result<bool> {
if self.mute.load(Ordering::SeqCst) {
self.mute.store(false, Ordering::SeqCst);
self.drain_muted(&self.muted_buffer_stdout, &self.events.sender, true)?;
self.drain_muted(&self.muted_buffer_stderr, &self.events.sender, false)?;
Ok(false)
} else {
self.mute.store(true, Ordering::SeqCst);
Ok(true)
}
}
pub fn mute(&self, mute: bool) -> Result<()> {
if mute != self.mute.load(Ordering::SeqCst) {
self.mute.store(mute, Ordering::SeqCst);
if !mute {
self.drain_muted(&self.muted_buffer_stdout, &self.events.sender, true)?;
self.drain_muted(&self.muted_buffer_stderr, &self.events.sender, false)?;
}
}
Ok(())
}
pub async fn run(self: &Arc<Self>, stop: Receiver<()>) -> Result<()> {
if self.running.load(Ordering::SeqCst) {
return Err(Error::AlreadyRunning);
}
'outer: loop {
let termination = Channel::<Termination>::oneshot();
self.start_time.lock().unwrap().replace(Instant::now());
let proc = {
let program = self.program();
let args = &self.args();
let args: SpawnArgs = args.as_slice().into();
let options = SpawnOptions::new();
if let Some(cwd) = &self.cwd() {
options.cwd(cwd.as_os_str().to_str().unwrap_or_else(|| {
panic!("Process::exec_with_args(): invalid path: {}", cwd.display())
}));
}
Arc::new(spawn_with_args_and_options(&program, &args, &options))
};
let this = self.clone();
let exit_sender = termination.sender.clone();
let exit = callback!(move |code: JsValue| {
let code = code.as_f64().unwrap_or_default() as u32;
this.events.sender.try_send(Event::Exit(code)).ok();
exit_sender
.try_send(Termination::Exit(code))
.expect("unable to send close notification");
});
proc.on("exit", exit.as_ref());
self.callbacks.retain(exit.clone())?;
let this = self.clone();
let error_sender = termination.sender.clone();
let error = callback!(move |err: JsValue| {
let msg = JsErrorData::from(err);
this.events
.sender
.try_send(Event::Error(msg.to_string()))
.ok();
error_sender
.try_send(Termination::Error(msg.to_string()))
.expect("unable to send close notification");
});
proc.on("error", error.as_ref());
self.callbacks.retain(error.clone())?;
let this = self.clone();
let stdout_cb = callback!(move |data: buffer::Buffer| {
if this.mute.load(Ordering::SeqCst) {
this.buffer_muted(data, &this.muted_buffer_stdout);
} else {
this.events
.sender
.try_send(Event::Stdout(String::from(
data.to_string(None, None, None),
)))
.unwrap();
}
});
proc.stdout().on("data", stdout_cb.as_ref());
self.callbacks.retain(stdout_cb)?;
let this = self.clone();
let stderr_cb = callback!(move |data: buffer::Buffer| {
if this.mute.load(Ordering::SeqCst) {
this.buffer_muted(data, &this.muted_buffer_stderr);
} else {
this.events
.sender
.try_send(Event::Stderr(String::from(
data.to_string(None, None, None),
)))
.unwrap();
}
});
proc.stderr().on("data", stderr_cb.as_ref());
self.callbacks.retain(stderr_cb)?;
*self.proc.lock().unwrap() = Some(proc.clone());
self.running.store(true, Ordering::SeqCst);
self.events.sender.try_send(Event::Start).unwrap();
let kill = select! {
e = termination.receiver.recv().fuse() => {
if matches!(e,Ok(Termination::Error(_))) {
break;
}
if !self.restart.load(Ordering::SeqCst) {
break;
} else {
let restart_delay = *self.restart_delay.lock().unwrap();
select! {
_ = sleep(restart_delay).fuse() => {
false
},
_ = stop.recv().fuse() => {
break;
}
}
}
},
_ = stop.recv().fuse() => {
true
}
};
if kill {
self.restart.store(false, Ordering::SeqCst);
proc.kill_with_signal(KillSignal::SIGTERM);
if !self.use_force.load(Ordering::SeqCst) {
termination.receiver.recv().await?;
break;
} else {
let use_force_delay = sleep(*self.use_force_delay.lock().unwrap());
select! {
_ = termination.receiver.recv().fuse() => {
break 'outer;
},
_ = use_force_delay.fuse() => {
proc.kill_with_signal(KillSignal::SIGKILL);
termination.receiver.recv().await?;
break 'outer;
},
}
}
}
}
self.callbacks.clear();
*self.proc.lock().unwrap() = None;
self.running.store(false, Ordering::SeqCst);
Ok(())
}
}
#[derive(Clone)]
pub struct Process {
inner: Arc<Inner>,
task: Arc<Task<Arc<Inner>, ()>>,
}
unsafe impl Send for Process {}
unsafe impl Sync for Process {}
impl Process {
pub fn new(options: Options) -> Process {
let inner = Arc::new(Inner::new(options));
let task = task!(|inner: Arc<Inner>, stop| async move {
inner.run(stop).await.ok();
});
Process {
inner,
task: Arc::new(task),
}
}
pub fn new_once(path: &str) -> Process {
let options = Options::new(
&[path],
None,
false,
None,
false,
None,
Channel::unbounded(),
None,
false,
);
Self::new(options)
}
pub async fn version(path: &str) -> Result<Version> {
version(path).await
}
pub fn is_running(&self) -> bool {
self.inner.running.load(Ordering::SeqCst)
}
pub fn mute(&self, mute: bool) -> Result<()> {
self.inner.mute(mute)
}
pub fn toggle_mute(&self) -> Result<bool> {
self.inner.toggle_mute()
}
pub fn uptime(&self) -> Option<Duration> {
self.inner.uptime()
}
pub fn events(&self) -> Receiver<Event> {
self.inner.events.receiver.clone()
}
pub fn replace_argv(&self, argv: Vec<String>) {
*self.inner.argv.lock().unwrap() = argv;
}
pub fn run(&self) -> Result<()> {
self.task.run(self.inner.clone())?;
Ok(())
}
pub fn kill(&self) -> Result<()> {
if !self.inner.running.load(Ordering::SeqCst) {
Err(Error::NotRunning)
} else if let Some(proc) = self.inner.proc.lock().unwrap().as_ref() {
self.inner.restart.store(false, Ordering::SeqCst);
proc.kill_with_signal(KillSignal::SIGKILL);
Ok(())
} else {
Err(Error::ProcIsAbsent)
}
}
pub fn restart(&self) -> Result<()> {
if !self.inner.running.load(Ordering::SeqCst) {
Err(Error::NotRunning)
} else if let Some(proc) = self.inner.proc.lock().unwrap().as_ref() {
proc.kill_with_signal(KillSignal::SIGTERM);
Ok(())
} else {
Err(Error::ProcIsAbsent)
}
}
pub fn stop(&self) -> Result<()> {
if self.inner.running.load(Ordering::SeqCst) {
self.inner.restart.store(false, Ordering::SeqCst);
self.task.stop()?;
}
Ok(())
}
pub async fn join(&self) -> Result<()> {
if self.task.is_running() {
self.task.join().await?;
}
Ok(())
}
pub async fn stop_and_join(&self) -> Result<()> {
self.stop()?;
self.join().await?;
Ok(())
}
}
pub async fn exec(
argv: &[&str],
cwd: Option<PathBuf>,
) -> Result<ExecutionResult> {
let proc = *argv.first().unwrap();
let args: SpawnArgs = argv[1..].into();
let options = SpawnOptions::new();
if let Some(cwd) = cwd {
options.cwd(cwd.as_os_str().to_str().unwrap_or_else(|| {
panic!("Process::exec_with_args(): invalid path: {}", cwd.display())
}));
}
let termination = Channel::<Termination>::oneshot();
let (stdout_tx, stdout_rx) = oneshot();
let (stderr_tx, stderr_rx) = oneshot();
let cp = spawn_with_args_and_options(proc, &args, &options);
let exit = termination.sender.clone();
let exit = callback!(move |code: u32| {
exit.try_send(Termination::Exit(code))
.expect("unable to send close notification");
});
cp.on("exit", exit.as_ref());
let error = termination.sender.clone();
let error = callback!(move |err: JsValue| {
error
.try_send(Termination::Error(format!("{:?}", err)))
.expect("unable to send close notification");
});
cp.on("error", error.as_ref());
let stdout_cb = callback!(move |data: buffer::Buffer| {
stdout_tx
.try_send(String::from(data.to_string(None, None, None)))
.expect("unable to send stdout data");
});
cp.stdout().on("data", stdout_cb.as_ref());
let stderr_cb = callback!(move |data: buffer::Buffer| {
stderr_tx
.try_send(String::from(data.to_string(None, None, None)))
.expect("unable to send stderr data");
});
cp.stderr().on("data", stderr_cb.as_ref());
let termination = termination.recv().await?;
let mut stdout = String::new();
for _ in 0..stdout_rx.len() {
stdout.push_str(&stdout_rx.try_recv()?);
}
let mut stderr = String::new();
for _ in 0..stderr_rx.len() {
stderr.push_str(&stdout_rx.try_recv()?);
}
Ok(ExecutionResult {
termination,
stdout,
stderr,
})
}
pub async fn version(proc: &str) -> Result<Version> {
let text = exec([proc, "--version"].as_slice(), None).await?.stdout;
let vstr = if let Some(vstr) = text.split_whitespace().last() {
vstr
} else {
return Ok(Version::none());
};
let v = vstr
.split('.')
.flat_map(|v| v.parse::<u64>())
.collect::<Vec<_>>();
if v.len() != 3 {
return Ok(Version::none());
}
Ok(Version::new(v[0], v[1], v[2]))
}
pub fn trim(mut s: String) -> String {
if s.ends_with('\n') {
s.pop();
if s.ends_with('\r') {
s.pop();
}
}
s
}
#[wasm_bindgen]
pub async fn test() {
log_info!("running rust test() fn");
workflow_wasm::panic::init_console_panic_hook();
let proc = Process::new(Options::new(
&["/Users/aspect/dev/kaspa-dev/kaspad/kaspad"],
None,
true,
Some(Duration::from_millis(3000)),
true,
Some(Duration::from_millis(100)),
Channel::unbounded(),
None,
false,
));
let task = task!(|events: Receiver<Event>, stop: Receiver<()>| async move {
loop {
select! {
v = events.recv().fuse() => {
if let Ok(v) = v {
log_info!("| {:?}",v);
}
},
_ = stop.recv().fuse() => {
log_info!("stop...");
break;
}
}
log_info!("in loop");
}
});
task.run(proc.events()).expect("task.run()");
proc.run().expect("proc.run()");
sleep(Duration::from_millis(5_000)).await;
log_info!("### === STOPPING PROCESS ...");
proc.stop_and_join()
.await
.expect("proc.stop_and_join() failure");
task.stop_and_join()
.await
.expect("task.stop_and_join() failure");
}