fuel_core_services/
service.rs

1use crate::{
2    Shared,
3    state::{
4        State,
5        StateWatcher,
6    },
7};
8use anyhow::anyhow;
9use fuel_core_metrics::futures::{
10    FuturesMetrics,
11    future_tracker::FutureTracker,
12};
13use futures::FutureExt;
14use std::any::Any;
15use tokio::sync::watch;
16use tracing::Instrument;
17
/// Placeholder shared-data type for services that have nothing to share
/// asynchronously with other tasks.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EmptyShared;
21
/// Trait for service runners, providing a minimal interface for managing
/// the lifecycle of services such as start/stop and health status.
///
/// All methods take `&self`; the lifecycle is driven by sending signals and observing
/// the resulting [`State`].
#[async_trait::async_trait]
pub trait Service {
    /// Send a start signal to the service without waiting for it to start.
    ///
    /// # Errors
    ///
    /// Returns an error if the service was already started.
    fn start(&self) -> anyhow::Result<()>;

    /// Send a start signal to the service and wait for it to start up.
    ///
    /// The returned [`State`] is the first state observed that is no longer `Starting`;
    /// it may already be a stopped state if the service failed during start up.
    ///
    /// # Errors
    ///
    /// Returns an error if the service was already started.
    async fn start_and_await(&self) -> anyhow::Result<State>;

    /// Wait for service to start or stop (without sending any signal).
    async fn await_start_or_stop(&self) -> anyhow::Result<State>;

    /// Send a stop signal to the service without waiting for it to shutdown.
    ///
    /// Returns `true` if the stop signal was sent and `false` if it was not needed
    /// (e.g. the service is already stopping or stopped).
    /// Note that the `ServiceRunner` implementation in this file also returns `true`
    /// for a service that has not been started yet.
    fn stop(&self) -> bool;

    /// Send stop signal to service and wait for it to shutdown.
    async fn stop_and_await(&self) -> anyhow::Result<State>;

    /// Wait for service to stop (without sending a stop signal).
    async fn await_stop(&self) -> anyhow::Result<State>;

    /// The current state of the service (i.e. `Started`, `Stopped`, etc..)
    fn state(&self) -> State;

    /// Returns the state watcher of the service.
    fn state_watcher(&self) -> StateWatcher;
}
53
/// Trait used by `ServiceRunner` to encapsulate the business logic tasks for a service.
///
/// A `RunnableService` is consumed by [`RunnableService::into_task`] once the runner
/// receives the start signal, producing the [`RunnableTask`] that is then run in a loop.
#[async_trait::async_trait]
pub trait RunnableService: Send {
    /// The name of the runnable service, used for namespacing error messages.
    /// In this file it is also used in log messages and to obtain the futures metrics.
    const NAME: &'static str;

    /// Service specific shared data. This is used when you have data that needs to be shared by
    /// one or more tasks. It is the implementor's responsibility to ensure cloning this
    /// type is shallow and doesn't provide a full duplication of data that is meant
    /// to be shared between asynchronous processes.
    type SharedData: Clone + Send + Sync;

    /// The initialized runnable task type.
    type Task: RunnableTask;

    /// Optional parameters used when initializing the service into a task.
    type TaskParams: Send;

    /// A cloned instance of the shared data
    fn shared_data(&self) -> Self::SharedData;

