watchexec_supervisor/job/
job.rs

1#![allow(clippy::must_use_candidate)] // Ticket-returning methods are supposed to be used without awaiting
2
3use std::{future::Future, sync::Arc, time::Duration};
4
5use process_wrap::tokio::TokioCommandWrap;
6use watchexec_signals::Signal;
7
8use crate::{command::Command, errors::SyncIoError, flag::Flag};
9
10use super::{
11	messages::{Control, ControlMessage, Ticket},
12	priority::{Priority, PrioritySender},
13	JobTaskContext,
14};
15
/// A handle to a job task spawned in the supervisor.
///
/// A job is a task which manages a [`Command`]. It is responsible for spawning the command's
/// program, for handling messages which control it, for managing the program's lifetime, and for
/// collecting its exit status and some timing information.
///
/// Most of the methods here queue [`Control`]s to the job task and return [`Ticket`]s. Controls
/// execute in order, except where noted. Tickets are futures which resolve when the corresponding
/// control has been run. Unlike most futures, tickets don't need to be polled for controls to make
/// progress; the future is only used to signal completion. Dropping a ticket will not drop the
/// control, so it's safe to do so if you don't care about when the control completes.
///
/// Note that controls are not guaranteed to run: for example, the job task may stop or panic
/// before a control is processed. If a job task stops gracefully, all pending tickets will resolve
/// immediately. If a job task panics (outside of hooks, panics are bugs!), pending tickets will
/// never resolve.
///
/// This struct is cloneable (internally it is made of Arcs). Dropping the last instance of a Job
/// will close the job's control queue, which will cause the job task to stop gracefully. Note that
/// a graceful stop of the job *task* is not the same as a graceful stop of the contained
/// *command*: when the job drops, the command will be dropped in turn, and forcefully terminated
/// via `kill_on_drop`.
#[derive(Debug, Clone)]
pub struct Job {
	/// The command this job manages; handed out (as a shared `Arc`) by `Job::command()`.
	pub(crate) command: Arc<Command>,
	/// Sending side of the job task's control queue; every control is queued through here
	/// together with its [`Priority`].
	pub(crate) control_queue: PrioritySender,

