Skip to main content

supervised/
service.rs

1//! Supervised service vocabulary and adapters.
2use std::{convert::Infallible, error::Error, fmt, future::Future, pin::Pin, sync::Arc};
3
4use crate::{readiness::ReadinessMode, Context};
5
/// Owned, type-erased future handed back by [`SupervisedService::run`].
///
/// The future is pinned on the heap and is required to be `Send + 'static`,
/// so it cannot borrow from the service that produced it.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
8
9/// Terminal outcome reported by a supervised task.
10///
11/// This enum describes what happened inside the service.
12/// The supervisor still decides what the system should do next through
13/// [`ServicePolicy`](crate::ServicePolicy),
14/// [`RestartPolicy`](crate::RestartPolicy),
15/// and [`ExitAction`](crate::ExitAction).
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum ServiceOutcome {
18    /// The service finished normally.
19    Completed,
20    /// The service observed supervisor shutdown and exited cooperatively.
21    Cancelled,
22    /// The service is asking the supervisor to begin global shutdown.
23    RequestedShutdown,
24    /// The service ended with a typed failure payload.
25    Error(ServiceError),
26}
27
28impl ServiceOutcome {
29    /// Convenience constructor for [`ServiceOutcome::Completed`].
30    pub const fn completed() -> Self {
31        Self::Completed
32    }
33
34    /// Convenience constructor for [`ServiceOutcome::Cancelled`].
35    pub const fn cancelled() -> Self {
36        Self::Cancelled
37    }
38
39    /// Convenience constructor for [`ServiceOutcome::RequestedShutdown`].
40    pub const fn requested_shutdown() -> Self {
41        Self::RequestedShutdown
42    }
43
44    /// Convenience constructor for [`ServiceOutcome::Error`].
45    pub fn failed(error: impl Into<ServiceError>) -> Self {
46        Self::Error(error.into())
47    }
48}
49
/// Stable, displayable failure payload carried by [`ServiceOutcome::Error`].
///
/// The message is stored as an `Arc<str>`, so cloning an error bumps a
/// reference count instead of copying the text.
///
/// Note: this type does not implement [`std::error::Error`]. Adding that impl
/// would make the blanket [`IntoServiceError`] impl for `E: Error` overlap
/// with the dedicated impl for `ServiceError`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServiceError {
    message: Arc<str>,
}

impl ServiceError {
    /// Builds a new service error from a message that should remain meaningful
    /// at supervisor boundaries and in summaries.
    pub fn new(message: impl Into<String>) -> Self {
        let text: String = message.into();
        Self {
            message: Arc::from(text),
        }
    }

    /// Converts an external error into a stable [`ServiceError`] message.
    ///
    /// Only the error's `Display` output is kept; the original error value is
    /// dropped.
    pub fn from_error(error: impl std::error::Error) -> Self {
        Self::new(error.to_string())
    }

    /// Returns the failure message as a string slice.
    pub fn message(&self) -> &str {
        &self.message
    }
}

impl fmt::Display for ServiceError {
    /// Writes the stored message.
    ///
    /// Formatting is delegated to `str`, so width, alignment and precision
    /// flags (for example `{:>20}`) are honoured instead of being ignored.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self.message(), formatter)
    }
}

impl From<String> for ServiceError {
    /// Uses the owned string as the error message.
    fn from(message: String) -> Self {
        Self {
            message: Arc::from(message),
        }
    }
}