    /// Converts the service into a runnable task before the main run loop.
    ///
    /// The `state_watcher` is a `State` watcher of the service. Some tasks may handle state
    /// changes on their own.
    ///
    /// The `params` are the `TaskParams` handed to the runner when it was created.
    ///
    /// # Errors
    ///
    /// An error returned from here makes the runner in this file panic inside the spawned
    /// task, and the service ends up in `State::StoppedWithError`.
    async fn into_task(
        self,
        state_watcher: &StateWatcher,
        params: Self::TaskParams,
    ) -> anyhow::Result<Self::Task>;
}
85
/// The result of a single iteration of the service task.
///
/// Returned by `RunnableTask::run`; the run loop in this file inspects it to decide
/// whether to call `run` again.
#[derive(Debug)]
#[must_use]
pub enum TaskNextAction {
    /// Request the task to be run again
    Continue,
    /// Request the task to be abandoned (the run loop exits and the task is shut down)
    Stop,
    /// Request the task to be run again, but report an error (the run loop logs it)
    ErrorContinue(anyhow::Error),
}
97
98impl TaskNextAction {
99    /// Creates a `TaskRunResult` from a `Result` where `Ok` means `Continue` and any error is reported
100    pub fn always_continue<T, E: Into<anyhow::Error>>(
101        res: Result<T, E>,
102    ) -> TaskNextAction {
103        match res {
104            Ok(_) => TaskNextAction::Continue,
105            Err(e) => TaskNextAction::ErrorContinue(e.into()),
106        }
107    }
108}
109
110impl From<Result<bool, anyhow::Error>> for TaskNextAction {
111    fn from(result: Result<bool, anyhow::Error>) -> Self {
112        match result {
113            Ok(should_continue) => {
114                if should_continue {
115                    TaskNextAction::Continue
116                } else {
117                    TaskNextAction::Stop
118                }
119            }
120            Err(e) => TaskNextAction::ErrorContinue(e),
121        }
122    }
123}
124
/// A replacement for the `?` operator for tasks: evaluates to the `Ok` value, or returns
/// `TaskNextAction::ErrorContinue` (with the error converted via `into()`) from the
/// enclosing function if the expression is an error.
///
/// The optional second argument is a callable that receives a reference to the error
/// before the early return, e.g. to log it.
///
/// `TaskNextAction` must be in scope at the call site.
#[macro_export]
macro_rules! try_or_continue {
    ($expr:expr_2021) => {{
        match $expr {
            Ok(value) => value,
            Err(error) => return TaskNextAction::ErrorContinue(error.into()),
        }
    }};
    ($expr:expr_2021, $custom:expr_2021) => {{
        match $expr {
            Ok(value) => value,
            Err(error) => {
                $custom(&error);
                return TaskNextAction::ErrorContinue(error.into());
            }
        }
    }};
}
145
/// A replacement for the `?` operator for tasks: evaluates to the `Ok` value, or returns
/// `TaskNextAction::Stop` from the enclosing function if the expression is an error.
///
/// The optional second argument is a callable that receives a reference to the error
/// before the early return, e.g. to log it. Without it the error is discarded.
///
/// `TaskNextAction` must be in scope at the call site.
#[macro_export]
macro_rules! try_or_stop {
    ($expr:expr_2021, $custom:expr_2021) => {{
        match $expr {
            Ok(val) => val,
            Err(err) => {
                $custom(&err);
                return TaskNextAction::Stop;
            }
        }
    }};
    ($expr:expr_2021) => {{
        match $expr {
            Ok(val) => val,
            // The error is intentionally dropped; binding it to a name would only
            // produce an unused-variable binding in every expansion.
            Err(_) => return TaskNextAction::Stop,
        }
    }};
}
166
/// The trait is implemented by the service task and contains a single iteration of the
/// infinite loop.
pub trait RunnableTask: Send {
    /// This function should contain the main business logic of the service task. It is called
    /// repeatedly until it returns `TaskNextAction::Stop`, panics, or a stop signal is received.
    /// If it returns `TaskNextAction::ErrorContinue`, the error is logged and execution resumes.
    /// This is intended to be called only by the `ServiceRunner`.
    ///
    /// The `ServiceRunner` continues to call the `run` method in the loop while the state is
    /// `State::Started`. So first, the `run` method should return a value, and after, the service
    /// will stop. If the service should react to the state change earlier, it should handle it in
    /// the `run` loop on its own. See [`StateWatcher::while_started`].
    fn run(
        &mut self,
        watcher: &mut StateWatcher,
    ) -> impl core::future::Future<Output = TaskNextAction> + Send;

