kona_node_service/actors/engine/
actor.rs

1//! The [`EngineActor`].
2
3use super::{EngineError, L2Finalizer};
4use alloy_rpc_types_engine::JwtSecret;
5use async_trait::async_trait;
6use futures::future::OptionFuture;
7use kona_derive::{ResetSignal, Signal};
8use kona_engine::{
9    BuildTask, ConsolidateTask, Engine, EngineClient, EngineQueries,
10    EngineState as InnerEngineState, EngineTask, EngineTaskError, EngineTaskErrorSeverity,
11    InsertTask,
12};
13use kona_genesis::RollupConfig;
14use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent};
15use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope;
16use std::sync::Arc;
17use tokio::{
18    sync::{mpsc, oneshot, watch},
19    task::JoinHandle,
20};
21use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
22use url::Url;
23
24use crate::{NodeActor, NodeMode, actors::CancellableContext};
25
26/// The [`EngineActor`] is responsible for managing the operations sent to the execution layer's
27/// Engine API. To accomplish this, it uses the [`Engine`] task queue to order Engine API
28/// interactions based off of the [`Ord`] implementation of [`EngineTask`].
29#[derive(Debug)]
30pub struct EngineActor {
31    /// The [`EngineActorState`] used to build the actor.
32    builder: EngineBuilder,
33    /// A channel to receive [`OpAttributesWithParent`] from the derivation actor.
34    attributes_rx: mpsc::Receiver<OpAttributesWithParent>,
35    /// A channel to receive [`OpExecutionPayloadEnvelope`] from the network actor.
36    unsafe_block_rx: mpsc::Receiver<OpExecutionPayloadEnvelope>,
37    /// A channel to receive reset requests.
38    reset_request_rx: mpsc::Receiver<()>,
39    /// Handler for inbound queries to the engine.
40    inbound_queries: mpsc::Receiver<EngineQueries>,
41    /// A channel to receive build requests from the sequencer actor.
42    ///
43    /// ## Note
44    /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator
45    /// mode.
46    build_request_rx:
47        Option<mpsc::Receiver<(OpAttributesWithParent, mpsc::Sender<OpExecutionPayloadEnvelope>)>>,
48    /// The [`L2Finalizer`], used to finalize L2 blocks.
49    finalizer: L2Finalizer,
50}
51
52/// The outbound data for the [`EngineActor`].
53#[derive(Debug)]
54pub struct EngineInboundData {
55    /// The channel used by the sequencer actor to send build requests to the engine actor.
56    ///
57    /// ## Note
58    /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator
59    /// mode.
60    pub build_request_tx:
61        Option<mpsc::Sender<(OpAttributesWithParent, mpsc::Sender<OpExecutionPayloadEnvelope>)>>,
62    /// A channel to send [`OpAttributesWithParent`] to the engine actor.
63    pub attributes_tx: mpsc::Sender<OpAttributesWithParent>,
64    /// A channel to send [`OpExecutionPayloadEnvelope`] to the engine actor.
65    ///
66    /// ## Note
67    /// The sequencer actor should not need to send [`OpExecutionPayloadEnvelope`]s to the engine
68    /// actor through that channel. Instead, it should use the `build_request_tx` channel to
69    /// trigger [`BuildTask`] tasks which should insert the block newly built to the engine
70    /// state upon completion.
71    pub unsafe_block_tx: mpsc::Sender<OpExecutionPayloadEnvelope>,
72    /// A channel to send reset requests.
73    pub reset_request_tx: mpsc::Sender<()>,
74    /// Handler to send inbound queries to the engine.
75    pub inbound_queries_tx: mpsc::Sender<EngineQueries>,
76    /// A channel that sends new finalized L1 blocks intermittently.
77    pub finalized_l1_block_tx: watch::Sender<Option<BlockInfo>>,
78}
79
80/// Configuration for the Engine Actor.
81#[derive(Debug, Clone)]
82pub struct EngineBuilder {
83    /// The [`RollupConfig`].
84    pub config: Arc<RollupConfig>,
85    /// The engine rpc url.
86    pub engine_url: Url,
87    /// The L1 rpc url.
88    pub l1_rpc_url: Url,
89    /// The engine jwt secret.
90    pub jwt_secret: JwtSecret,
91    /// The mode of operation for the node.
92    /// When the node is in sequencer mode, the engine actor will receive requests to build blocks
93    /// from the sequencer actor.
94    pub mode: NodeMode,
95}
96
97impl EngineBuilder {
98    /// Launches the [`Engine`]. Returns the [`Engine`] and a channel to receive engine state
99    /// updates.
100    fn build_state(self) -> EngineActorState {
101        let client = self.client();
102        let state = InnerEngineState::default();
103        let (engine_state_send, _) = tokio::sync::watch::channel(state);
104        let (engine_queue_length_send, _) = tokio::sync::watch::channel(0);
105
106        EngineActorState {
107            rollup: self.config,
108            client,
109            engine: Engine::new(state, engine_state_send, engine_queue_length_send),
110        }
111    }
112
113    /// Returns the [`EngineClient`].
114    pub fn client(&self) -> Arc<EngineClient> {
115        EngineClient::new_http(
116            self.engine_url.clone(),
117            self.l1_rpc_url.clone(),
118            self.config.clone(),
119            self.jwt_secret,
120        )
121        .into()
122    }
123}
124
125/// The configuration for the [`EngineActor`].
126#[derive(Debug)]
127pub(super) struct EngineActorState {
128    /// The [`RollupConfig`] used to build tasks.
129    pub(super) rollup: Arc<RollupConfig>,
130    /// An [`EngineClient`] used for creating engine tasks.
131    pub(super) client: Arc<EngineClient>,
132    /// The [`Engine`] task queue.
133    pub(super) engine: Engine,
134}
135
136/// The communication context used by the engine actor.
137#[derive(Debug)]
138pub struct EngineContext {
139    /// The cancellation token, shared between all tasks.
140    pub cancellation: CancellationToken,
141    /// A sender for L2 unsafe head update notifications.
142    /// Is optional because it is only used in sequencer mode.
143    pub engine_unsafe_head_tx: Option<watch::Sender<L2BlockInfo>>,
144    /// The sender for L2 safe head update notifications.
145    pub engine_l2_safe_head_tx: watch::Sender<L2BlockInfo>,
146    /// A channel to send a signal that EL sync has completed. Informs the derivation actor to
147    /// start. Because the EL sync state machine within [`InnerEngineState`] can only complete
148    /// once, this channel is consumed after the first successful send. Future cases where EL
149    /// sync is re-triggered can occur, but we will not block derivation on it.
150    pub sync_complete_tx: oneshot::Sender<()>,
151    /// A way for the engine actor to send a [`Signal`] back to the derivation actor.
152    pub derivation_signal_tx: mpsc::Sender<Signal>,
153}
154
155impl CancellableContext for EngineContext {
156    fn cancelled(&self) -> WaitForCancellationFuture<'_> {
157        self.cancellation.cancelled()
158    }
159}
160
161impl EngineActor {
162    /// Constructs a new [`EngineActor`] from the params.
163    pub fn new(config: EngineBuilder) -> (EngineInboundData, Self) {
164        let (finalized_l1_block_tx, finalized_l1_block_rx) = watch::channel(None);
165        let (inbound_queries_tx, inbound_queries_rx) = mpsc::channel(1024);
166        let (attributes_tx, attributes_rx) = mpsc::channel(1024);
167        let (unsafe_block_tx, unsafe_block_rx) = mpsc::channel(1024);
168        let (reset_request_tx, reset_request_rx) = mpsc::channel(1024);
169
170        let (build_request_tx, build_request_rx) = if config.mode.is_sequencer() {
171            let (tx, rx) = mpsc::channel(1024);
172            (Some(tx), Some(rx))
173        } else {
174            (None, None)
175        };
176
177        let actor = Self {
178            builder: config,
179            attributes_rx,
180            unsafe_block_rx,
181            reset_request_rx,
182            inbound_queries: inbound_queries_rx,
183            build_request_rx,
184            finalizer: L2Finalizer::new(finalized_l1_block_rx),
185        };
186
187        let outbound_data = EngineInboundData {
188            build_request_tx,
189            finalized_l1_block_tx,
190            inbound_queries_tx,
191            attributes_tx,
192            unsafe_block_tx,
193            reset_request_tx,
194        };
195
196        (outbound_data, actor)
197    }
198}
199
200impl EngineActorState {
201    /// Starts a task to handle engine queries.
202    fn start_query_task(
203        &self,
204        mut inbound_query_channel: tokio::sync::mpsc::Receiver<EngineQueries>,
205    ) -> JoinHandle<()> {
206        let state_recv = self.engine.state_subscribe();
207        let queue_length_recv = self.engine.queue_length_subscribe();
208        let engine_client = self.client.clone();
209        let rollup_config = self.rollup.clone();
210
211        tokio::spawn(async move {
212            while let Some(req) = inbound_query_channel.recv().await {
213                {
214                    trace!(target: "engine", ?req, "Received engine query request.");
215
216                    if let Err(e) = req
217                        .handle(&state_recv, &queue_length_recv, &engine_client, &rollup_config)
218                        .await
219                    {
220                        warn!(target: "engine", err = ?e, "Failed to handle engine query request.");
221                    }
222                }
223            }
224        })
225    }
226
227    /// Resets the inner [`Engine`] and propagates the reset to the derivation actor.
228    pub(super) async fn reset(
229        &mut self,
230        derivation_signal_tx: &mpsc::Sender<Signal>,
231        engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>,
232        finalizer: &mut L2Finalizer,
233    ) -> Result<(), EngineError> {
234        // Reset the engine.
235        let (l2_safe_head, l1_origin, system_config) =
236            self.engine.reset(self.client.clone(), self.rollup.clone()).await?;
237
238        // Signal the derivation actor to reset.
239        let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) };
240        match derivation_signal_tx.send(signal.signal()).await {
241            Ok(_) => debug!(target: "engine", "Sent reset signal to derivation actor"),
242            Err(err) => {
243                error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor");
244                return Err(EngineError::ChannelClosed);
245            }
246        }
247
248        // Attempt to update the safe head following the reset.
249        self.maybe_update_safe_head(engine_l2_safe_head_tx);
250
251        // Clear the queue of L2 blocks awaiting finalization.
252        finalizer.clear();
253
254        Ok(())
255    }
256
257    /// Drains the inner [`Engine`] task queue and attempts to update the safe head.
258    async fn drain(
259        &mut self,
260        derivation_signal_tx: &mpsc::Sender<Signal>,
261        sync_complete_tx: &mut Option<oneshot::Sender<()>>,
262        engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>,
263        finalizer: &mut L2Finalizer,
264    ) -> Result<(), EngineError> {
265        match self.engine.drain().await {
266            Ok(_) => {
267                trace!(target: "engine", "[ENGINE] tasks drained");
268            }
269            Err(err) => {
270                match err.severity() {
271                    EngineTaskErrorSeverity::Critical => {
272                        error!(target: "engine", ?err, "Critical error draining engine tasks");
273                        return Err(err.into());
274                    }
275                    EngineTaskErrorSeverity::Reset => {
276                        warn!(target: "engine", ?err, "Received reset request");
277                        self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer).await?;
278                    }
279                    EngineTaskErrorSeverity::Flush => {
280                        // This error is encountered when the payload is marked INVALID
281                        // by the engine api. Post-holocene, the payload is replaced by
282                        // a "deposits-only" block and re-executed. At the same time,
283                        // the channel and any remaining buffered batches are flushed.
284                        warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline.");
285                        match derivation_signal_tx.send(Signal::FlushChannel).await {
286                            Ok(_) => {
287                                debug!(target: "engine", "Sent flush signal to derivation actor")
288                            }
289                            Err(err) => {
290                                error!(target: "engine", ?err, "Failed to send flush signal to the derivation actor.");
291                                return Err(EngineError::ChannelClosed);
292                            }
293                        }
294                    }
295                    EngineTaskErrorSeverity::Temporary => {
296                        trace!(target: "engine", ?err, "Temporary error draining engine tasks");
297                    }
298                }
299            }
300        }
301
302        self.maybe_update_safe_head(engine_l2_safe_head_tx);
303        self.check_el_sync(
304            derivation_signal_tx,
305            engine_l2_safe_head_tx,
306            sync_complete_tx,
307            finalizer,
308        )
309        .await?;
310
311        Ok(())
312    }
313
314    /// Checks if the EL has finished syncing, notifying the derivation actor if it has.
315    async fn check_el_sync(
316        &mut self,
317        derivation_signal_tx: &mpsc::Sender<Signal>,
318        engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>,
319        sync_complete_tx: &mut Option<oneshot::Sender<()>>,
320        finalizer: &mut L2Finalizer,
321    ) -> Result<(), EngineError> {
322        if self.engine.state().el_sync_finished {
323            let Some(sync_complete_tx) = std::mem::take(sync_complete_tx) else {
324                return Ok(());
325            };
326
327            // Only reset the engine if the sync state does not already know about a finalized
328            // block.
329            if self.engine.state().sync_state.finalized_head() != L2BlockInfo::default() {
330                return Ok(());
331            }
332
333            // If the sync status is finished, we can reset the engine and start derivation.
334            info!(target: "engine", "Performing initial engine reset");
335            self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer).await?;
336            sync_complete_tx.send(()).ok();
337        }
338
339        Ok(())
340    }
341
342    /// Attempts to update the safe head via the watch channel.
343    fn maybe_update_safe_head(&self, engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>) {
344        let state_safe_head = self.engine.state().sync_state.safe_head();
345        let update = |head: &mut L2BlockInfo| {
346            if head != &state_safe_head {
347                *head = state_safe_head;
348                return true;
349            }
350            false
351        };
352        let sent = engine_l2_safe_head_tx.send_if_modified(update);
353        trace!(target: "engine", ?sent, "Attempted L2 Safe Head Update");
354    }
355}
356
357#[async_trait]
358impl NodeActor for EngineActor {
359    type Error = EngineError;
360    type OutboundData = EngineContext;
361    type InboundData = EngineInboundData;
362    type Builder = EngineBuilder;
363
364    fn build(config: Self::Builder) -> (Self::InboundData, Self) {
365        Self::new(config)
366    }
367
368    async fn start(
369        mut self,
370        EngineContext {
371            cancellation,
372            engine_l2_safe_head_tx,
373            sync_complete_tx,
374            derivation_signal_tx,
375            mut engine_unsafe_head_tx,
376        }: Self::OutboundData,
377    ) -> Result<(), Self::Error> {
378        let mut state = self.builder.build_state();
379
380        // Start the engine query server in a separate task to avoid blocking the main task.
381        let handle = state.start_query_task(self.inbound_queries);
382
383        // The sync complete tx is consumed after the first successful send. Hence we need to wrap
384        // it in an `Option` to ensure we satisfy the borrow checker.
385        let mut sync_complete_tx = Some(sync_complete_tx);
386
387        loop {
388            // Attempt to drain all outstanding tasks from the engine queue before adding new ones.
389            state
390                .drain(
391                    &derivation_signal_tx,
392                    &mut sync_complete_tx,
393                    &engine_l2_safe_head_tx,
394                    &mut self.finalizer,
395                )
396                .await?;
397
398            // If the unsafe head has updated, propagate it to the outbound channels.
399            if let Some(unsafe_head_tx) = engine_unsafe_head_tx.as_mut() {
400                unsafe_head_tx.send_if_modified(|val| {
401                    let new_head = state.engine.state().sync_state.unsafe_head();
402                    (*val != new_head).then(|| *val = new_head).is_some()
403                });
404            }
405
406            tokio::select! {
407                biased;
408
409                _ = cancellation.cancelled() => {
410                    warn!(target: "engine", "EngineActor received shutdown signal. Aborting engine query task.");
411
412                    handle.abort();
413
414                    return Ok(());
415                }
416                reset = self.reset_request_rx.recv() => {
417                    if reset.is_none() {
418                        error!(target: "engine", "Reset request receiver closed unexpectedly");
419                        cancellation.cancel();
420                        return Err(EngineError::ChannelClosed);
421                    }
422                    warn!(target: "engine", "Received reset request");
423                    state
424                        .reset(&derivation_signal_tx, &engine_l2_safe_head_tx, &mut self.finalizer)
425                        .await?;
426                }
427                Some(res) = OptionFuture::from(self.build_request_rx.as_mut().map(|rx| rx.recv())), if self.build_request_rx.is_some() => {
428                    let Some((attributes, response_tx)) = res else {
429                        error!(target: "engine", "Build request receiver closed unexpectedly while in sequencer mode");
430                        cancellation.cancel();
431                        return Err(EngineError::ChannelClosed);
432                    };
433
434                    let task = EngineTask::Build(BuildTask::new(
435                        state.client.clone(),
436                        state.rollup.clone(),
437                        attributes,
438                        // The payload is not derived in this case.
439                        false,
440                        Some(response_tx),
441                    ));
442                    state.engine.enqueue(task);
443                }
444                unsafe_block = self.unsafe_block_rx.recv() => {
445                    let Some(envelope) = unsafe_block else {
446                        error!(target: "engine", "Unsafe block receiver closed unexpectedly");
447                        cancellation.cancel();
448                        return Err(EngineError::ChannelClosed);
449                    };
450                    let task = EngineTask::Insert(InsertTask::new(
451                        state.client.clone(),
452                        state.rollup.clone(),
453                        envelope,
454                        false, // The payload is not derived in this case. This is an unsafe block.
455                    ));
456                    state.engine.enqueue(task);
457                }
458                attributes = self.attributes_rx.recv() => {
459                    let Some(attributes) = attributes else {
460                        error!(target: "engine", "Attributes receiver closed unexpectedly");
461                        cancellation.cancel();
462                        return Err(EngineError::ChannelClosed);
463                    };
464                    self.finalizer.enqueue_for_finalization(&attributes);
465
466                    let task = EngineTask::Consolidate(ConsolidateTask::new(
467                        state.client.clone(),
468                        state.rollup.clone(),
469                        attributes,
470                        true,
471                    ));
472                    state.engine.enqueue(task);
473                }
474                msg = self.finalizer.new_finalized_block() => {
475                    if let Err(err) = msg {
476                        error!(target: "engine", ?err, "L1 finalized block receiver closed unexpectedly");
477                        cancellation.cancel();
478                        return Err(EngineError::ChannelClosed);
479                    }
480                    // Attempt to finalize any L2 blocks that are contained within the finalized L1
481                    // chain.
482                    self.finalizer.try_finalize_next(&mut state).await;
483                }
484            }
485        }
486    }
487}