fuel_core_services/
state.rs1use tokio::sync::watch;
4
/// Lifecycle state value observed through a [`StateWatcher`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum State {
    /// Not started yet.
    NotStarted,
    /// Start-up is in progress.
    Starting,
    /// Up and running.
    Started,
    /// Shutdown is in progress.
    Stopping,
    /// Stopped.
    Stopped,
    /// Stopped with an error; the payload is presumably the error text
    /// (confirm against the code that produces this variant).
    StoppedWithError(String),
}

impl State {
    /// Returns `true` only for [`State::NotStarted`].
    pub fn not_started(&self) -> bool {
        matches!(self, Self::NotStarted)
    }

    /// Returns `true` only for [`State::Starting`].
    pub fn starting(&self) -> bool {
        matches!(self, Self::Starting)
    }

    /// Returns `true` only for [`State::Started`].
    pub fn started(&self) -> bool {
        matches!(self, Self::Started)
    }

    /// Returns `true` for both terminal variants: [`State::Stopped`] and
    /// [`State::StoppedWithError`] (regardless of its payload).
    pub fn stopped(&self) -> bool {
        matches!(self, Self::Stopped | Self::StoppedWithError(_))
    }

    /// Returns `true` only for [`State::Stopping`].
    pub fn stopping(&self) -> bool {
        matches!(self, Self::Stopping)
    }
}
48
49#[derive(Clone)]
52pub struct StateWatcher(watch::Receiver<State>);
53
#[cfg(feature = "test-helpers")]
impl Default for StateWatcher {
    /// Builds a watcher whose current value is [`State::NotStarted`].
    ///
    /// Only the receiving half of the freshly created channel is kept; the
    /// sending half is dropped before this function returns, so no further
    /// values can ever be published to the returned watcher.
    fn default() -> Self {
        let receiver = watch::channel(State::NotStarted).1;
        StateWatcher(receiver)
    }
}
61
#[cfg(feature = "test-helpers")]
impl StateWatcher {
    /// Returns a watcher that permanently reports [`State::Started`].
    pub fn started() -> Self {
        Self::with_leaked_sender(State::Started)
    }

    /// Returns a watcher that permanently reports [`State::Starting`].
    pub fn starting() -> Self {
        Self::with_leaked_sender(State::Starting)
    }

    /// Creates a channel holding `initial` and wraps its receiver.
    ///
    /// The sender is deliberately leaked with `mem::forget` so that it is never
    /// dropped: the channel therefore stays open and the value never changes.
    fn with_leaked_sender(initial: State) -> Self {
        let (tx, rx) = watch::channel(initial);
        core::mem::forget(tx);
        Self(rx)
    }
}
79
80impl StateWatcher {
81 pub fn borrow(&self) -> watch::Ref<'_, State> {
83 self.0.borrow()
84 }
85
86 pub fn borrow_and_update(&mut self) -> watch::Ref<'_, State> {
88 self.0.borrow_and_update()
89 }
90
91 pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
93 self.0.has_changed()
94 }
95
96 pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
98 self.0.changed().await
99 }
100
101 pub fn same_channel(&self, other: &Self) -> bool {
103 self.0.same_channel(&other.0)
104 }
105}
106
107impl StateWatcher {
108 #[tracing::instrument(level = "debug", skip(self), err, ret)]
109 pub async fn while_started(&mut self) -> anyhow::Result<State> {
111 loop {
112 let state = self.borrow().clone();
113 if !state.started() {
114 return Ok(state);
115 }
116
117 self.changed().await?;
118 }
119 }
120
121 pub async fn wait_stopping_or_stopped(&mut self) -> anyhow::Result<()> {
123 let state = self.borrow().clone();
124 while !(state.stopped() || state.stopping()) {
125 self.changed().await?;
126 }
127 Ok(())
128 }
129}
130
131impl From<watch::Receiver<State>> for StateWatcher {
132 fn from(receiver: watch::Receiver<State>) -> Self {
133 Self(receiver)
134 }
135}