    /// Gracefully shuts down the task after the end of the execution cycle.
    ///
    /// Consumes the task. In this file an error returned from here is only logged.
    fn shutdown(self) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
}
187
/// The service runner manages the lifecycle, execution and error handling of a `RunnableService`.
///
/// Creating a runner spawns a background task that waits for the start signal.
/// Dropping the runner sends a stop signal to that task (see the `Drop` implementation).
///
/// NOTE(review): an earlier version of this comment said the runner "can be cloned", but
/// only `Debug` is derived here and no `Clone` implementation is visible in this file.
#[derive(Debug)]
pub struct ServiceRunner<S>
where
    S: RunnableService + 'static,
{
    /// The shared state of the service
    pub shared: S::SharedData,
    // Sender side of the watch channel holding the lifecycle `State`; the background
    // task holds another handle to the same sender.
    state: Shared<watch::Sender<State>>,
}
199
200impl<S> Drop for ServiceRunner<S>
201where
202    S: RunnableService + 'static,
203{
204    fn drop(&mut self) {
205        self.stop();
206    }
207}
208
209impl<S> ServiceRunner<S>
210where
211    S: RunnableService + 'static,
212    S::TaskParams: Default,
213{
214    /// Initializes a new `ServiceRunner` containing a `RunnableService`
215    pub fn new(service: S) -> Self {
216        Self::new_with_params(service, S::TaskParams::default())
217    }
218}
219
220impl<S> ServiceRunner<S>
221where
222    S: RunnableService + 'static,
223{
224    /// Initializes a new `ServiceRunner` containing a `RunnableService` with parameters for underlying `Task`
225    pub fn new_with_params(service: S, params: S::TaskParams) -> Self {
226        let shared = service.shared_data();
227        let metric = FuturesMetrics::obtain_futures_metrics(S::NAME);
228        let state = initialize_loop(service, params, metric);
229        Self { shared, state }
230    }
231
232    async fn _await_start_or_stop(
233        &self,
234        mut start: StateWatcher,
235    ) -> anyhow::Result<State> {
236        loop {
237            let state = start.borrow().clone();
238            if !state.starting() {
239                return Ok(state);
240            }
241            start.changed().await?;
242        }
243    }
244
245    async fn _await_stop(&self, mut stop: StateWatcher) -> anyhow::Result<State> {
246        loop {
247            let state = stop.borrow().clone();
248            if state.stopped() {
249                return Ok(state);
250            }
251            stop.changed().await?;
252        }
253    }
254}
255
256#[async_trait::async_trait]
257impl<S> Service for ServiceRunner<S>
258where
259    S: RunnableService + 'static,
260{
261    fn start(&self) -> anyhow::Result<()> {
262        let started = self.state.send_if_modified(|state| {
263            if state.not_started() {
264                *state = State::Starting;
265                true
266            } else {
267                false
268            }
269        });
270
271        if started {
272            Ok(())
273        } else {
274            Err(anyhow!(
275                "The service `{}` already has been started.",
276                S::NAME
277            ))
278        }
279    }
280
281    async fn start_and_await(&self) -> anyhow::Result<State> {
282        let start = self.state.subscribe().into();
283        self.start()?;
284        self._await_start_or_stop(start).await
285    }
286
287    async fn await_start_or_stop(&self) -> anyhow::Result<State> {
288        let start = self.state.subscribe().into();
289        self._await_start_or_stop(start).await
290    }
291
292    fn stop(&self) -> bool {
293        self.state.send_if_modified(|state| {
294            if state.not_started() || state.starting() || state.started() {
295                *state = State::Stopping;
296                true
297            } else {
298                false
299            }
300        })
301    }
302
303    async fn stop_and_await(&self) -> anyhow::Result<State> {
304        let stop = self.state.subscribe().into();
305        self.stop();
306        self._await_stop(stop).await
307    }
308
309    async fn await_stop(&self) -> anyhow::Result<State> {
310        let stop = self.state.subscribe().into();
311        self._await_stop(stop).await
312    }
313
314    fn state(&self) -> State {
315        self.state.borrow().clone()
316    }
317
318    fn state_watcher(&self) -> StateWatcher {
319        self.state.subscribe().into()
320    }
321}
322
323#[tracing::instrument(skip_all, fields(service = S::NAME))]
324/// Initialize the background loop as a spawned task.
325fn initialize_loop<S>(
326    service: S,
327    params: S::TaskParams,
328    metric: FuturesMetrics,
329) -> Shared<watch::Sender<State>>
330where
331    S: RunnableService + 'static,
332{
333    let (sender, _) = watch::channel(State::NotStarted);
334    let state = Shared::new(sender);
335    let stop_sender = state.clone();
336    // Spawned as a task to check if the service is already running and to capture any panics.
337    tokio::task::spawn(
338        async move {
339            tracing::debug!("running");
340            let run = std::panic::AssertUnwindSafe(run(
341                service,
342                stop_sender.clone(),
343                params,
344                metric,
345            ));
346            tracing::debug!("awaiting run");
347            let result = run.catch_unwind().await;
348
349            let stopped_state = match result {
350                Err(e) => {
351                    let panic_information = panic_to_string(e);
352                    State::StoppedWithError(panic_information)
353                }
354                _ => State::Stopped,
355            };
356
357            tracing::debug!("shutting down {:?}", stopped_state);
358
359            let _ = stop_sender.send_if_modified(|state| {
360                if !state.stopped() {
361                    *state = stopped_state.clone();
362                    tracing::debug!("Wasn't stopped, so sent stop.");
363                    true
364                } else {
365                    tracing::debug!("Was already stopped.");
366                    false
367                }
368            });
369
370            tracing::info!("The service {} is shut down", S::NAME);
371
372            if let State::StoppedWithError(err) = stopped_state {
373                std::panic::resume_unwind(Box::new(err));
374            }
375        }
376        .in_current_span(),
377    );
378    state
379}
380
381/// Runs the main loop.
382async fn run<S>(
383    service: S,
384    sender: Shared<watch::Sender<State>>,
385    params: S::TaskParams,
386    metric: FuturesMetrics,
387) where
388    S: RunnableService + 'static,
389{
390    let mut state: StateWatcher = sender.subscribe().into();
391    if state.borrow_and_update().not_started() {
392        // We can panic here, because it is inside of the task.
393        state.changed().await.expect("The service is destroyed");
394    }
395
396    // If the state after update is not `Starting` then return to stop the service.
397    if !state.borrow().starting() {
398        return;
399    }
400
401    // We can panic here, because it is inside of the task.
402    tracing::info!("Starting {} service", S::NAME);
403    let mut task = service
404        .into_task(&state, params)
405        .await
406        .unwrap_or_else(|e| panic!("The initialization of {} failed: {}", S::NAME, e));
407
408    sender.send_if_modified(|s| {
409        if s.starting() {
410            *s = State::Started;
411            true
412        } else {
413            false
414        }
415    });
416
417    let got_panic = run_task(&mut task, state, &metric).await;
418
419    let got_panic = shutdown_task(S::NAME, task, got_panic).await;
420
421    if let Some(panic) = got_panic {
422        std::panic::resume_unwind(panic)
423    }
424}
425
426async fn run_task<S: RunnableTask>(
427    task: &mut S,
428    mut state: StateWatcher,
429    metric: &FuturesMetrics,
430) -> Option<Box<dyn Any + Send>> {
431    let mut got_panic = None;
432
433    while state.borrow_and_update().started() {
434        let tracked_task = FutureTracker::new(task.run(&mut state));
435        let task = std::panic::AssertUnwindSafe(tracked_task);
436        let panic_result = task.catch_unwind().await;
437
438        if let Err(panic) = panic_result {
439            tracing::debug!("got a panic");
440            got_panic = Some(panic);
441            break;
442        }
443
444        let tracked_result = panic_result.expect("Checked the panic above");
445        let result = tracked_result.extract(metric);
446
447        match result {
448            TaskNextAction::Continue => {
449                tracing::debug!("run loop");
450            }
451            TaskNextAction::Stop => {
452                tracing::debug!("stopping");
453                break;
454            }
455            TaskNextAction::ErrorContinue(e) => {
456                let e: &dyn std::error::Error = &*e;
457                tracing::error!(e);
458            }
459        }
460    }
461    got_panic
462}
463
464async fn shutdown_task<S>(
465    name: &str,
466    task: S,
467    mut got_panic: Option<Box<dyn Any + Send>>,
468) -> Option<Box<dyn Any + Send>>
469where
470    S: RunnableTask,
471{
472    tracing::info!("Shutting down {} service", name);
473    let shutdown = std::panic::AssertUnwindSafe(task.shutdown());
474    match shutdown.catch_unwind().await {
475        Ok(Ok(_)) => {}
476        Ok(Err(e)) => {
477            tracing::error!("Got an error during shutdown of the task: {e}");
478        }
479        Err(e) => {
480            if got_panic.is_some() {
481                let panic_information = panic_to_string(e);
482                tracing::error!(
483                    "Go a panic during execution and shutdown of the task. \
484                    The error during shutdown: {panic_information}"
485                );
486            } else {
487                got_panic = Some(e);
488            }
489        }
490    }
491    got_panic
492}
493
/// Extracts a readable message from a panic payload.
///
/// Payloads of type `String` or `&str` are returned as text; any other payload yields
/// the fixed text `"Unknown Source of Error"`.
fn panic_to_string(e: Box<dyn core::any::Any + Send>) -> String {
    let payload = match e.downcast::<String>() {
        Ok(message) => return *message,
        Err(other) => other,
    };

    payload.downcast::<&str>().map_or_else(
        |_| "Unknown Source of Error".to_owned(),
        |message| (*message).to_string(),
    )
}
503
// Lifecycle tests for `ServiceRunner`, driven by `mockall` mocks of the service and task.
#[cfg(test)]
mod tests {
    use super::*;

