kona_node_service/service/
core.rs1use crate::{
3 AttributesBuilderConfig, DerivationContext, EngineContext, L1WatcherRpcContext, NetworkContext,
4 NodeActor, NodeMode, RpcContext, SequencerContext, SequencerInboundData,
5 actors::{
6 DerivationInboundChannels, EngineInboundData, L1WatcherRpcInboundChannels,
7 NetworkInboundData, PipelineBuilder,
8 },
9 service::spawn_and_wait,
10};
11use async_trait::async_trait;
12use kona_derive::{AttributesBuilder, Pipeline, SignalReceiver};
13use std::fmt::Display;
14use tokio_util::sync::CancellationToken;
15
16#[async_trait]
42pub trait RollupNodeService {
43 type DataAvailabilityWatcher: NodeActor<
45 Error: Display,
46 OutboundData = L1WatcherRpcContext,
47 InboundData = L1WatcherRpcInboundChannels,
48 >;
49
50 type DerivationPipeline: Pipeline + SignalReceiver + Send + Sync + 'static;
52
53 type DerivationActor: NodeActor<
55 Error: Display,
56 Builder: PipelineBuilder<Pipeline = Self::DerivationPipeline>,
57 OutboundData = DerivationContext,
58 InboundData = DerivationInboundChannels,
59 >;
60
61 type EngineActor: NodeActor<Error: Display, OutboundData = EngineContext, InboundData = EngineInboundData>;
63
64 type NetworkActor: NodeActor<Error: Display, OutboundData = NetworkContext, InboundData = NetworkInboundData>;
66
67 type AttributesBuilder: AttributesBuilder + Send + Sync + 'static;
69
70 type SequencerActor: NodeActor<
72 Error: Display,
73 OutboundData = SequencerContext,
74 Builder: AttributesBuilderConfig<AB = Self::AttributesBuilder>,
75 InboundData = SequencerInboundData,
76 >;
77
78 type RpcActor: NodeActor<Error: Display, OutboundData = RpcContext, InboundData = ()>;
80
81 fn mode(&self) -> NodeMode;
83
84 fn da_watcher_builder(&self) -> <Self::DataAvailabilityWatcher as NodeActor>::Builder;
86
87 fn derivation_builder(&self) -> <Self::DerivationActor as NodeActor>::Builder;
89
90 fn network_builder(&self) -> <Self::NetworkActor as NodeActor>::Builder;
92
93 fn engine_builder(&self) -> <Self::EngineActor as NodeActor>::Builder;
95
96 fn rpc_builder(&self) -> Option<<Self::RpcActor as NodeActor>::Builder>;
98
99 fn sequencer_builder(&self) -> <Self::SequencerActor as NodeActor>::Builder;
101
102 async fn start(&self) -> Result<(), String> {
104 let cancellation = CancellationToken::new();
106
107 let (L1WatcherRpcInboundChannels { inbound_queries: da_watcher_rpc }, da_watcher) =
109 Self::DataAvailabilityWatcher::build(self.da_watcher_builder());
110
111 let (
113 DerivationInboundChannels {
114 derivation_signal_tx,
115 l1_head_updates_tx,
116 engine_l2_safe_head_tx,
117 el_sync_complete_tx,
118 },
119 derivation,
120 ) = Self::DerivationActor::build(self.derivation_builder());
121
122 let (
124 EngineInboundData {
125 build_request_tx,
126 attributes_tx,
127 unsafe_block_tx,
128 reset_request_tx,
129 inbound_queries_tx: engine_rpc,
130 finalized_l1_block_tx,
131 },
132 engine,
133 ) = Self::EngineActor::build(self.engine_builder());
134
135 let (
137 NetworkInboundData {
138 signer,
139 p2p_rpc: network_rpc,
140 gossip_payload_tx,
141 admin_rpc: net_admin_rpc,
142 },
143 network,
144 ) = Self::NetworkActor::build(self.network_builder());
145
146 let (_, rpc) = self.rpc_builder().map(Self::RpcActor::build).unzip();
148
149 let (sequencer_inbound_data, sequencer) = self
150 .mode()
151 .is_sequencer()
152 .then_some(Self::SequencerActor::build(self.sequencer_builder()))
153 .unzip();
154
155 spawn_and_wait!(
156 cancellation,
157 actors = [
158 rpc.map(|r| (
159 r,
160 RpcContext {
161 cancellation: cancellation.clone(),
162 p2p_network: network_rpc,
163 network_admin: net_admin_rpc,
164 sequencer_admin: sequencer_inbound_data.as_ref().map(|s| s.admin_query_tx.clone()),
165 l1_watcher_queries: da_watcher_rpc,
166 engine_query: engine_rpc,
167 }
168 )),
169 sequencer.map(|s| (
170 s,
171 SequencerContext {
172 l1_head_rx: l1_head_updates_tx.subscribe(),
173 reset_request_tx: reset_request_tx.clone(),
174 build_request_tx: build_request_tx.expect(
175 "`build_request_tx` not set while in sequencer mode. This should never happen.",
176 ),
177 gossip_payload_tx,
178 cancellation: cancellation.clone(),
179 })
180 ),
181 Some((
182 network,
183 NetworkContext { blocks: unsafe_block_tx, cancellation: cancellation.clone() }
184 )),
185 Some((
186 da_watcher,
187 L1WatcherRpcContext {
188 latest_head: l1_head_updates_tx,
189 latest_finalized: finalized_l1_block_tx,
190 block_signer_sender: signer,
191 cancellation: cancellation.clone(),
192 })
193 ),
194 Some((
195 derivation,
196 DerivationContext {
197 reset_request_tx: reset_request_tx.clone(),
198 derived_attributes_tx: attributes_tx,
199 cancellation: cancellation.clone(),
200 })),
201 Some((engine,
202 EngineContext {
203 engine_l2_safe_head_tx,
204 engine_unsafe_head_tx: sequencer_inbound_data
205 .map(|s| s.unsafe_head_tx),
206 sync_complete_tx: el_sync_complete_tx,
207 derivation_signal_tx,
208 cancellation: cancellation.clone(),
209 })
210 ),
211 ]
212 );
213 Ok(())
214 }
215}