	/// Set to true when the command task has stopped gracefully.
	pub(crate) gone: Flag,
}
45
46impl Job {
47	/// The [`Command`] this job is managing.
48	pub fn command(&self) -> Arc<Command> {
49		self.command.clone()
50	}
51
52	/// If this job is dead.
53	pub fn is_dead(&self) -> bool {
54		self.gone.raised()
55	}
56
57	fn prepare_control(&self, control: Control) -> (Ticket, ControlMessage) {
58		let done = Flag::default();
59		(
60			Ticket {
61				job_gone: self.gone.clone(),
62				control_done: done.clone(),
63			},
64			ControlMessage { control, done },
65		)
66	}
67
68	pub(crate) fn send_controls<const N: usize>(
69		&self,
70		controls: [Control; N],
71		priority: Priority,
72	) -> Ticket {
73		if N == 0 || self.gone.raised() {
74			Ticket::cancelled()
75		} else if N == 1 {
76			let control = controls.into_iter().next().expect("UNWRAP: N > 0");
77			let (ticket, control) = self.prepare_control(control);
78			self.control_queue.send(control, priority);
79			ticket
80		} else {
81			let mut last_ticket = None;
82			for control in controls {
83				let (ticket, control) = self.prepare_control(control);
84				last_ticket = Some(ticket);
85				self.control_queue.send(control, priority);
86			}
87			last_ticket.expect("UNWRAP: N > 0")
88		}
89	}
90
91	/// Send a control message to the command.
92	///
93	/// All control messages are queued in the order they're sent and processed in order.
94	///
95	/// In general prefer using the other methods on this struct rather than sending [`Control`]s
96	/// directly.
97	pub fn control(&self, control: Control) -> Ticket {
98		self.send_controls([control], Priority::Normal)
99	}
100
101	/// Start the command if it's not running.
102	pub fn start(&self) -> Ticket {
103		self.control(Control::Start)
104	}
105
106	/// Stop the command if it's running and wait for completion.
107	///
108	/// If you don't want to wait for completion, use `signal(Signal::ForceStop)` instead.
109	pub fn stop(&self) -> Ticket {
110		self.control(Control::Stop)
111	}
112
113	/// Gracefully stop the command if it's running.
114	///
115	/// The command will be sent `signal` and then given `grace` time before being forcefully
116	/// terminated. If `grace` is zero, that still happens, but the command is terminated forcefully
117	/// on the next "tick" of the supervisor loop, which doesn't leave the process a lot of time to
118	/// do anything.
119	pub fn stop_with_signal(&self, signal: Signal, grace: Duration) -> Ticket {
120		if cfg!(unix) {
121			self.control(Control::GracefulStop { signal, grace })
122		} else {
123			self.stop()
124		}
125	}
126
127	/// Restart the command if it's running, or start it if it's not.
128	pub fn restart(&self) -> Ticket {
129		self.send_controls([Control::Stop, Control::Start], Priority::Normal)
130	}
131
132	/// Gracefully restart the command if it's running, or start it if it's not.
133	///
134	/// The command will be sent `signal` and then given `grace` time before being forcefully
135	/// terminated. If `grace` is zero, that still happens, but the command is terminated forcefully
136	/// on the next "tick" of the supervisor loop, which doesn't leave the process a lot of time to
137	/// do anything.
138	pub fn restart_with_signal(&self, signal: Signal, grace: Duration) -> Ticket {
139		if cfg!(unix) {
140			self.send_controls(
141				[Control::GracefulStop { signal, grace }, Control::Start],
142				Priority::Normal,
143			)
144		} else {
145			self.restart()
146		}
147	}
148
149	/// Restart the command if it's running, but don't start it if it's not.
150	pub fn try_restart(&self) -> Ticket {
151		self.control(Control::TryRestart)
152	}
153
154	/// Restart the command if it's running, but don't start it if it's not.
155	///
156	/// The command will be sent `signal` and then given `grace` time before being forcefully
157	/// terminated. If `grace` is zero, that still happens, but the command is terminated forcefully
158	/// on the next "tick" of the supervisor loop, which doesn't leave the process a lot of time to
159	/// do anything.
160	pub fn try_restart_with_signal(&self, signal: Signal, grace: Duration) -> Ticket {
161		if cfg!(unix) {
162			self.control(Control::TryGracefulRestart { signal, grace })
163		} else {
164			self.try_restart()
165		}
166	}
167
168	/// Send a signal to the command.
169	///
170	/// Sends a signal to the current program, if there is one. If there isn't, this is a no-op.
171	///
172	/// On Windows, this is a no-op for all signals but [`Signal::ForceStop`], which tries to stop
173	/// the command like a `stop()` would, but doesn't wait for completion. This is because Windows
174	/// doesn't have signals; in future [`Hangup`](Signal::Hangup), [`Interrupt`](Signal::Interrupt),
175	/// and [`Terminate`](Signal::Terminate) may be implemented using [GenerateConsoleCtrlEvent],
176	/// see [tracking issue #219](https://github.com/watchexec/watchexec/issues/219).
177	///
178	/// [GenerateConsoleCtrlEvent]: https://learn.microsoft.com/en-us/windows/console/generateconsolectrlevent
179	pub fn signal(&self, sig: Signal) -> Ticket {
180		self.control(Control::Signal(sig))
181	}
182
183	/// Stop the command, then mark it for garbage collection.
184	///
185	/// The underlying control messages are sent like normal, so they wait for all pending controls
186	/// to process. If you want to delete the command immediately, use `delete_now()`.
187	pub fn delete(&self) -> Ticket {
188		self.send_controls([Control::Stop, Control::Delete], Priority::Normal)
189	}
190
191	/// Stop the command immediately, then mark it for garbage collection.
192	///
193	/// The underlying control messages are sent with higher priority than normal, so they bypass
194	/// all others. If you want to delete after all current controls are processed, use `delete()`.
195	pub fn delete_now(&self) -> Ticket {
196		self.send_controls([Control::Stop, Control::Delete], Priority::Urgent)
197	}
198
199	/// Get a future which resolves when the command ends.
200	///
201	/// If the command is not running, the future resolves immediately.
202	///
203	/// The underlying control message is sent with higher priority than normal, so it targets the
204	/// actively running command, not the one that will be running after the rest of the controls
205	/// get done; note that may still be racy if the command ends between the time the message is
206	/// sent and the time it's processed.
207	pub fn to_wait(&self) -> Ticket {
208		self.send_controls([Control::NextEnding], Priority::High)
209	}
210
211	/// Run an arbitrary function.
212	///
213	/// The function is given [`&JobTaskContext`](JobTaskContext), which contains the state of the
214	/// currently executing, next-to-start, or just-finished command, as well as the final state of
215	/// the _previous_ run of the command.
216	///
217	/// Technically, some operations can be done through a `&self` shared borrow on the running
218	/// command's [`TokioChildWrapper`], but this library recommends against taking advantage of this,
219	/// and prefer using the methods on here instead, so that the supervisor can keep track of
220	/// what's going on.
221	pub fn run(&self, fun: impl FnOnce(&JobTaskContext<'_>) + Send + Sync + 'static) -> Ticket {
222		self.control(Control::SyncFunc(Box::new(fun)))
223	}
224
225	/// Run an arbitrary function and await the returned future.
226	///
227	/// The function is given [`&JobTaskContext`](JobTaskContext), which contains the state of the
228	/// currently executing, next-to-start, or just-finished command, as well as the final state of
229	/// the _previous_ run of the command.
230	///
231	/// Technically, some operations can be done through a `&self` shared borrow on the running
232	/// command's [`TokioChildWrapper`], but this library recommends against taking advantage of this,
233	/// and prefer using the methods on here instead, so that the supervisor can keep track of
234	/// what's going on.
235	///
236	/// A gotcha when using this method is that the future returned by the function can live longer
237	/// than the `&JobTaskContext` it was given, so you can't bring the context into the async block
238	/// and instead must clone or copy the parts you need beforehand, in the sync portion.
239	///
240	/// For example, this won't compile:
241	///
242	/// ```compile_fail
243	/// # use std::sync::Arc;
244	/// # use tokio::sync::mpsc;
245	/// # use watchexec_supervisor::command::{Command, Program};
246	/// # use watchexec_supervisor::job::{CommandState, start_job};
247	/// #
248	/// # let (job, _task) = start_job(Arc::new(Command { program: Program::Exec { prog: "/bin/date".into(), args: Vec::new() }.into(), options: Default::default() }));
249	/// let (channel, receiver) = mpsc::channel(10);
250	/// job.run_async(|context| Box::new(async move {
251	///     if let CommandState::Finished { status, .. } = context.current {
252	///         channel.send(status).await.ok();
253	///     }
254	/// }));
255	/// ```
256	///
257	/// But this does:
258	///
259	/// ```no_run
260	/// # use std::sync::Arc;
261	/// # use tokio::sync::mpsc;
262	/// # use watchexec_supervisor::command::{Command, Program};
263	/// # use watchexec_supervisor::job::{CommandState, start_job};
264	/// #
265	/// # let (job, _task) = start_job(Arc::new(Command { program: Program::Exec { prog: "/bin/date".into(), args: Vec::new() }.into(), options: Default::default() }));
266	/// let (channel, receiver) = mpsc::channel(10);
267	/// job.run_async(|context| {
268	///     let status = if let CommandState::Finished { status, .. } = context.current {
269	///         Some(*status)
270	///     } else {
271	///         None
272	///     };
273	///
274	///     Box::new(async move {
275	///         if let Some(status) = status {
276	///             channel.send(status).await.ok();
277	///         }
278	///     })
279	/// });
280	/// ```
281	pub fn run_async(
282		&self,
283		fun: impl (FnOnce(&JobTaskContext<'_>) -> Box<dyn Future<Output = ()> + Send + Sync>)
284			+ Send
285			+ Sync
286			+ 'static,
287	) -> Ticket {
288		self.control(Control::AsyncFunc(Box::new(fun)))
289	}
290
291	/// Set the spawn hook.
292	///
293	/// The hook will be called once per process spawned, before the process is spawned. It's given
294	/// a mutable reference to the [`process_wrap::tokio::TokioCommandWrap`] and some context; it
295	/// can modify or further [wrap](process_wrap) the command as it sees fit.
296	pub fn set_spawn_hook(
297		&self,
298		fun: impl Fn(&mut TokioCommandWrap, &JobTaskContext<'_>) + Send + Sync + 'static,
299	) -> Ticket {
300		self.control(Control::SetSyncSpawnHook(Arc::new(fun)))
301	}
302
303	/// Set the spawn hook (async version).
304	///
305	/// The hook will be called once per process spawned, before the process is spawned. It's given
306	/// a mutable reference to the [`process_wrap::tokio::TokioCommandWrap`] and some context; it
307	/// can modify or further [wrap](process_wrap) the command as it sees fit.
308	///
309	/// A gotcha when using this method is that the future returned by the function can live longer
310	/// than the references it was given, so you can't bring the command or context into the async
311	/// block and instead must clone or copy the parts you need beforehand, in the sync portion. See
312	/// the documentation for [`run_async`](Job::run_async) for an example.
313	///
314	/// Fortunately, async spawn hooks should be exceedingly rare: there's very few things to do in
315	/// spawn hooks that can't be done in the simpler sync version.
316	pub fn set_spawn_async_hook(
317		&self,
318		fun: impl (Fn(
319				&mut TokioCommandWrap,
320				&JobTaskContext<'_>,
321			) -> Box<dyn Future<Output = ()> + Send + Sync>)
322			+ Send
323			+ Sync
324			+ 'static,
325	) -> Ticket {
326		self.control(Control::SetAsyncSpawnHook(Arc::new(fun)))
327	}
328
329	/// Unset any spawn hook.
330	pub fn unset_spawn_hook(&self) -> Ticket {
331		self.control(Control::UnsetSpawnHook)
332	}
333
334	/// Set the error handler.
335	pub fn set_error_handler(&self, fun: impl Fn(SyncIoError) + Send + Sync + 'static) -> Ticket {
336		self.control(Control::SetSyncErrorHandler(Arc::new(fun)))
337	}
338
339	/// Set the error handler (async version).
340	pub fn set_async_error_handler(
341		&self,
342		fun: impl (Fn(SyncIoError) -> Box<dyn Future<Output = ()> + Send + Sync>)
343			+ Send
344			+ Sync
345			+ 'static,
346	) -> Ticket {
347		self.control(Control::SetAsyncErrorHandler(Arc::new(fun)))
348	}
349
350	/// Unset the error handler.
351	///
352	/// Errors will be silently ignored.
353	pub fn unset_error_handler(&self) -> Ticket {
354		self.control(Control::UnsetErrorHandler)
355	}
356}