    // Generates `MockService`, a mock implementation of `RunnableService`.
    mockall::mock! {
        Service {}

        #[async_trait::async_trait]
        impl RunnableService for Service {
            const NAME: &'static str = "MockService";

            type SharedData = EmptyShared;
            type Task = MockTask;
            type TaskParams = ();

            fn shared_data(&self) -> EmptyShared;

            async fn into_task(self, state: &StateWatcher, params: <MockService as RunnableService>::TaskParams) -> anyhow::Result<MockTask>;
        }
    }

    // Generates `MockTask`, a mock implementation of `RunnableTask`.
    mockall::mock! {
        Task {}

        impl RunnableTask for Task {
            fn run(
                &mut self,
                state: &mut StateWatcher
            ) -> impl core::future::Future<Output = TaskNextAction> + Send;

            async fn shutdown(self) -> anyhow::Result<()>;
        }
    }

    impl MockService {
        /// A well-behaved service: its task waits while the state is started, then
        /// requests a stop, and `shutdown` is expected exactly once and succeeds.
        fn new_empty() -> Self {
            let mut mock = MockService::default();
            mock.expect_shared_data().returning(|| EmptyShared);
            mock.expect_into_task().returning(|_, _| {
                let mut mock = MockTask::default();
                mock.expect_run().returning(|watcher| {
                    let mut watcher = watcher.clone();
                    Box::pin(async move {
                        watcher.while_started().await.unwrap();
                        TaskNextAction::Stop
                    })
                });
                mock.expect_shutdown().times(1).returning(|| Ok(()));
                Ok(mock)
            });
            mock
        }
    }

