1use crate::{
2 Shared,
3 state::{
4 State,
5 StateWatcher,
6 },
7};
8use anyhow::anyhow;
9use fuel_core_metrics::futures::{
10 FuturesMetrics,
11 future_tracker::FutureTracker,
12};
13use futures::FutureExt;
14use std::any::Any;
15use tokio::sync::watch;
16use tracing::Instrument;
17
/// Data-less placeholder type.
///
/// It carries no fields, so it can be used as `RunnableService::SharedData`
/// by services that have nothing to share (the tests in this file do so).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmptyShared;
21
/// Lifecycle interface of a service: start it, stop it, and observe its
/// `State`.
///
/// `ServiceRunner` is the implementation provided in this file; the notes on
/// return values below describe what that implementation does.
#[async_trait::async_trait]
pub trait Service {
    /// Requests the service to start without waiting for it to come up.
    ///
    /// `ServiceRunner` returns an error when the service is not in the
    /// not-started state anymore.
    fn start(&self) -> anyhow::Result<()>;

    /// Requests the service to start and waits until it has left the starting
    /// state.
    ///
    /// The returned state is the first non-starting state that was observed,
    /// so it may also be a stopped state (for example after a panic).
    async fn start_and_await(&self) -> anyhow::Result<State>;

    /// Waits until the service is not in the starting state, without sending
    /// any signal.
    async fn await_start_or_stop(&self) -> anyhow::Result<State>;

    /// Requests the service to stop without waiting for the shutdown.
    ///
    /// `ServiceRunner` returns `true` when this call moved the state to
    /// stopping, and `false` when the service was already stopping or stopped.
    fn stop(&self) -> bool;

    /// Requests the service to stop and waits until it is stopped.
    async fn stop_and_await(&self) -> anyhow::Result<State>;

    /// Waits until the service is stopped, without sending a stop signal.
    async fn await_stop(&self) -> anyhow::Result<State>;

    /// Returns a snapshot of the current state of the service.
    fn state(&self) -> State;

    /// Returns a watcher that can be used to observe state changes.
    fn state_watcher(&self) -> StateWatcher;
}
53
/// Description of a service that `ServiceRunner` can drive: its name, the data
/// it shares, and how it is turned into a runnable task.
#[async_trait::async_trait]
pub trait RunnableService: Send {
    /// Name of the service. In this file it is used in log and error messages,
    /// as a tracing field, and as the key for obtaining `FuturesMetrics`.
    const NAME: &'static str;

    /// Data exposed to the owner of the runner through `ServiceRunner::shared`.
    type SharedData: Clone + Send + Sync;

    /// The task type produced by `into_task` and executed in the run loop.
    type Task: RunnableTask;

    /// Extra parameters that are forwarded to `into_task`.
    type TaskParams: Send;

    /// Returns the data that should be shared with the owner of the runner.
    ///
    /// `ServiceRunner::new_with_params` calls this once, before the service is
    /// moved into the background loop.
    fn shared_data(&self) -> Self::SharedData;

