kona_node_service/service/
core.rs

1//! The core [`RollupNodeService`] trait
2use crate::{
3    AttributesBuilderConfig, DerivationContext, EngineContext, L1WatcherRpcContext, NetworkContext,
4    NodeActor, NodeMode, RpcContext, SequencerContext, SequencerInboundData,
5    actors::{
6        DerivationInboundChannels, EngineInboundData, L1WatcherRpcInboundChannels,
7        NetworkInboundData, PipelineBuilder,
8    },
9    service::spawn_and_wait,
10};
11use async_trait::async_trait;
12use kona_derive::{AttributesBuilder, Pipeline, SignalReceiver};
13use std::fmt::Display;
14use tokio_util::sync::CancellationToken;
15
16/// The [`RollupNodeService`] trait defines the common interface for running a rollup node.
17///
18/// ## Validator Mode
19///
20/// The rollup node, in validator mode, listens to two sources of information to sync the L2 chain:
21///
22/// 1. The data availability layer, with a watcher that listens for new updates. L2 inputs (L2
23///    transaction batches + deposits) are then derived from the DA layer.
24/// 2. The L2 sequencer, which produces unsafe L2 blocks and sends them to the network over p2p
25///    gossip.
26///
27/// From these two sources, the node imports `unsafe` blocks from the L2 sequencer, `safe` blocks
28/// from the L2 derivation pipeline into the L2 execution layer via the Engine API, and finalizes
29/// `safe` blocks that it has derived when L1 finalized block updates are received.
30///
31/// ## Sequencer Mode
32///
33/// _Unimplemented - coming soon_.
34///
35/// ## Types
36///
37/// - `DataAvailabilityWatcher`: The type of [`NodeActor`] to use for the DA watcher service.
38/// - `DerivationPipeline`: The type of [Pipeline] to use for the service. Can be swapped out from
39///   the default implementation for the sake of plugins like Alt DA.
40/// - `Error`: The type of error for the service's entrypoint.
41#[async_trait]
42pub trait RollupNodeService {
43    /// The type of [`NodeActor`] to use for the DA watcher service.
44    type DataAvailabilityWatcher: NodeActor<
45            Error: Display,
46            OutboundData = L1WatcherRpcContext,
47            InboundData = L1WatcherRpcInboundChannels,
48        >;
49
50    /// The type of derivation pipeline to use for the service.
51    type DerivationPipeline: Pipeline + SignalReceiver + Send + Sync + 'static;
52
53    /// The type of derivation actor to use for the service.
54    type DerivationActor: NodeActor<
55            Error: Display,
56            Builder: PipelineBuilder<Pipeline = Self::DerivationPipeline>,
57            OutboundData = DerivationContext,
58            InboundData = DerivationInboundChannels,
59        >;
60
61    /// The type of engine actor to use for the service.
62    type EngineActor: NodeActor<Error: Display, OutboundData = EngineContext, InboundData = EngineInboundData>;
63
64    /// The type of network actor to use for the service.
65    type NetworkActor: NodeActor<Error: Display, OutboundData = NetworkContext, InboundData = NetworkInboundData>;
66
67    /// The type of attributes builder to use for the sequener.
68    type AttributesBuilder: AttributesBuilder + Send + Sync + 'static;
69
70    /// The type of sequencer actor to use for the service.
71    type SequencerActor: NodeActor<
72            Error: Display,
73            OutboundData = SequencerContext,
74            Builder: AttributesBuilderConfig<AB = Self::AttributesBuilder>,
75            InboundData = SequencerInboundData,
76        >;
77
78    /// The type of rpc actor to use for the service.
79    type RpcActor: NodeActor<Error: Display, OutboundData = RpcContext, InboundData = ()>;
80
81    /// The mode of operation for the node.
82    fn mode(&self) -> NodeMode;
83
84    /// Returns a DA watcher builder for the node.
85    fn da_watcher_builder(&self) -> <Self::DataAvailabilityWatcher as NodeActor>::Builder;
86
87    /// Returns a derivation builder for the node.
88    fn derivation_builder(&self) -> <Self::DerivationActor as NodeActor>::Builder;
89
90    /// Creates a network builder for the node.
91    fn network_builder(&self) -> <Self::NetworkActor as NodeActor>::Builder;
92
93    /// Returns an engine builder for the node.
94    fn engine_builder(&self) -> <Self::EngineActor as NodeActor>::Builder;
95
96    /// Returns an rpc builder for the node.
97    fn rpc_builder(&self) -> Option<<Self::RpcActor as NodeActor>::Builder>;
98
99    /// Returns the sequencer builder for the node.
100    fn sequencer_builder(&self) -> <Self::SequencerActor as NodeActor>::Builder;
101
102    /// Starts the rollup node service.
103    async fn start(&self) -> Result<(), String> {
104        // Create a global cancellation token for graceful shutdown of tasks.
105        let cancellation = CancellationToken::new();
106
107        // Create the DA watcher actor.
108        let (L1WatcherRpcInboundChannels { inbound_queries: da_watcher_rpc }, da_watcher) =
109            Self::DataAvailabilityWatcher::build(self.da_watcher_builder());
110
111        // Create the derivation actor.
112        let (
113            DerivationInboundChannels {
114                derivation_signal_tx,
115                l1_head_updates_tx,
116                engine_l2_safe_head_tx,
117                el_sync_complete_tx,
118            },
119            derivation,
120        ) = Self::DerivationActor::build(self.derivation_builder());
121
122        // Create the engine actor.
123        let (
124            EngineInboundData {
125                build_request_tx,
126                attributes_tx,
127                unsafe_block_tx,
128                reset_request_tx,
129                inbound_queries_tx: engine_rpc,
130                finalized_l1_block_tx,
131            },
132            engine,
133        ) = Self::EngineActor::build(self.engine_builder());
134
135        // Create the p2p actor.
136        let (
137            NetworkInboundData {
138                signer,
139                p2p_rpc: network_rpc,
140                gossip_payload_tx,
141                admin_rpc: net_admin_rpc,
142            },
143            network,
144        ) = Self::NetworkActor::build(self.network_builder());
145
146        // Create the RPC server actor.
147        let (_, rpc) = self.rpc_builder().map(Self::RpcActor::build).unzip();
148
149        let (sequencer_inbound_data, sequencer) = self
150            .mode()
151            .is_sequencer()
152            .then_some(Self::SequencerActor::build(self.sequencer_builder()))
153            .unzip();
154
155        spawn_and_wait!(
156            cancellation,
157            actors = [
158                rpc.map(|r| (
159                    r,
160                    RpcContext {
161                        cancellation: cancellation.clone(),
162                        p2p_network: network_rpc,
163                        network_admin: net_admin_rpc,
164                        sequencer_admin: sequencer_inbound_data.as_ref().map(|s| s.admin_query_tx.clone()),
165                        l1_watcher_queries: da_watcher_rpc,
166                        engine_query: engine_rpc,
167                    }
168                )),
169                sequencer.map(|s| (
170                    s,
171                    SequencerContext {
172                        l1_head_rx: l1_head_updates_tx.subscribe(),
173                        reset_request_tx: reset_request_tx.clone(),
174                        build_request_tx: build_request_tx.expect(
175                            "`build_request_tx` not set while in sequencer mode. This should never happen.",
176                        ),
177                        gossip_payload_tx,
178                        cancellation: cancellation.clone(),
179                    })
180                ),
181                Some((
182                    network,
183                    NetworkContext { blocks: unsafe_block_tx, cancellation: cancellation.clone() }
184                )),
185                Some((
186                    da_watcher,
187                    L1WatcherRpcContext {
188                        latest_head: l1_head_updates_tx,
189                        latest_finalized: finalized_l1_block_tx,
190                        block_signer_sender: signer,
191                        cancellation: cancellation.clone(),
192                    })
193                ),
194                Some((
195                    derivation,
196                    DerivationContext {
197                        reset_request_tx: reset_request_tx.clone(),
198                        derived_attributes_tx: attributes_tx,
199                        cancellation: cancellation.clone(),
200                })),
201                Some((engine,
202                    EngineContext {
203                        engine_l2_safe_head_tx,
204                        engine_unsafe_head_tx: sequencer_inbound_data
205                            .map(|s| s.unsafe_head_tx),
206                        sync_complete_tx: el_sync_complete_tx,
207                        derivation_signal_tx,
208                        cancellation: cancellation.clone(),
209                    })
210                ),
211            ]
212        );
213        Ok(())
214    }
215}