watchexec_supervisor/job/job.rs
1#![allow(clippy::must_use_candidate)] // Ticket-returning methods are supposed to be used without awaiting
2
3use std::{future::Future, sync::Arc, time::Duration};
4
5use process_wrap::tokio::TokioCommandWrap;
6use watchexec_signals::Signal;
7
8use crate::{command::Command, errors::SyncIoError, flag::Flag};
9
10use super::{
11 messages::{Control, ControlMessage, Ticket},
12 priority::{Priority, PrioritySender},
13 JobTaskContext,
14};
15
16/// A handle to a job task spawned in the supervisor.
17///
18/// A job is a task which manages a [`Command`]. It is responsible for spawning the command's
19/// program, for handling messages which control it, for managing the program's lifetime, and for
20/// collecting its exit status and some timing information.
21///
22/// Most of the methods here queue [`Control`]s to the job task and return [`Ticket`]s. Controls
23/// execute in order, except where noted. Tickets are futures which resolve when the corresponding
24/// control has been run. Unlike most futures, tickets don't need to be polled for controls to make
25/// progress; the future is only used to signal completion. Dropping a ticket will not drop the
26/// control, so it's safe to do so if you don't care about when the control completes.
27///
28/// Note that controls are not guaranteed to run, like if the job task stops or panics before a
29/// control is processed. If a job task stops gracefully, all pending tickets will resolve
30/// immediately. If a job task panics (outside of hooks, panics are bugs!), pending tickets will
31/// never resolve.
32///
33/// This struct is cloneable (internally it is made of Arcs). Dropping the last instance of a Job
34/// will close the job's control queue, which will cause the job task to stop gracefully. Note that
35/// a task graceful stop is not the same as a graceful stop of the contained command; when the job
36/// drops, the command will be dropped in turn, and forcefully terminated via `kill_on_drop`.
37#[derive(Debug, Clone)]
38pub struct Job {
39 pub(crate) command: Arc<Command>,
40 pub(crate) control_queue: PrioritySender,
41
42 /// Set to true when the command task has stopped gracefully.
43 pub(crate) gone: Flag,
44}
45
46impl Job {
47 /// The [`Command`] this job is managing.
48 pub fn command(&self) -> Arc<Command> {
49 self.command.clone()
50 }
51
52 /// If this job is dead.
53 pub fn is_dead(&self) -> bool {
54 self.gone.raised()
55 }
56
57 fn prepare_control(&self, control: Control) -> (Ticket, ControlMessage) {
58 let done = Flag::default();
59 (
60 Ticket {
61 job_gone: self.gone.clone(),
62 control_done: done.clone(),
63 },
64 ControlMessage { control, done },
65 )
66 }
67
68 pub(crate) fn send_controls<const N: usize>(
69 &self,
70 controls: [Control; N],
71 priority: Priority,
72 ) -> Ticket {
73 if N == 0 || self.gone.raised() {
74 Ticket::cancelled()
75 } else if N == 1 {
76 let control = controls.into_iter().next().expect("UNWRAP: N > 0");
77 let (ticket, control) = self.prepare_control(control);
78 self.control_queue.send(control, priority);
79 ticket
80 } else {
81 let mut last_ticket = None;
82 for control in controls {
83 let (ticket, control) = self.prepare_control(control);
84 last_ticket = Some(ticket);
85 self.control_queue.send(control, priority);
86 }
87 last_ticket.expect("UNWRAP: N > 0")
88 }
89 }
90
91 /// Send a control message to the command.
92 ///
93 /// All control messages are queued in the order they're sent and processed in order.
94 ///
95 /// In general prefer using the other methods on this struct rather than sending [`Control`]s
96 /// directly.
97 pub fn control(&self, control: Control) -> Ticket {
98 self.send_controls([control], Priority::Normal)
99 }
100
101 /// Start the command if it's not running.
102 pub fn start(&self) -> Ticket {
103 self.control(Control::Start)
104 }
105
106 /// Stop the command if it's running and wait for completion.
107 ///
108 /// If you don't want to wait for completion, use `signal(Signal::ForceStop)` instead.
109 pub fn stop(&self) -> Ticket {
110 self.control(Control::Stop)
111 }
112
113 /// Gracefully stop the command if it's running.
114 ///
115 /// The command will be sent `signal` and then given `grace` time before being forcefully
116 /// terminated. If `grace` is zero, that still happens, but the command is terminated forcefully
117 /// on the next "tick" of the supervisor loop, which doesn't leave the process a lot of time to
118 /// do anything.
119 pub fn stop_with_signal(&self, signal: Signal, grace: Duration) -> Ticket {
120 if cfg!(unix) {
121 self.control(Control::GracefulStop { signal, grace })
122 } else {
123 self.stop()
124 }
125 }
126
127 /// Restart the command if it's running, or start it if it's not.
128 pub fn restart(&self) -> Ticket {
129 self.send_controls([Control::Stop, Control::Start], Priority::Normal)
130 }
131
132 /// Gracefully restart the command if it's running, or start it if it's not.
133 ///
134 /// The command will be sent `signal` and then given `grace` time before being forcefully
135 /// terminated. If `grace` is zero, that still happens, but the command is terminated forcefully
136 /// on the next "tick" of the supervisor loop, which doesn't leave the process a lot of time to
137 /// do anything.
138 pub fn restart_with_signal(&self, signal: Signal, grace: Duration) -> Ticket {
139 if cfg!(unix) {
140 self.send_controls(
141 [Control::GracefulStop { signal, grace }, Control::Start],
142 Priority::Normal,
143 )
144 } else {
145 self.restart()
146 }
147 }
148
149 /// Restart the command if it's running, but don't start it if it's not.
150 pub fn try_restart(&self) -> Ticket {
151 self.control(Control::TryRestart)
152 }
153
154 /// Restart the command if it's running, but don't start it if it's not.
155 ///
156 /// The command will be sent `signal` and then given `grace` time before being forcefully
157 /// terminated. If `grace` is zero, that still happens, but the command is terminated forcefully
158 /// on the next "tick" of the supervisor loop, which doesn't leave the process a lot of time to
159 /// do anything.
160 pub fn try_restart_with_signal(&self, signal: Signal, grace: Duration) -> Ticket {
161 if cfg!(unix) {
162 self.control(Control::TryGracefulRestart { signal, grace })
163 } else {
164 self.try_restart()
165 }
166 }
167
168 /// Send a signal to the command.
169 ///
170 /// Sends a signal to the current program, if there is one. If there isn't, this is a no-op.
171 ///
172 /// On Windows, this is a no-op for all signals but [`Signal::ForceStop`], which tries to stop
173 /// the command like a `stop()` would, but doesn't wait for completion. This is because Windows
174 /// doesn't have signals; in future [`Hangup`](Signal::Hangup), [`Interrupt`](Signal::Interrupt),
175 /// and [`Terminate`](Signal::Terminate) may be implemented using [GenerateConsoleCtrlEvent],
176 /// see [tracking issue #219](https://github.com/watchexec/watchexec/issues/219).
177 ///
178 /// [GenerateConsoleCtrlEvent]: https://learn.microsoft.com/en-us/windows/console/generateconsolectrlevent
179 pub fn signal(&self, sig: Signal) -> Ticket {
180 self.control(Control::Signal(sig))
181 }
182
183 /// Stop the command, then mark it for garbage collection.
184 ///
185 /// The underlying control messages are sent like normal, so they wait for all pending controls
186 /// to process. If you want to delete the command immediately, use `delete_now()`.
187 pub fn delete(&self) -> Ticket {
188 self.send_controls([Control::Stop, Control::Delete], Priority::Normal)
189 }
190
191 /// Stop the command immediately, then mark it for garbage collection.
192 ///
193 /// The underlying control messages are sent with higher priority than normal, so they bypass
194 /// all others. If you want to delete after all current controls are processed, use `delete()`.
195 pub fn delete_now(&self) -> Ticket {
196 self.send_controls([Control::Stop, Control::Delete], Priority::Urgent)
197 }
198
199 /// Get a future which resolves when the command ends.
200 ///
201 /// If the command is not running, the future resolves immediately.
202 ///
203 /// The underlying control message is sent with higher priority than normal, so it targets the
204 /// actively running command, not the one that will be running after the rest of the controls
205 /// get done; note that may still be racy if the command ends between the time the message is
206 /// sent and the time it's processed.
207 pub fn to_wait(&self) -> Ticket {
208 self.send_controls([Control::NextEnding], Priority::High)
209 }
210
211 /// Run an arbitrary function.
212 ///
213 /// The function is given [`&JobTaskContext`](JobTaskContext), which contains the state of the
214 /// currently executing, next-to-start, or just-finished command, as well as the final state of
215 /// the _previous_ run of the command.
216 ///
217 /// Technically, some operations can be done through a `&self` shared borrow on the running
218 /// command's [`TokioChildWrapper`], but this library recommends against taking advantage of this,
219 /// and prefer using the methods on here instead, so that the supervisor can keep track of
220 /// what's going on.
221 pub fn run(&self, fun: impl FnOnce(&JobTaskContext<'_>) + Send + Sync + 'static) -> Ticket {
222 self.control(Control::SyncFunc(Box::new(fun)))
223 }
224
225 /// Run an arbitrary function and await the returned future.
226 ///
227 /// The function is given [`&JobTaskContext`](JobTaskContext), which contains the state of the
228 /// currently executing, next-to-start, or just-finished command, as well as the final state of
229 /// the _previous_ run of the command.
230 ///
231 /// Technically, some operations can be done through a `&self` shared borrow on the running
232 /// command's [`TokioChildWrapper`], but this library recommends against taking advantage of this,
233 /// and prefer using the methods on here instead, so that the supervisor can keep track of
234 /// what's going on.
235 ///
236 /// A gotcha when using this method is that the future returned by the function can live longer
237 /// than the `&JobTaskContext` it was given, so you can't bring the context into the async block
238 /// and instead must clone or copy the parts you need beforehand, in the sync portion.
239 ///
240 /// For example, this won't compile:
241 ///
242 /// ```compile_fail
243 /// # use std::sync::Arc;
244 /// # use tokio::sync::mpsc;
245 /// # use watchexec_supervisor::command::{Command, Program};
246 /// # use watchexec_supervisor::job::{CommandState, start_job};
247 /// #
248 /// # let (job, _task) = start_job(Arc::new(Command { program: Program::Exec { prog: "/bin/date".into(), args: Vec::new() }.into(), options: Default::default() }));
249 /// let (channel, receiver) = mpsc::channel(10);
250 /// job.run_async(|context| Box::new(async move {
251 /// if let CommandState::Finished { status, .. } = context.current {
252 /// channel.send(status).await.ok();
253 /// }
254 /// }));
255 /// ```
256 ///
257 /// But this does:
258 ///
259 /// ```no_run
260 /// # use std::sync::Arc;
261 /// # use tokio::sync::mpsc;
262 /// # use watchexec_supervisor::command::{Command, Program};
263 /// # use watchexec_supervisor::job::{CommandState, start_job};
264 /// #
265 /// # let (job, _task) = start_job(Arc::new(Command { program: Program::Exec { prog: "/bin/date".into(), args: Vec::new() }.into(), options: Default::default() }));
266 /// let (channel, receiver) = mpsc::channel(10);
267 /// job.run_async(|context| {
268 /// let status = if let CommandState::Finished { status, .. } = context.current {
269 /// Some(*status)
270 /// } else {
271 /// None
272 /// };
273 ///
274 /// Box::new(async move {
275 /// if let Some(status) = status {
276 /// channel.send(status).await.ok();
277 /// }
278 /// })
279 /// });
280 /// ```
281 pub fn run_async(
282 &self,
283 fun: impl (FnOnce(&JobTaskContext<'_>) -> Box<dyn Future<Output = ()> + Send + Sync>)
284 + Send
285 + Sync
286 + 'static,
287 ) -> Ticket {
288 self.control(Control::AsyncFunc(Box::new(fun)))
289 }
290
291 /// Set the spawn hook.
292 ///
293 /// The hook will be called once per process spawned, before the process is spawned. It's given
294 /// a mutable reference to the [`process_wrap::tokio::TokioCommandWrap`] and some context; it
295 /// can modify or further [wrap](process_wrap) the command as it sees fit.
296 pub fn set_spawn_hook(
297 &self,
298 fun: impl Fn(&mut TokioCommandWrap, &JobTaskContext<'_>) + Send + Sync + 'static,
299 ) -> Ticket {
300 self.control(Control::SetSyncSpawnHook(Arc::new(fun)))
301 }
302
303 /// Set the spawn hook (async version).
304 ///
305 /// The hook will be called once per process spawned, before the process is spawned. It's given
306 /// a mutable reference to the [`process_wrap::tokio::TokioCommandWrap`] and some context; it
307 /// can modify or further [wrap](process_wrap) the command as it sees fit.
308 ///
309 /// A gotcha when using this method is that the future returned by the function can live longer
310 /// than the references it was given, so you can't bring the command or context into the async
311 /// block and instead must clone or copy the parts you need beforehand, in the sync portion. See
312 /// the documentation for [`run_async`](Job::run_async) for an example.
313 ///
314 /// Fortunately, async spawn hooks should be exceedingly rare: there's very few things to do in
315 /// spawn hooks that can't be done in the simpler sync version.
316 pub fn set_spawn_async_hook(
317 &self,
318 fun: impl (Fn(
319 &mut TokioCommandWrap,
320 &JobTaskContext<'_>,
321 ) -> Box<dyn Future<Output = ()> + Send + Sync>)
322 + Send
323 + Sync
324 + 'static,
325 ) -> Ticket {
326 self.control(Control::SetAsyncSpawnHook(Arc::new(fun)))
327 }
328
329 /// Unset any spawn hook.
330 pub fn unset_spawn_hook(&self) -> Ticket {
331 self.control(Control::UnsetSpawnHook)
332 }
333
334 /// Set the error handler.
335 pub fn set_error_handler(&self, fun: impl Fn(SyncIoError) + Send + Sync + 'static) -> Ticket {
336 self.control(Control::SetSyncErrorHandler(Arc::new(fun)))
337 }
338
339 /// Set the error handler (async version).
340 pub fn set_async_error_handler(
341 &self,
342 fun: impl (Fn(SyncIoError) -> Box<dyn Future<Output = ()> + Send + Sync>)
343 + Send
344 + Sync
345 + 'static,
346 ) -> Ticket {
347 self.control(Control::SetAsyncErrorHandler(Arc::new(fun)))
348 }
349
350 /// Unset the error handler.
351 ///
352 /// Errors will be silently ignored.
353 pub fn unset_error_handler(&self) -> Ticket {
354 self.control(Control::UnsetErrorHandler)
355 }
356}