    /// Consumes the service and builds the task that will be run.
    ///
    /// Called by the runner after the start signal was received.
    /// `state_watcher` observes the state of the service and `params` are the
    /// parameters the runner was created with. The runner panics (and ends up
    /// in a stopped-with-error state) when this returns an error.
    async fn into_task(
        self,
        state_watcher: &StateWatcher,
        params: Self::TaskParams,
    ) -> anyhow::Result<Self::Task>;
}
85
/// What the run loop should do after one call to `RunnableTask::run`.
#[derive(Debug)]
#[must_use]
pub enum TaskNextAction {
    /// Keep looping: `run` is called again while the service is started.
    Continue,
    /// Leave the run loop; the task is shut down afterwards.
    Stop,
    /// The iteration failed: the error is logged and the loop continues.
    ErrorContinue(anyhow::Error),
}
97
98impl TaskNextAction {
99 pub fn always_continue<T, E: Into<anyhow::Error>>(
101 res: Result<T, E>,
102 ) -> TaskNextAction {
103 match res {
104 Ok(_) => TaskNextAction::Continue,
105 Err(e) => TaskNextAction::ErrorContinue(e.into()),
106 }
107 }
108}
109
110impl From<Result<bool, anyhow::Error>> for TaskNextAction {
111 fn from(result: Result<bool, anyhow::Error>) -> Self {
112 match result {
113 Ok(should_continue) => {
114 if should_continue {
115 TaskNextAction::Continue
116 } else {
117 TaskNextAction::Stop
118 }
119 }
120 Err(e) => TaskNextAction::ErrorContinue(e),
121 }
122 }
123}
124
/// Evaluates a `Result` expression and yields its `Ok` value; on `Err` it
/// returns `TaskNextAction::ErrorContinue` from the enclosing function.
///
/// An optional second argument is a callable that is invoked with a reference
/// to the error before returning (for example to log it).
///
/// `TaskNextAction` is named without a path, so it has to be in scope at the
/// place where the macro is used.
#[macro_export]
macro_rules! try_or_continue {
    ($expr:expr_2021, $custom:expr_2021) => {{
        match $expr {
            Err(error) => {
                // Let the caller-provided hook see the error first.
                $custom(&error);
                return TaskNextAction::ErrorContinue(error.into());
            }
            Ok(value) => value,
        }
    }};
    ($expr:expr_2021) => {{
        match $expr {
            Err(error) => {
                return TaskNextAction::ErrorContinue(error.into());
            }
            Ok(value) => value,
        }
    }};
}
145
/// Evaluates a `Result` expression and yields its `Ok` value; on `Err` it
/// returns `TaskNextAction::Stop` from the enclosing function.
///
/// An optional second argument is a callable that is invoked with a reference
/// to the error before returning (for example to log it). Without it the
/// error is dropped silently.
///
/// `TaskNextAction` is named without a path, so it has to be in scope at the
/// place where the macro is used.
#[macro_export]
macro_rules! try_or_stop {
    ($expr:expr_2021, $custom:expr_2021) => {{
        match $expr {
            Ok(val) => val,
            Err(err) => {
                // Let the caller-provided hook see the error first.
                $custom(&err);
                return TaskNextAction::Stop;
            }
        }
    }};
    ($expr:expr_2021) => {{
        match $expr {
            Ok(val) => val,
            // The error is intentionally discarded; binding it to a name would
            // only produce an `unused_variables` warning at the expansion site.
            Err(_) => return TaskNextAction::Stop,
        }
    }};
}
166
/// The unit of work that the service runner drives in its run loop.
pub trait RunnableTask: Send {
    /// Performs one iteration of the task's work.
    ///
    /// The runner calls this repeatedly while the service state is started and
    /// acts on the returned `TaskNextAction`. `watcher` observes the state of
    /// the service, so an implementation can react to state changes itself.
    fn run(
        &mut self,
        watcher: &mut StateWatcher,
    ) -> impl core::future::Future<Output = TaskNextAction> + Send;

