1use std::{convert::Infallible, error::Error, fmt, future::Future, pin::Pin, sync::Arc};
3
4use crate::{readiness::ReadinessMode, Context};
5
6pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
8
9#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum ServiceOutcome {
18 Completed,
20 Cancelled,
22 RequestedShutdown,
24 Error(ServiceError),
26}
27
28impl ServiceOutcome {
29 pub const fn completed() -> Self {
31 Self::Completed
32 }
33
34 pub const fn cancelled() -> Self {
36 Self::Cancelled
37 }
38
39 pub const fn requested_shutdown() -> Self {
41 Self::RequestedShutdown
42 }
43
44 pub fn failed(error: impl Into<ServiceError>) -> Self {
46 Self::Error(error.into())
47 }
48}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
52pub struct ServiceError {
53 message: Arc<str>,
54}
55
56impl ServiceError {
57 pub fn new(message: impl Into<String>) -> Self {
60 Self {
61 message: Arc::<str>::from(message.into()),
62 }
63 }
64
65 pub fn from_error(error: impl std::error::Error) -> Self {
67 Self::new(error.to_string())
68 }
69
70 pub fn message(&self) -> &str {
71 &self.message
72 }
73}
74
75impl fmt::Display for ServiceError {
76 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
77 formatter.write_str(&self.message)
78 }
79}
80
81impl From<String> for ServiceError {
82 fn from(message: String) -> Self {
83 Self::new(message)
84 }
85}
86
87impl From<&str> for ServiceError {
88 fn from(message: &str) -> Self {
89 Self::new(message)
90 }
91}
92
93pub trait IntoServiceError {
98 fn into_service_error(self) -> ServiceError;
100}
101
102impl IntoServiceError for ServiceError {
103 fn into_service_error(self) -> ServiceError {
104 self
105 }
106}
107
108impl<E> IntoServiceError for E
109where
110 E: Error + Send + Sync + 'static,
111{
112 fn into_service_error(self) -> ServiceError {
113 ServiceError::from_error(self)
114 }
115}
116
117pub trait IntoServiceOutcome {
124 fn into_service_outcome(self) -> ServiceOutcome;
126}
127
128impl IntoServiceOutcome for ServiceOutcome {
129 fn into_service_outcome(self) -> ServiceOutcome {
130 self
131 }
132}
133
134impl IntoServiceOutcome for () {
135 fn into_service_outcome(self) -> ServiceOutcome {
136 ServiceOutcome::Completed
137 }
138}
139
140impl IntoServiceOutcome for Infallible {
141 fn into_service_outcome(self) -> ServiceOutcome {
142 match self {}
143 }
144}
145
146impl<T, E> IntoServiceOutcome for Result<T, E>
147where
148 T: IntoServiceOutcome,
149 E: IntoServiceError,
150{
151 fn into_service_outcome(self) -> ServiceOutcome {
152 match self {
153 Ok(outcome) => outcome.into_service_outcome(),
154 Err(error) => ServiceOutcome::Error(error.into_service_error()),
155 }
156 }
157}
158
159pub trait SupervisedService: Send + Sync + 'static {
164 type Context: Clone + Send + Sync + 'static;
166
167 fn name(&self) -> &'static str;
169
170 fn readiness(&self) -> ReadinessMode {
176 ReadinessMode::Immediate
177 }
178
179 fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome>;
181}
182
183pub trait ServiceExt: SupervisedService + Sized {
185 fn when_ready(self) -> WhenReady<Self> {
187 when_ready(self)
188 }
189
190 fn until_cancelled(self) -> UntilCancelled<Self> {
192 until_cancelled(self)
193 }
194}
195
196impl<S> ServiceExt for S where S: SupervisedService {}
197
198#[derive(Clone)]
203pub struct FnService<C, F> {
204 name: &'static str,
205 run: F,
206 marker: std::marker::PhantomData<fn() -> C>,
207}
208
209impl<C, F> FnService<C, F> {
210 pub fn name(&self) -> &'static str {
211 self.name
212 }
213}
214
215pub fn service_fn<C, F, Fut, O>(name: &'static str, run: F) -> FnService<C, F>
229where
230 C: Clone + Send + Sync + 'static,
231 F: Fn(Context<C>) -> Fut + Send + Sync + 'static,
232 Fut: Future<Output = O> + Send + 'static,
233 O: IntoServiceOutcome + Send + 'static,
234{
235 FnService {
236 name,
237 run,
238 marker: std::marker::PhantomData,
239 }
240}
241
242impl<C, F, Fut, O> SupervisedService for FnService<C, F>
243where
244 C: Clone + Send + Sync + 'static,
245 F: Fn(Context<C>) -> Fut + Send + Sync + 'static,
246 Fut: Future<Output = O> + Send + 'static,
247 O: IntoServiceOutcome + Send + 'static,
248{
249 type Context = C;
250
251 fn name(&self) -> &'static str {
252 self.name
253 }
254
255 fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome> {
256 let future = (self.run)(ctx);
257 Box::pin(async move { future.await.into_service_outcome() })
258 }
259}
260
261pub fn when_ready<S>(service: S) -> WhenReady<S>
267where
268 S: SupervisedService,
269{
270 WhenReady {
271 service: Arc::new(service),
272 }
273}
274
275#[derive(Clone)]
277pub struct WhenReady<S>
278where
279 S: SupervisedService,
280{
281 service: Arc<S>,
282}
283
284impl<S> SupervisedService for WhenReady<S>
285where
286 S: SupervisedService,
287{
288 type Context = S::Context;
289
290 fn name(&self) -> &'static str {
291 self.service.name()
292 }
293
294 fn readiness(&self) -> ReadinessMode {
295 ReadinessMode::Explicit
296 }
297
298 fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome> {
299 self.service.run(ctx)
300 }
301}
302
303pub fn until_cancelled<S>(service: S) -> UntilCancelled<S>
308where
309 S: SupervisedService,
310{
311 UntilCancelled {
312 service: Arc::new(service),
313 }
314}
315
316#[derive(Clone)]
318pub struct UntilCancelled<S>
319where
320 S: SupervisedService,
321{
322 service: Arc<S>,
323}
324
325impl<S> SupervisedService for UntilCancelled<S>
326where
327 S: SupervisedService,
328{
329 type Context = S::Context;
330
331 fn name(&self) -> &'static str {
332 self.service.name()
333 }
334
335 fn readiness(&self) -> ReadinessMode {
336 self.service.readiness()
337 }
338
339 fn run(&self, ctx: Context<Self::Context>) -> BoxFuture<ServiceOutcome> {
340 let token = ctx.token().clone();
341 let service = Arc::clone(&self.service);
342 Box::pin(async move {
343 match token.run_until_cancelled_owned(service.run(ctx)).await {
344 Some(outcome) => outcome,
345 None => ServiceOutcome::Cancelled,
346 }
347 })
348 }
349}