informalsystems_malachitebft_engine/
node.rs

1use async_trait::async_trait;
2use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
3use tokio::task::JoinHandle;
4use tracing::{error, info, warn};
5
6use malachitebft_core_types::Context;
7
8use crate::consensus::ConsensusRef;
9use crate::host::HostRef;
10use crate::network::NetworkRef;
11use crate::sync::SyncRef;
12use crate::wal::WalRef;
13
/// Reference to a spawned [`Node`] actor, whose message type is `()`.
pub type NodeRef = ActorRef<()>;
15
16#[allow(dead_code)]
17pub struct Node<Ctx: Context> {
18    ctx: Ctx,
19    network: NetworkRef<Ctx>,
20    consensus: ConsensusRef<Ctx>,
21    wal: WalRef<Ctx>,
22    sync: Option<SyncRef<Ctx>>,
23    host: HostRef<Ctx>,
24    span: tracing::Span,
25}
26
27impl<Ctx> Node<Ctx>
28where
29    Ctx: Context,
30{
31    #[allow(clippy::too_many_arguments)]
32    pub fn new(
33        ctx: Ctx,
34        network: NetworkRef<Ctx>,
35        consensus: ConsensusRef<Ctx>,
36        wal: WalRef<Ctx>,
37        sync: Option<SyncRef<Ctx>>,
38        host: HostRef<Ctx>,
39        span: tracing::Span,
40    ) -> Self {
41        Self {
42            ctx,
43            network,
44            consensus,
45            wal,
46            sync,
47            host,
48            span,
49        }
50    }
51
52    pub async fn spawn(self) -> Result<(ActorRef<()>, JoinHandle<()>), ractor::SpawnErr> {
53        Actor::spawn(None, self, ()).await
54    }
55}
56
57#[async_trait]
58impl<Ctx> Actor for Node<Ctx>
59where
60    Ctx: Context,
61{
62    type Msg = ();
63    type State = ();
64    type Arguments = ();
65
66    async fn pre_start(
67        &self,
68        myself: ActorRef<Self::Msg>,
69        _args: (),
70    ) -> Result<(), ActorProcessingErr> {
71        // Set ourselves as the supervisor of the other actors
72        self.network.link(myself.get_cell());
73        self.consensus.link(myself.get_cell());
74        self.host.link(myself.get_cell());
75        self.wal.link(myself.get_cell());
76
77        if let Some(actor) = &self.sync {
78            actor.link(myself.get_cell());
79        }
80
81        Ok(())
82    }
83
84    #[tracing::instrument(name = "node", parent = &self.span, skip_all)]
85    async fn handle(
86        &self,
87        _myself: ActorRef<Self::Msg>,
88        _msg: Self::Msg,
89        _state: &mut (),
90    ) -> Result<(), ActorProcessingErr> {
91        Ok(())
92    }
93
94    #[tracing::instrument(name = "node", parent = &self.span, skip_all)]
95    async fn handle_supervisor_evt(
96        &self,
97        _myself: ActorRef<Self::Msg>,
98        evt: SupervisionEvent,
99        _state: &mut (),
100    ) -> Result<(), ActorProcessingErr> {
101        match evt {
102            SupervisionEvent::ActorStarted(cell) => {
103                info!(actor = %cell.get_id(), "Actor has started");
104            }
105            SupervisionEvent::ActorTerminated(cell, _state, reason) => {
106                warn!(
107                    "Actor {} has terminated: {}",
108                    cell.get_id(),
109                    reason.unwrap_or_default()
110                );
111            }
112            SupervisionEvent::ActorFailed(cell, error) => {
113                error!("Actor {} has failed: {error}", cell.get_id());
114            }
115            SupervisionEvent::ProcessGroupChanged(_) => (),
116        }
117
118        Ok(())
119    }
120}