    /// Consumes the task to clean up after the run loop has ended.
    ///
    /// The runner calls this after the loop finished, also when `run`
    /// panicked. A returned error is logged by the runner.
    fn shutdown(self) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
}
187
/// Handle to a `RunnableService` that is executed by a spawned background
/// task. The handle exposes the service's shared data and controls its
/// lifecycle through the `Service` trait.
#[derive(Debug)]
pub struct ServiceRunner<S>
where
    S: RunnableService + 'static,
{
    /// The data returned by `RunnableService::shared_data` at construction.
    pub shared: S::SharedData,
    /// Sender side of the state channel; a clone of it is held by the
    /// background task.
    state: Shared<watch::Sender<State>>,
}
199
200impl<S> Drop for ServiceRunner<S>
201where
202 S: RunnableService + 'static,
203{
204 fn drop(&mut self) {
205 self.stop();
206 }
207}
208
209impl<S> ServiceRunner<S>
210where
211 S: RunnableService + 'static,
212 S::TaskParams: Default,
213{
214 pub fn new(service: S) -> Self {
216 Self::new_with_params(service, S::TaskParams::default())
217 }
218}
219
220impl<S> ServiceRunner<S>
221where
222 S: RunnableService + 'static,
223{
224 pub fn new_with_params(service: S, params: S::TaskParams) -> Self {
226 let shared = service.shared_data();
227 let metric = FuturesMetrics::obtain_futures_metrics(S::NAME);
228 let state = initialize_loop(service, params, metric);
229 Self { shared, state }
230 }
231
232 async fn _await_start_or_stop(
233 &self,
234 mut start: StateWatcher,
235 ) -> anyhow::Result<State> {
236 loop {
237 let state = start.borrow().clone();
238 if !state.starting() {
239 return Ok(state);
240 }
241 start.changed().await?;
242 }
243 }
244
245 async fn _await_stop(&self, mut stop: StateWatcher) -> anyhow::Result<State> {
246 loop {
247 let state = stop.borrow().clone();
248 if state.stopped() {
249 return Ok(state);
250 }
251 stop.changed().await?;
252 }
253 }
254}
255
256#[async_trait::async_trait]
257impl<S> Service for ServiceRunner<S>
258where
259 S: RunnableService + 'static,
260{
261 fn start(&self) -> anyhow::Result<()> {
262 let started = self.state.send_if_modified(|state| {
263 if state.not_started() {
264 *state = State::Starting;
265 true
266 } else {
267 false
268 }
269 });
270
271 if started {
272 Ok(())
273 } else {
274 Err(anyhow!(
275 "The service `{}` already has been started.",
276 S::NAME
277 ))
278 }
279 }
280
281 async fn start_and_await(&self) -> anyhow::Result<State> {
282 let start = self.state.subscribe().into();
283 self.start()?;
284 self._await_start_or_stop(start).await
285 }
286
287 async fn await_start_or_stop(&self) -> anyhow::Result<State> {
288 let start = self.state.subscribe().into();
289 self._await_start_or_stop(start).await
290 }
291
292 fn stop(&self) -> bool {
293 self.state.send_if_modified(|state| {
294 if state.not_started() || state.starting() || state.started() {
295 *state = State::Stopping;
296 true
297 } else {
298 false
299 }
300 })
301 }
302
303 async fn stop_and_await(&self) -> anyhow::Result<State> {
304 let stop = self.state.subscribe().into();
305 self.stop();
306 self._await_stop(stop).await
307 }
308
309 async fn await_stop(&self) -> anyhow::Result<State> {
310 let stop = self.state.subscribe().into();
311 self._await_stop(stop).await
312 }
313
314 fn state(&self) -> State {
315 self.state.borrow().clone()
316 }
317
318 fn state_watcher(&self) -> StateWatcher {
319 self.state.subscribe().into()
320 }
321}
322
323#[tracing::instrument(skip_all, fields(service = S::NAME))]
324fn initialize_loop<S>(
326 service: S,
327 params: S::TaskParams,
328 metric: FuturesMetrics,
329) -> Shared<watch::Sender<State>>
330where
331 S: RunnableService + 'static,
332{
333 let (sender, _) = watch::channel(State::NotStarted);
334 let state = Shared::new(sender);
335 let stop_sender = state.clone();
336 tokio::task::spawn(
338 async move {
339 tracing::debug!("running");
340 let run = std::panic::AssertUnwindSafe(run(
341 service,
342 stop_sender.clone(),
343 params,
344 metric,
345 ));
346 tracing::debug!("awaiting run");
347 let result = run.catch_unwind().await;
348
349 let stopped_state = match result {
350 Err(e) => {
351 let panic_information = panic_to_string(e);
352 State::StoppedWithError(panic_information)
353 }
354 _ => State::Stopped,
355 };
356
357 tracing::debug!("shutting down {:?}", stopped_state);
358
359 let _ = stop_sender.send_if_modified(|state| {
360 if !state.stopped() {
361 *state = stopped_state.clone();
362 tracing::debug!("Wasn't stopped, so sent stop.");
363 true
364 } else {
365 tracing::debug!("Was already stopped.");
366 false
367 }
368 });
369
370 tracing::info!("The service {} is shut down", S::NAME);
371
372 if let State::StoppedWithError(err) = stopped_state {
373 std::panic::resume_unwind(Box::new(err));
374 }
375 }
376 .in_current_span(),
377 );
378 state
379}
380
381async fn run<S>(
383 service: S,
384 sender: Shared<watch::Sender<State>>,
385 params: S::TaskParams,
386 metric: FuturesMetrics,
387) where
388 S: RunnableService + 'static,
389{
390 let mut state: StateWatcher = sender.subscribe().into();
391 if state.borrow_and_update().not_started() {
392 state.changed().await.expect("The service is destroyed");
394 }
395
396 if !state.borrow().starting() {
398 return;
399 }
400
401 tracing::info!("Starting {} service", S::NAME);
403 let mut task = service
404 .into_task(&state, params)
405 .await
406 .unwrap_or_else(|e| panic!("The initialization of {} failed: {}", S::NAME, e));
407
408 sender.send_if_modified(|s| {
409 if s.starting() {
410 *s = State::Started;
411 true
412 } else {
413 false
414 }
415 });
416
417 let got_panic = run_task(&mut task, state, &metric).await;
418
419 let got_panic = shutdown_task(S::NAME, task, got_panic).await;
420
421 if let Some(panic) = got_panic {
422 std::panic::resume_unwind(panic)
423 }
424}
425
426async fn run_task<S: RunnableTask>(
427 task: &mut S,
428 mut state: StateWatcher,
429 metric: &FuturesMetrics,
430) -> Option<Box<dyn Any + Send>> {
431 let mut got_panic = None;
432
433 while state.borrow_and_update().started() {
434 let tracked_task = FutureTracker::new(task.run(&mut state));
435 let task = std::panic::AssertUnwindSafe(tracked_task);
436 let panic_result = task.catch_unwind().await;
437
438 if let Err(panic) = panic_result {
439 tracing::debug!("got a panic");
440 got_panic = Some(panic);
441 break;
442 }
443
444 let tracked_result = panic_result.expect("Checked the panic above");
445 let result = tracked_result.extract(metric);
446
447 match result {
448 TaskNextAction::Continue => {
449 tracing::debug!("run loop");
450 }
451 TaskNextAction::Stop => {
452 tracing::debug!("stopping");
453 break;
454 }
455 TaskNextAction::ErrorContinue(e) => {
456 let e: &dyn std::error::Error = &*e;
457 tracing::error!(e);
458 }
459 }
460 }
461 got_panic
462}
463
464async fn shutdown_task<S>(
465 name: &str,
466 task: S,
467 mut got_panic: Option<Box<dyn Any + Send>>,
468) -> Option<Box<dyn Any + Send>>
469where
470 S: RunnableTask,
471{
472 tracing::info!("Shutting down {} service", name);
473 let shutdown = std::panic::AssertUnwindSafe(task.shutdown());
474 match shutdown.catch_unwind().await {
475 Ok(Ok(_)) => {}
476 Ok(Err(e)) => {
477 tracing::error!("Got an error during shutdown of the task: {e}");
478 }
479 Err(e) => {
480 if got_panic.is_some() {
481 let panic_information = panic_to_string(e);
482 tracing::error!(
483 "Go a panic during execution and shutdown of the task. \
484 The error during shutdown: {panic_information}"
485 );
486 } else {
487 got_panic = Some(e);
488 }
489 }
490 }
491 got_panic
492}
493
/// Extracts a readable message from a panic payload.
///
/// A `String` payload is returned as is and a `&str` payload is copied into a
/// `String`. Any other payload type yields the fixed text
/// `"Unknown Source of Error"`.
fn panic_to_string(e: Box<dyn core::any::Any + Send>) -> String {
    e.downcast::<String>()
        .map(|message| *message)
        .or_else(|other| {
            other
                .downcast::<&str>()
                .map(|message| (*message).to_owned())
        })
        .unwrap_or_else(|_| "Unknown Source of Error".to_owned())
}
503
// Tests of the `ServiceRunner` lifecycle, driven by mocked services and tasks.
#[cfg(test)]
mod tests {
    use super::*;