    /// Happy path: start, observe `Started`, stop, observe `Stopped`.
    #[tokio::test]
    async fn start_and_await_stop_and_await_works() {
        let service = ServiceRunner::new(MockService::new_empty());
        let state = service.start_and_await().await.unwrap();
        assert!(state.started());
        let state = service.stop_and_await().await.unwrap();
        assert!(matches!(state, State::Stopped));
    }

    /// A second `start` must be rejected.
    #[tokio::test]
    async fn double_start_fails() {
        let service = ServiceRunner::new(MockService::new_empty());
        assert!(service.start().is_ok());
        assert!(service.start().is_err());
    }

    /// A second `start_and_await` must be rejected as well.
    #[tokio::test]
    async fn double_start_and_await_fails() {
        let service = ServiceRunner::new(MockService::new_empty());
        assert!(service.start_and_await().await.is_ok());
        assert!(service.start_and_await().await.is_err());
    }

    /// Stopping a service that was never started still ends in `Stopped`.
    #[tokio::test]
    async fn stop_without_start() {
        let service = ServiceRunner::new(MockService::new_empty());
        service.stop_and_await().await.unwrap();
        assert!(matches!(service.state(), State::Stopped));
    }

    /// A panic inside `run` is reported as `StoppedWithError` carrying the panic message,
    /// and `shutdown` is still expected to be called once.
    #[tokio::test]
    async fn panic_during_run() {
        let mut mock = MockService::default();
        mock.expect_shared_data().returning(|| EmptyShared);
        mock.expect_into_task().returning(|_, _| {
            let mut mock = MockTask::default();
            mock.expect_run().returning(|_| panic!("Should fail"));
            mock.expect_shutdown().times(1).returning(|| Ok(()));
            Ok(mock)
        });
        let service = ServiceRunner::new(mock);
        let state = service.start_and_await().await.unwrap();
        assert!(matches!(state, State::StoppedWithError(s) if s.contains("Should fail")));

        let state = service.await_stop().await.unwrap();
        assert!(matches!(state, State::StoppedWithError(s) if s.contains("Should fail")));
    }

