fuel_core_services/
state.rs

1//! The module related to state of the service.
2
3use tokio::sync::watch;
4
5/// The lifecycle state of the service
/// Lifecycle phases a service moves through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum State {
    /// The service has been created but has not been started yet.
    NotStarted,
    /// The service is in the middle of starting up.
    Starting,
    /// The service is up and running normally.
    Started,
    /// The service is in the middle of shutting down.
    Stopping,
    /// The service has stopped.
    Stopped,
    /// The service stopped because of an error (panic).
    /// The `String` payload presumably describes the failure.
    StoppedWithError(String),
}

impl State {
    /// Returns `true` only for [`State::NotStarted`].
    pub fn not_started(&self) -> bool {
        matches!(self, Self::NotStarted)
    }

    /// Returns `true` only for [`State::Starting`].
    pub fn starting(&self) -> bool {
        matches!(self, Self::Starting)
    }

    /// Returns `true` only for [`State::Started`].
    pub fn started(&self) -> bool {
        matches!(self, Self::Started)
    }

    /// Returns `true` for both terminal states: [`State::Stopped`] and
    /// [`State::StoppedWithError`] (regardless of its payload).
    pub fn stopped(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Stopped | Self::StoppedWithError(_) => true,
            Self::NotStarted | Self::Starting | Self::Started | Self::Stopping => false,
        }
    }

    /// Returns `true` only for [`State::Stopping`].
    pub fn stopping(&self) -> bool {
        matches!(self, Self::Stopping)
    }
}
48
49/// The wrapper around the `watch::Receiver<State>`. It repeats the `Receiver` functionality +
50/// a new one.
51#[derive(Clone)]
52pub struct StateWatcher(watch::Receiver<State>);
53
#[cfg(feature = "test-helpers")]
impl Default for StateWatcher {
    /// Builds a watcher whose current state is `State::NotStarted`.
    ///
    /// Only the receiver half of a fresh channel is kept; the sender half is
    /// discarded right away.
    // NOTE(review): with no sender alive, `changed()` presumably reports the
    // channel as closed immediately — confirm that is what tests expect.
    fn default() -> Self {
        let receiver = watch::channel(State::NotStarted).1;
        Self(receiver)
    }
}
61
#[cfg(feature = "test-helpers")]
impl StateWatcher {
    /// Test helper: a watcher whose current state is `State::Started`.
    pub fn started() -> Self {
        let (tx, rx) = watch::channel(State::Started);
        let watcher = Self(rx);
        // Test-only helper: the sender is deliberately leaked (never dropped)
        // to keep the tests simple.
        core::mem::forget(tx);
        watcher
    }

    /// Test helper: a watcher whose current state is `State::Starting`.
    pub fn starting() -> Self {
        let (tx, rx) = watch::channel(State::Starting);
        let watcher = Self(rx);
        // Same deliberate leak of the sender as in `started`.
        core::mem::forget(tx);
        watcher
    }
}
79
80impl StateWatcher {
81    /// See [`watch::Receiver::borrow`].
82    pub fn borrow(&self) -> watch::Ref<'_, State> {
83        self.0.borrow()
84    }
85
86    /// See [`watch::Receiver::borrow_and_update`].
87    pub fn borrow_and_update(&mut self) -> watch::Ref<'_, State> {
88        self.0.borrow_and_update()
89    }
90
91    /// See [`watch::Receiver::has_changed`].
92    pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
93        self.0.has_changed()
94    }
95
96    /// See [`watch::Receiver::changed`].
97    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
98        self.0.changed().await
99    }
100
101    /// See [`watch::Receiver::same_channel`].
102    pub fn same_channel(&self, other: &Self) -> bool {
103        self.0.same_channel(&other.0)
104    }
105}
106
107impl StateWatcher {
108    #[tracing::instrument(level = "debug", skip(self), err, ret)]
109    /// Infinity loop while the state is `State::Started`. Returns the next received state.
110    pub async fn while_started(&mut self) -> anyhow::Result<State> {
111        loop {
112            let state = self.borrow().clone();
113            if !state.started() {
114                return Ok(state);
115            }
116
117            self.changed().await?;
118        }
119    }
120
121    /// Future that resolves once the state is `State::Stopped`.
122    pub async fn wait_stopping_or_stopped(&mut self) -> anyhow::Result<()> {
123        let state = self.borrow().clone();
124        while !(state.stopped() || state.stopping()) {
125            self.changed().await?;
126        }
127        Ok(())
128    }
129}
130
131impl From<watch::Receiver<State>> for StateWatcher {
132    fn from(receiver: watch::Receiver<State>) -> Self {
133        Self(receiver)
134    }
135}