1use super::{ProcessKey, ProcessManager};
4use std::{borrow::Cow, future::Future, os::fd::OwnedFd, pin::Pin, time::Duration};
5use tokio::sync::mpsc;
6
7pub type ReturnFuture =
8 sync_wrapper::SyncFuture<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>;
9pub type StringCallback =
10 Box<dyn Fn(ProcessManager, ProcessKey, String) -> ReturnFuture + Send + Sync + 'static>;
11pub type StartedCallback =
12 Box<dyn Fn(ProcessManager, ProcessKey, bool) -> ReturnFuture + Send + Sync + 'static>;
13pub type ExitedCallback = Box<
17 dyn Fn(ProcessManager, ProcessKey, Option<i32>, bool) -> ReturnFuture + Send + Sync + 'static,
18>;
19pub type BlockingCallback = Box<dyn Fn(ProcessManager, ProcessKey, bool) + Send + Sync + 'static>;
20
21#[derive(Default)]
22pub(crate) struct ProcessCallbacks {
23 pub(crate) on_stdout: Option<StringCallback>,
24 pub(crate) on_stderr: Option<StringCallback>,
25 pub(crate) on_start: Option<StartedCallback>,
26 pub(crate) on_exit: Option<ExitedCallback>,
27 pub(crate) fds: Option<Box<dyn FnOnce() -> Vec<OwnedFd> + Send + Sync + 'static>>,
28}
29
30pub struct Process {
31 pub(crate) executable: String,
32 pub(crate) args: Vec<String>,
33 pub(crate) env: Vec<(String, String)>,
34 pub(crate) callbacks: ProcessCallbacks,
35 pub(crate) stdin_tx: mpsc::Sender<Cow<'static, [u8]>>,
36 pub(crate) stdin_rx: Option<mpsc::Receiver<Cow<'static, [u8]>>>,
37 pub(crate) cancel_timeout: Option<Duration>,
38}
39
40impl Process {
41 pub fn new() -> Self {
42 let (stdin_tx, stdin_rx) = mpsc::channel(10);
43 Self {
44 executable: String::new(),
45 args: Vec::new(),
46 env: Vec::new(),
47 callbacks: ProcessCallbacks::default(),
48 stdin_tx,
49 stdin_rx: Some(stdin_rx),
50 cancel_timeout: Some(Duration::from_secs(1)),
51 }
52 }
53
54 pub fn with_executable(mut self, executable: impl ToString) -> Self {
56 self.executable = executable.to_string();
57 self
58 }
59
60 pub fn with_args(mut self, args: impl IntoIterator<Item = impl ToString>) -> Self {
62 self.args = args.into_iter().map(|s| s.to_string()).collect();
63 self
64 }
65
66 pub fn with_cancel_timeout(mut self, t: impl Into<Option<Duration>>) -> Self {
68 self.cancel_timeout = t.into();
69 self
70 }
71
72 pub fn with_env(
74 mut self,
75 env: impl IntoIterator<Item = (impl ToString, impl ToString)>,
76 ) -> Self {
77 self.env = env
78 .into_iter()
79 .map(|(k, v)| (k.to_string(), v.to_string()))
80 .collect();
81 self
82 }
83
84 pub fn with_on_stdout<F, A>(mut self, on_stdout: F) -> Self
86 where
87 F: Fn(ProcessManager, ProcessKey, String) -> A + Unpin + Send + Sync + 'static,
88 A: Future<Output = ()> + Send + 'static,
89 {
90 self.callbacks.on_stdout = Some(Box::new(move |p, k, s| {
91 sync_wrapper::SyncFuture::new(Box::pin(on_stdout(p, k, s)))
92 }));
93 self
94 }
95
96 pub fn with_on_stderr<F, A>(mut self, on_stderr: F) -> Self
98 where
99 F: Fn(ProcessManager, ProcessKey, String) -> A + Unpin + Send + Sync + 'static,
100 A: Future<Output = ()> + Send + 'static,
101 {
102 self.callbacks.on_stderr = Some(Box::new(move |p, k, s| {
103 sync_wrapper::SyncFuture::new(Box::pin(on_stderr(p, k, s)))
104 }));
105 self
106 }
107
108 pub fn with_fds<F>(mut self, fds: F) -> Self
111 where
112 F: FnOnce() -> Vec<OwnedFd> + Send + Sync + 'static,
113 {
114 self.callbacks.fds = Some(Box::new(fds));
115 self
116 }
117
118 pub fn with_on_start<F, A>(mut self, on_start: F) -> Self
123 where
124 F: Fn(ProcessManager, ProcessKey, bool) -> A + Unpin + Send + Sync + 'static,
125 A: Future<Output = ()> + Send + 'static,
126 {
127 self.callbacks.on_start = Some(Box::new(move |p, k, r| {
128 sync_wrapper::SyncFuture::new(Box::pin(on_start(p, k, r)))
129 }));
130 self
131 }
132
133 pub fn with_on_exit<F, A>(mut self, on_exit: F) -> Self
139 where
140 F: Fn(ProcessManager, ProcessKey, Option<i32>, bool) -> A + Unpin + Send + Sync + 'static,
141 A: Future<Output = ()> + Send + 'static,
142 {
143 self.callbacks.on_exit = Some(Box::new(move |p, k, code, restarting| {
144 sync_wrapper::SyncFuture::new(Box::pin(on_exit(p, k, code, restarting)))
145 }));
146 self
147 }
148
149 pub(crate) fn exe_text(&self) -> Cow<'_, str> {
152 if self.executable.contains(' ') {
153 Cow::Owned(format!("\"{}\"", self.executable))
154 } else {
155 Cow::Borrowed(&self.executable)
156 }
157 }
158
159 pub(crate) fn env_text(&self) -> Cow<'static, str> {
162 if self.env.is_empty() {
163 Cow::Borrowed("")
164 } else {
165 Cow::Owned(self.env.iter().fold(String::new(), |acc, (k, v)| {
166 if v.contains(' ') {
167 format!("{} {}=\"{}\"", acc, k, v)
168 } else {
169 format!("{} {}={}", acc, k, v)
170 }
171 }))
172 }
173 }
174
175 pub(crate) fn args_text(&self) -> Cow<'static, str> {
178 if self.args.is_empty() {
179 Cow::Borrowed("")
180 } else {
181 Cow::Owned(self.args.iter().fold(String::new(), |acc, arg| {
182 if arg.contains(' ') {
183 format!("{} \"{}\"", acc, arg)
184 } else {
185 format!("{} {}", acc, arg)
186 }
187 }))
188 }
189 }
190}