    /// A panic inside `shutdown` is reported as `StoppedWithError` carrying the panic message.
    #[tokio::test]
    async fn panic_during_shutdown() {
        let mut mock = MockService::default();
        mock.expect_shared_data().returning(|| EmptyShared);
        mock.expect_into_task().returning(|_, _| {
            let mut mock = MockTask::default();
            mock.expect_run()
                .returning(|_| Box::pin(async move { TaskNextAction::Stop }));
            mock.expect_shutdown()
                .times(1)
                .returning(|| panic!("Shutdown should fail"));
            Ok(mock)
        });
        let service = ServiceRunner::new(mock);
        let state = service.start_and_await().await.unwrap();
        assert!(
            matches!(state, State::StoppedWithError(s) if s.contains("Shutdown should fail"))
        );

        let state = service.await_stop().await.unwrap();
        assert!(
            matches!(state, State::StoppedWithError(s) if s.contains("Shutdown should fail"))
        );
    }

    /// `await_stop` can be called repeatedly and keeps returning `Stopped`.
    #[tokio::test]
    async fn double_await_stop_works() {
        let service = ServiceRunner::new(MockService::new_empty());
        service.start().unwrap();
        service.stop();

        let state = service.await_stop().await.unwrap();
        assert!(matches!(state, State::Stopped));
        let state = service.await_stop().await.unwrap();
        assert!(matches!(state, State::Stopped));
    }

    /// `stop_and_await` can be called repeatedly and keeps returning `Stopped`.
    #[tokio::test]
    async fn double_stop_and_await_works() {
        let service = ServiceRunner::new(MockService::new_empty());
        service.start().unwrap();

        let state = service.stop_and_await().await.unwrap();
        assert!(matches!(state, State::Stopped));
        let state = service.stop_and_await().await.unwrap();
        assert!(matches!(state, State::Stopped));
    }

    /// Dropping the runner sends the stop signal: a receiver subscribed before the drop
    /// observes `Stopping` followed by `Stopped`.
    #[tokio::test]
    async fn stop_unused_service() {
        let mut receiver;
        {
            let service = ServiceRunner::new(MockService::new_empty());
            service.start().unwrap();
            receiver = service.state.subscribe();
        }

        receiver.changed().await.unwrap();
        assert!(matches!(receiver.borrow().clone(), State::Stopping));
        receiver.changed().await.unwrap();
        assert!(matches!(receiver.borrow().clone(), State::Stopped));
    }
}