    // Generates `MockService`, a mocked `RunnableService` whose task type is
    // `MockTask`.
    mockall::mock! {
        Service {}

        #[async_trait::async_trait]
        impl RunnableService for Service {
            const NAME: &'static str = "MockService";

            type SharedData = EmptyShared;
            type Task = MockTask;
            type TaskParams = ();

            fn shared_data(&self) -> EmptyShared;

            async fn into_task(self, state: &StateWatcher, params: <MockService as RunnableService>::TaskParams) -> anyhow::Result<MockTask>;
        }
    }

    // Generates `MockTask`, a mocked `RunnableTask`.
    mockall::mock! {
        Task {}

        impl RunnableTask for Task {
            fn run(
                &mut self,
                state: &mut StateWatcher
            ) -> impl core::future::Future<Output = TaskNextAction> + Send;

            async fn shutdown(self) -> anyhow::Result<()>;
        }
    }

    impl MockService {
        // Builds a well-behaved mock: its task awaits `while_started` on a
        // clone of the watcher and then asks to stop; `shutdown` is expected
        // exactly once and succeeds.
        fn new_empty() -> Self {
            let mut mock = MockService::default();
            mock.expect_shared_data().returning(|| EmptyShared);
            mock.expect_into_task().returning(|_, _| {
                let mut mock = MockTask::default();
                mock.expect_run().returning(|watcher| {
                    let mut watcher = watcher.clone();
                    Box::pin(async move {
                        watcher.while_started().await.unwrap();
                        TaskNextAction::Stop
                    })
                });
                mock.expect_shutdown().times(1).returning(|| Ok(()));
                Ok(mock)
            });
            mock
        }
    }

