Skip to main content

launch_pad/
process.rs

1// SPDX-License-Identifier: MPL-2.0
2
3use super::{ProcessKey, ProcessManager};
4use std::{borrow::Cow, future::Future, os::fd::OwnedFd, pin::Pin, time::Duration};
5use tokio::sync::mpsc;
6
7pub type ReturnFuture =
8    sync_wrapper::SyncFuture<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>;
9pub type StringCallback =
10    Box<dyn Fn(ProcessManager, ProcessKey, String) -> ReturnFuture + Send + Sync + 'static>;
11pub type StartedCallback =
12    Box<dyn Fn(ProcessManager, ProcessKey, bool) -> ReturnFuture + Send + Sync + 'static>;
13/// Type for the callback that is called when a process exits. Arguments are the
14/// process manager, the process key, the error code to return, and a
15/// bool indicating if the process is going to be restarted.
16pub type ExitedCallback = Box<
17    dyn Fn(ProcessManager, ProcessKey, Option<i32>, bool) -> ReturnFuture + Send + Sync + 'static,
18>;
19pub type BlockingCallback = Box<dyn Fn(ProcessManager, ProcessKey, bool) + Send + Sync + 'static>;
20
21#[derive(Default)]
22pub(crate) struct ProcessCallbacks {
23    pub(crate) on_stdout: Option<StringCallback>,
24    pub(crate) on_stderr: Option<StringCallback>,
25    pub(crate) on_start: Option<StartedCallback>,
26    pub(crate) on_exit: Option<ExitedCallback>,
27    pub(crate) fds: Option<Box<dyn FnOnce() -> Vec<OwnedFd> + Send + Sync + 'static>>,
28}
29
30pub struct Process {
31    pub(crate) executable: String,
32    pub(crate) args: Vec<String>,
33    pub(crate) env: Vec<(String, String)>,
34    pub(crate) callbacks: ProcessCallbacks,
35    pub(crate) stdin_tx: mpsc::Sender<Cow<'static, [u8]>>,
36    pub(crate) stdin_rx: Option<mpsc::Receiver<Cow<'static, [u8]>>>,
37    pub(crate) cancel_timeout: Option<Duration>,
38}
39
40impl Process {
41    pub fn new() -> Self {
42        let (stdin_tx, stdin_rx) = mpsc::channel(10);
43        Self {
44            executable: String::new(),
45            args: Vec::new(),
46            env: Vec::new(),
47            callbacks: ProcessCallbacks::default(),
48            stdin_tx,
49            stdin_rx: Some(stdin_rx),
50            cancel_timeout: Some(Duration::from_secs(1)),
51        }
52    }
53
54    /// Sets the executable to run.
55    pub fn with_executable(mut self, executable: impl ToString) -> Self {
56        self.executable = executable.to_string();
57        self
58    }
59
60    /// Sets the arguments to pass to the executable.
61    pub fn with_args(mut self, args: impl IntoIterator<Item = impl ToString>) -> Self {
62        self.args = args.into_iter().map(|s| s.to_string()).collect();
63        self
64    }
65
66    /// Sets the cancellation timeout before forcing the process to exit.
67    pub fn with_cancel_timeout(mut self, t: impl Into<Option<Duration>>) -> Self {
68        self.cancel_timeout = t.into();
69        self
70    }
71
72    /// Sets the environment variables to pass to the executable.
73    pub fn with_env(
74        mut self,
75        env: impl IntoIterator<Item = (impl ToString, impl ToString)>,
76    ) -> Self {
77        self.env = env
78            .into_iter()
79            .map(|(k, v)| (k.to_string(), v.to_string()))
80            .collect();
81        self
82    }
83
84    /// Sets the callback to run when the process writes to stdout.
85    pub fn with_on_stdout<F, A>(mut self, on_stdout: F) -> Self
86    where
87        F: Fn(ProcessManager, ProcessKey, String) -> A + Unpin + Send + Sync + 'static,
88        A: Future<Output = ()> + Send + 'static,
89    {
90        self.callbacks.on_stdout = Some(Box::new(move |p, k, s| {
91            sync_wrapper::SyncFuture::new(Box::pin(on_stdout(p, k, s)))
92        }));
93        self
94    }
95
96    /// Sets the callback to run when the process writes to stderr.
97    pub fn with_on_stderr<F, A>(mut self, on_stderr: F) -> Self
98    where
99        F: Fn(ProcessManager, ProcessKey, String) -> A + Unpin + Send + Sync + 'static,
100        A: Future<Output = ()> + Send + 'static,
101    {
102        self.callbacks.on_stderr = Some(Box::new(move |p, k, s| {
103            sync_wrapper::SyncFuture::new(Box::pin(on_stderr(p, k, s)))
104        }));
105        self
106    }
107
108    /// Shares Fds with the child process
109    /// Closure produces a vector of Fd to share with the child process
110    pub fn with_fds<F>(mut self, fds: F) -> Self
111    where
112        F: FnOnce() -> Vec<OwnedFd> + Send + Sync + 'static,
113    {
114        self.callbacks.fds = Some(Box::new(fds));
115        self
116    }
117
118    /// This is called when the process is started.
119    ///
120    /// It passes a single argument: a bool indicating whether the process was
121    /// restarted or if it was started for the first time.
122    pub fn with_on_start<F, A>(mut self, on_start: F) -> Self
123    where
124        F: Fn(ProcessManager, ProcessKey, bool) -> A + Unpin + Send + Sync + 'static,
125        A: Future<Output = ()> + Send + 'static,
126    {
127        self.callbacks.on_start = Some(Box::new(move |p, k, r| {
128            sync_wrapper::SyncFuture::new(Box::pin(on_start(p, k, r)))
129        }));
130        self
131    }
132
133    /// Sets the callback to run when the process exits.
134    /// This is called after the process exits, or before it restarts.
135    ///
136    /// It passes two arguments: an optional exit code, and a bool indicating
137    /// whether the process is going to be restarted or not.
138    pub fn with_on_exit<F, A>(mut self, on_exit: F) -> Self
139    where
140        F: Fn(ProcessManager, ProcessKey, Option<i32>, bool) -> A + Unpin + Send + Sync + 'static,
141        A: Future<Output = ()> + Send + 'static,
142    {
143        self.callbacks.on_exit = Some(Box::new(move |p, k, code, restarting| {
144            sync_wrapper::SyncFuture::new(Box::pin(on_exit(p, k, code, restarting)))
145        }));
146        self
147    }
148
149    /// Returns a human readable, escaped string of the executable.
150    /// Used for logging.
151    pub(crate) fn exe_text(&self) -> Cow<'_, str> {
152        if self.executable.contains(' ') {
153            Cow::Owned(format!("\"{}\"", self.executable))
154        } else {
155            Cow::Borrowed(&self.executable)
156        }
157    }
158
159    /// Returns a human readable, escaped string of the environment variables.
160    /// Used for logging.
161    pub(crate) fn env_text(&self) -> Cow<'static, str> {
162        if self.env.is_empty() {
163            Cow::Borrowed("")
164        } else {
165            Cow::Owned(self.env.iter().fold(String::new(), |acc, (k, v)| {
166                if v.contains(' ') {
167                    format!("{} {}=\"{}\"", acc, k, v)
168                } else {
169                    format!("{} {}={}", acc, k, v)
170                }
171            }))
172        }
173    }
174
175    /// Returns a human readable, escaped string of the arguments.
176    /// Used for logging.
177    pub(crate) fn args_text(&self) -> Cow<'static, str> {
178        if self.args.is_empty() {
179            Cow::Borrowed("")
180        } else {
181            Cow::Owned(self.args.iter().fold(String::new(), |acc, arg| {
182                if arg.contains(' ') {
183                    format!("{} \"{}\"", acc, arg)
184                } else {
185                    format!("{} {}", acc, arg)
186                }
187            }))
188        }
189    }
190}