informalsystems_malachitebft_engine/
node.rs1use async_trait::async_trait;
2use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
3use tokio::task::JoinHandle;
4use tracing::{error, info, warn};
5
6use malachitebft_core_types::Context;
7
8use crate::consensus::ConsensusRef;
9use crate::host::HostRef;
10use crate::network::NetworkRef;
11use crate::sync::SyncRef;
12use crate::wal::WalRef;
13
14pub type NodeRef = ActorRef<()>;
15
16#[allow(dead_code)]
17pub struct Node<Ctx: Context> {
18 ctx: Ctx,
19 network: NetworkRef<Ctx>,
20 consensus: ConsensusRef<Ctx>,
21 wal: WalRef<Ctx>,
22 sync: Option<SyncRef<Ctx>>,
23 host: HostRef<Ctx>,
24 span: tracing::Span,
25}
26
27impl<Ctx> Node<Ctx>
28where
29 Ctx: Context,
30{
31 #[allow(clippy::too_many_arguments)]
32 pub fn new(
33 ctx: Ctx,
34 network: NetworkRef<Ctx>,
35 consensus: ConsensusRef<Ctx>,
36 wal: WalRef<Ctx>,
37 sync: Option<SyncRef<Ctx>>,
38 host: HostRef<Ctx>,
39 span: tracing::Span,
40 ) -> Self {
41 Self {
42 ctx,
43 network,
44 consensus,
45 wal,
46 sync,
47 host,
48 span,
49 }
50 }
51
52 pub async fn spawn(self) -> Result<(ActorRef<()>, JoinHandle<()>), ractor::SpawnErr> {
53 Actor::spawn(None, self, ()).await
54 }
55}
56
57#[async_trait]
58impl<Ctx> Actor for Node<Ctx>
59where
60 Ctx: Context,
61{
62 type Msg = ();
63 type State = ();
64 type Arguments = ();
65
66 async fn pre_start(
67 &self,
68 myself: ActorRef<Self::Msg>,
69 _args: (),
70 ) -> Result<(), ActorProcessingErr> {
71 self.network.link(myself.get_cell());
73 self.consensus.link(myself.get_cell());
74 self.host.link(myself.get_cell());
75 self.wal.link(myself.get_cell());
76
77 if let Some(actor) = &self.sync {
78 actor.link(myself.get_cell());
79 }
80
81 Ok(())
82 }
83
84 #[tracing::instrument(name = "node", parent = &self.span, skip_all)]
85 async fn handle(
86 &self,
87 _myself: ActorRef<Self::Msg>,
88 _msg: Self::Msg,
89 _state: &mut (),
90 ) -> Result<(), ActorProcessingErr> {
91 Ok(())
92 }
93
94 #[tracing::instrument(name = "node", parent = &self.span, skip_all)]
95 async fn handle_supervisor_evt(
96 &self,
97 _myself: ActorRef<Self::Msg>,
98 evt: SupervisionEvent,
99 _state: &mut (),
100 ) -> Result<(), ActorProcessingErr> {
101 match evt {
102 SupervisionEvent::ActorStarted(cell) => {
103 info!(actor = %cell.get_id(), "Actor has started");
104 }
105 SupervisionEvent::ActorTerminated(cell, _state, reason) => {
106 warn!(
107 "Actor {} has terminated: {}",
108 cell.get_id(),
109 reason.unwrap_or_default()
110 );
111 }
112 SupervisionEvent::ActorFailed(cell, error) => {
113 error!("Actor {} has failed: {error}", cell.get_id());
114 }
115 SupervisionEvent::ProcessGroupChanged(_) => (),
116 }
117
118 Ok(())
119 }
120}