p2panda_net/supervisor/
api.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::sync::Arc;
4
5use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};
6use ractor::{ActorRef, call};
7use thiserror::Error;
8use tokio::sync::RwLock;
9
10use crate::supervisor::ChildActorFut;
11use crate::supervisor::actor::{SupervisorActor, SupervisorActorArgs, ToSupervisorActor};
12use crate::supervisor::builder::Builder;
13use crate::supervisor::traits::ChildActor;
14
15/// Monitor system with supervisors and restart modules on critical failure.
16///
17/// ## Example
18///
19/// ```rust
20/// # use std::error::Error;
21/// #
22/// # #[tokio::main]
23/// # async fn main() -> Result<(), Box<dyn Error>> {
24/// # use p2panda_net::{Discovery, Supervisor, AddressBook, Endpoint};
25/// # let address_book = AddressBook::builder().spawn().await?;
26/// # let endpoint = Endpoint::builder(address_book.clone())
27/// #     .spawn()
28/// #     .await?;
29/// let supervisor = Supervisor::builder()
30///     .spawn()
31///     .await?;
32///
33/// // Discovery service is now supervised and will restart automatically on failure.
34/// let discovery = Discovery::builder(address_book, endpoint).spawn_linked(&supervisor).await?;
35/// #
36/// # Ok(())
37/// # }
38/// ```
39#[derive(Clone)]
40pub struct Supervisor {
41    args: SupervisorActorArgs,
42    inner: Arc<RwLock<Inner>>,
43}
44
45struct Inner {
46    actor_ref: Option<ActorRef<ToSupervisorActor>>,
47}
48
49impl Supervisor {
50    pub(crate) fn new(
51        actor_ref: Option<ActorRef<ToSupervisorActor>>,
52        args: SupervisorActorArgs,
53    ) -> Self {
54        Self {
55            inner: Arc::new(RwLock::new(Inner { actor_ref })),
56            args,
57        }
58    }
59
60    pub fn builder() -> Builder {
61        Builder::new()
62    }
63
64    pub(crate) async fn start_child_actor<C>(&self, child: C) -> Result<(), SupervisorError>
65    where
66        C: ChildActor + 'static,
67    {
68        let inner = self.inner.read().await;
69        call!(
70            inner.actor_ref.as_ref().expect("actor spawned in builder"),
71            ToSupervisorActor::StartChildActor,
72            Box::new(child)
73        )
74        .map_err(Box::new)?;
75        Ok(())
76    }
77
78    pub(crate) fn thread_pool(&self) -> ThreadLocalActorSpawner {
79        self.args.1.clone()
80    }
81}
82
83impl Drop for Inner {
84    fn drop(&mut self) {
85        if let Some(actor_ref) = self.actor_ref.take() {
86            actor_ref.stop(None);
87        }
88    }
89}
90
91impl ChildActor for Supervisor {
92    fn on_start(
93        &self,
94        supervisor: ractor::ActorCell,
95        thread_pool: ThreadLocalActorSpawner,
96    ) -> ChildActorFut<'_> {
97        Box::pin(async move {
98            // Spawn our actor as a child of the supervisor.
99            let (actor_ref, _) =
100                SupervisorActor::spawn_linked(None, self.args.clone(), supervisor, thread_pool)
101                    .await?;
102
103            // Update the reference to inner actor, we need this to send messages to it.
104            let mut inner = self.inner.write().await;
105            inner.actor_ref.replace(actor_ref.clone());
106
107            Ok(actor_ref.into())
108        })
109    }
110}
111
112#[derive(Debug, Error)]
113pub enum SupervisorError {
114    /// Spawning the internal actor failed.
115    #[error(transparent)]
116    ActorSpawn(#[from] ractor::SpawnErr),
117
118    /// Messaging with internal actor via RPC failed.
119    #[error(transparent)]
120    ActorRpc(#[from] Box<ractor::RactorErr<ToSupervisorActor>>),
121}