    // The happy path: the service reports started and then a clean stop.
    #[tokio::test]
    async fn start_and_await_stop_and_await_works() {
        let service = ServiceRunner::new(MockService::new_empty());
        let state = service.start_and_await().await.unwrap();
        assert!(state.started());
        let state = service.stop_and_await().await.unwrap();
        assert!(matches!(state, State::Stopped));
    }

    // A second start signal is rejected.
    #[tokio::test]
    async fn double_start_fails() {
        let service = ServiceRunner::new(MockService::new_empty());
        assert!(service.start().is_ok());
        assert!(service.start().is_err());
    }

    // The awaiting variant rejects a second start as well.
    #[tokio::test]
    async fn double_start_and_await_fails() {
        let service = ServiceRunner::new(MockService::new_empty());
        assert!(service.start_and_await().await.is_ok());
        assert!(service.start_and_await().await.is_err());
    }

    // Stopping a service that was never started ends in `State::Stopped`.
    #[tokio::test]
    async fn stop_without_start() {
        let service = ServiceRunner::new(MockService::new_empty());
        service.stop_and_await().await.unwrap();
        assert!(matches!(service.state(), State::Stopped));
    }

    // A panic inside `run` ends in `StoppedWithError` carrying the panic
    // message; the task is still shut down once.
    #[tokio::test]
    async fn panic_during_run() {
        let mut mock = MockService::default();
        mock.expect_shared_data().returning(|| EmptyShared);
        mock.expect_into_task().returning(|_, _| {
            let mut mock = MockTask::default();
            mock.expect_run().returning(|_| panic!("Should fail"));
            mock.expect_shutdown().times(1).returning(|| Ok(()));
            Ok(mock)
        });
        let service = ServiceRunner::new(mock);
        let state = service.start_and_await().await.unwrap();
        assert!(matches!(state, State::StoppedWithError(s) if s.contains("Should fail")));

        let state = service.await_stop().await.unwrap();
        assert!(matches!(state, State::StoppedWithError(s) if s.contains("Should fail")));
    }

    // A panic inside `shutdown` ends in `StoppedWithError` as well.
    #[tokio::test]
    async fn panic_during_shutdown() {
        let mut mock = MockService::default();
        mock.expect_shared_data().returning(|| EmptyShared);
        mock.expect_into_task().returning(|_, _| {
            let mut mock = MockTask::default();
            mock.expect_run()
                .returning(|_| Box::pin(async move { TaskNextAction::Stop }));
            mock.expect_shutdown()
                .times(1)
                .returning(|| panic!("Shutdown should fail"));
            Ok(mock)
        });
        let service = ServiceRunner::new(mock);
        let state = service.start_and_await().await.unwrap();
        assert!(
            matches!(state, State::StoppedWithError(s) if s.contains("Shutdown should fail"))
        );

        let state = service.await_stop().await.unwrap();
        assert!(
            matches!(state, State::StoppedWithError(s) if s.contains("Shutdown should fail"))
        );
    }

    // Awaiting the stop twice yields the stopped state both times.
    #[tokio::test]
    async fn double_await_stop_works() {
        let service = ServiceRunner::new(MockService::new_empty());
        service.start().unwrap();
        service.stop();

        let state = service.await_stop().await.unwrap();
        assert!(matches!(state, State::Stopped));
        let state = service.await_stop().await.unwrap();
        assert!(matches!(state, State::Stopped));
    }

    // Sending the stop signal again after the service stopped is harmless.
    #[tokio::test]
    async fn double_stop_and_await_works() {
        let service = ServiceRunner::new(MockService::new_empty());
        service.start().unwrap();

        let state = service.stop_and_await().await.unwrap();
        assert!(matches!(state, State::Stopped));
        let state = service.stop_and_await().await.unwrap();
        assert!(matches!(state, State::Stopped));
    }

    // Dropping the runner sends the stop signal: the receiver first observes
    // `Stopping` and then `Stopped`.
    #[tokio::test]
    async fn stop_unused_service() {
        let mut receiver;
        {
            let service = ServiceRunner::new(MockService::new_empty());
            service.start().unwrap();
            receiver = service.state.subscribe();
            // `service` is dropped at the end of this block.
        }

        receiver.changed().await.unwrap();
        assert!(matches!(receiver.borrow().clone(), State::Stopping));
        receiver.changed().await.unwrap();
        assert!(matches!(receiver.borrow().clone(), State::Stopped));
    }
}