impl From<&str> for ServiceError {
    /// Copies the borrowed text straight into the shared message buffer,
    /// without allocating an intermediate `String`.
    fn from(message: &str) -> Self {
        Self {
            message: Arc::from(message),
        }
    }
}
92
93/// Converts fallible service errors into [`ServiceError`].
94///
95/// This trait keeps [`service_fn`] ergonomic for module-local error enums
96/// while preserving [`ServiceOutcome`] as the supervisor runtime boundary.
97pub trait IntoServiceError {
98    /// Converts `self` into the stable service error payload used in summaries.
99    fn into_service_error(self) -> ServiceError;
100}
101
102impl IntoServiceError for ServiceError {
103    fn into_service_error(self) -> ServiceError {
104        self
105    }
106}
107
108impl<E> IntoServiceError for E
109where
110    E: Error + Send + Sync + 'static,
111{
112    fn into_service_error(self) -> ServiceError {
113        ServiceError::from_error(self)
114    }
115}
116
117/// Converts function-backed service return values into [`ServiceOutcome`].
118///
119/// Concrete [`SupervisedService`] implementations still return
120/// [`ServiceOutcome`] directly. This trait is intentionally used by
121/// [`service_fn`] so one-off async functions can use natural signatures like
122/// `Result<(), Error>` without widening the core service trait.
123pub trait IntoServiceOutcome {
124    /// Converts `self` into the terminal outcome observed by the supervisor.
125    fn into_service_outcome(self) -> ServiceOutcome;
126}
127
128impl IntoServiceOutcome for ServiceOutcome {
129    fn into_service_outcome(self) -> ServiceOutcome {
130        self
131    }
132}
133
134impl IntoServiceOutcome for () {
135    fn into_service_outcome(self) -> ServiceOutcome {
136        ServiceOutcome::Completed
137    }
138}
139
140impl IntoServiceOutcome for Infallible {
141    fn into_service_outcome(self) -> ServiceOutcome {
142        match self {}
143    }
144}
145
146impl<T, E> IntoServiceOutcome for Result<T, E>
147where
148    T: IntoServiceOutcome,
149    E: IntoServiceError,
150{
151    fn into_service_outcome(self) -> ServiceOutcome {
152        match self {
153            Ok(outcome) => outcome.into_service_outcome(),
154            Err(error) => ServiceOutcome::Error(error.into_service_error()),
155        }
156    }
157}
158
159/// Long-lived async subsystem owned by the supervisor.
160///
161/// A [`SupervisedService`] gets a typed [`Context`] and returns a
162/// [`ServiceOutcome`]. It does not decide restart or shutdown policy itself.
163pub trait SupervisedService: Send + Sync + 'static {
164    /// Typed payload injected alongside the supervisor cancellation token.
165    type Context: Clone + Send + Sync + 'static;
166
167    /// Stable identifier used in summaries and shutdown causes.
168    fn name(&self) -> &'static str;
169
170    /// Startup readiness behavior for this service.
171    ///
172    /// Most services are ready immediately. Use [`when_ready`] to opt a
173    /// service into explicit startup readiness without widening the builder
174    /// method surface.
175    fn readiness(&self) -> ReadinessMode {
176        ReadinessMode::Immediate
177    }
178
179    /// Runs the service until it reaches a terminal [`ServiceOutcome`].
180    fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome>;
181}
182
183/// Fluent decorators available on every [`SupervisedService`].
184pub trait ServiceExt: SupervisedService + Sized {
185    /// Wraps this service with [`when_ready`].
186    fn when_ready(self) -> WhenReady<Self> {
187        when_ready(self)
188    }
189
190    /// Wraps this service with [`until_cancelled`].
191    fn until_cancelled(self) -> UntilCancelled<Self> {
192        until_cancelled(self)
193    }
194}
195
196impl<S> ServiceExt for S where S: SupervisedService {}
197
/// The [`SupervisedService`] produced by [`service_fn`] from a plain function
/// or closure.
///
/// The context type `C` is inferred from the closure's argument type, which
/// lets call sites state the context they need without supplying a separate
/// context value.
#[derive(Clone)]
pub struct FnService<C, F> {
    /// Service name reported through [`SupervisedService::name`].
    name: &'static str,
    /// The wrapped function or closure.
    run: F,
    /// Zero-sized marker that ties the service to `C` without storing a
    /// value of that type.
    marker: std::marker::PhantomData<fn() -> C>,
}
208
209impl<C, F> FnService<C, F> {
210    pub fn name(&self) -> &'static str {
211        self.name
212    }
213}
214
215/// Wraps an async function as a [`SupervisedService`].
216///
217/// This keeps one-off services ergonomic without widening the core trait
218/// surface or forcing callers into a monolithic service object pattern.
219///
220/// The resulting service can be registered with
221/// [`SupervisorBuilder::add`](crate::SupervisorBuilder::add) or
222/// [`SupervisorBuilder::add_with_options`](crate::SupervisorBuilder::add_with_options).
223///
224/// The closure's [`Context`] parameter determines the extracted service
225/// context. With a stateful builder, that usually means:
226/// - `Context<AppState>` when the service wants the full root state
227/// - `Context<Subset>` when `Subset: FromSupervisorState<AppState>`
228pub fn service_fn<C, F, Fut, O>(name: &'static str, run: F) -> FnService<C, F>
229where
230    C: Clone + Send + Sync + 'static,
231    F: Fn(Context<C>) -> Fut + Send + Sync + 'static,
232    Fut: Future<Output = O> + Send + 'static,
233    O: IntoServiceOutcome + Send + 'static,
234{
235    FnService {
236        name,
237        run,
238        marker: std::marker::PhantomData,
239    }
240}
241
242impl<C, F, Fut, O> SupervisedService for FnService<C, F>
243where
244    C: Clone + Send + Sync + 'static,
245    F: Fn(Context<C>) -> Fut + Send + Sync + 'static,
246    Fut: Future<Output = O> + Send + 'static,
247    O: IntoServiceOutcome + Send + 'static,
248{
249    type Context = C;
250
251    fn name(&self) -> &'static str {
252        self.name
253    }
254
255    fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome> {
256        let future = (self.run)(ctx);
257        Box::pin(async move { future.await.into_service_outcome() })
258    }
259}
260
261/// Wraps a service so supervisor startup readiness waits for it.
262///
263/// A startup-gated service becomes ready when it calls
264/// [`ReadySignal::mark_ready`](crate::ReadySignal::mark_ready) or returns
265/// [`ServiceOutcome::Completed`].
266pub fn when_ready<S>(service: S) -> WhenReady<S>
267where
268    S: SupervisedService,
269{
270    WhenReady {
271        service: Arc::new(service),
272    }
273}
274
275/// [`SupervisedService`] adapter returned by [`when_ready`].
276#[derive(Clone)]
277pub struct WhenReady<S>
278where
279    S: SupervisedService,
280{
281    service: Arc<S>,
282}
283
284impl<S> SupervisedService for WhenReady<S>
285where
286    S: SupervisedService,
287{
288    type Context = S::Context;
289
290    fn name(&self) -> &'static str {
291        self.service.name()
292    }
293
294    fn readiness(&self) -> ReadinessMode {
295        ReadinessMode::Explicit
296    }
297
298    fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome> {
299        self.service.run(ctx)
300    }
301}
302
303/// Wraps a service so the supervisor cancellation token wins the outer race.
304///
305/// This is the convenient default for long-lived async loops that should stop
306/// as soon as the supervisor begins shutdown.
307pub fn until_cancelled<S>(service: S) -> UntilCancelled<S>
308where
309    S: SupervisedService,
310{
311    UntilCancelled {
312        service: Arc::new(service),
313    }
314}
315
316/// [`SupervisedService`] adapter returned by [`until_cancelled`].
317#[derive(Clone)]
318pub struct UntilCancelled<S>
319where
320    S: SupervisedService,
321{
322    service: Arc<S>,
323}
324
325impl<S> SupervisedService for UntilCancelled<S>
326where
327    S: SupervisedService,
328{
329    type Context = S::Context;
330
331    fn name(&self) -> &'static str {
332        self.service.name()
333    }
334
335    fn readiness(&self) -> ReadinessMode {
336        self.service.readiness()
337    }
338
339    fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome> {
340        let token = ctx.token().clone();
341        let service = Arc::clone(&self.service);
342        Box::pin(async move {
343            match token.run_until_cancelled_owned(service.run(ctx)).await {
344                Some(outcome) => outcome,
345                None => ServiceOutcome::Cancelled,
346            }
347        })
348    }
349}