kona_engine/task_queue/
core.rs

1//! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s.
2
3use super::EngineTaskExt;
4use crate::{
5    EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, EngineTaskError,
6    EngineTaskErrorSeverity, ForkchoiceTask, Metrics, task_queue::EngineTaskErrors,
7};
8use alloy_provider::Provider;
9use alloy_rpc_types_eth::Transaction;
10use kona_genesis::{RollupConfig, SystemConfig};
11use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config};
12use kona_sources::{SyncStartError, find_starting_forkchoice};
13use op_alloy_consensus::OpTxEnvelope;
14use std::{collections::BinaryHeap, sync::Arc};
15use thiserror::Error;
16use tokio::sync::watch::Sender;
17
18/// The [`Engine`] task queue.
19///
20/// Tasks of a shared [`EngineTask`] variant are processed in FIFO order, providing synchronization
21/// guarantees for the L2 execution layer and other actors. A priority queue, ordered by
22/// [`EngineTask`]'s [`Ord`] implementation, is used to prioritize tasks executed by the
23/// [`Engine::drain`] method.
24///
25///  Because tasks are executed one at a time, they are considered to be atomic operations over the
26/// [`EngineState`], and are given exclusive access to the engine state during execution.
27///
28/// Tasks within the queue are also considered fallible. If they fail with a temporary error,
29/// they are not popped from the queue, the error is returned, and they are retried on the
30/// next call to [`Engine::drain`].
31#[derive(Debug)]
32pub struct Engine {
33    /// The state of the engine.
34    state: EngineState,
35    /// A sender that can be used to notify the engine actor of state changes.
36    state_sender: Sender<EngineState>,
37    /// The task queue.
38    tasks: BinaryHeap<EngineTask>,
39}
40
41impl Engine {
42    /// Creates a new [`Engine`] with an empty task queue and the passed initial [`EngineState`].
43    ///
44    /// An initial [`EngineTask::ForkchoiceUpdate`] is added to the task queue to synchronize the
45    /// engine with the forkchoice state of the [`EngineState`].
46    pub fn new(initial_state: EngineState, state_sender: Sender<EngineState>) -> Self {
47        Self { state: initial_state, state_sender, tasks: BinaryHeap::default() }
48    }
49
50    /// Returns a reference to the inner [`EngineState`].
51    pub const fn state(&self) -> &EngineState {
52        &self.state
53    }
54
55    /// Returns a receiver that can be used to listen to engine state updates.
56    pub fn subscribe(&self) -> tokio::sync::watch::Receiver<EngineState> {
57        self.state_sender.subscribe()
58    }
59
60    /// Enqueues a new [`EngineTask`] for execution.
61    pub fn enqueue(&mut self, task: EngineTask) {
62        self.tasks.push(task);
63    }
64
65    /// Resets the engine by finding a plausible sync starting point via
66    /// [`find_starting_forkchoice`]. The state will be updated to the starting point, and a
67    /// forkchoice update will be enqueued in order to reorg the execution layer.
68    pub async fn reset(
69        &mut self,
70        client: Arc<EngineClient>,
71        config: Arc<RollupConfig>,
72    ) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> {
73        // Clear any outstanding tasks to prepare for the reset.
74        self.clear();
75
76        let start =
77            find_starting_forkchoice(&config, client.l1_provider(), client.l2_provider()).await?;
78
79        if let Err(err) = ForkchoiceTask::new(
80            client.clone(),
81            config.clone(),
82            EngineSyncStateUpdate {
83                unsafe_head: Some(start.un_safe),
84                cross_unsafe_head: Some(start.un_safe),
85                local_safe_head: Some(start.safe),
86                safe_head: Some(start.safe),
87                finalized_head: Some(start.finalized),
88            },
89            None,
90        )
91        .execute(&mut self.state)
92        .await
93        {
94            // Ignore temporary errors.
95            if matches!(err.severity(), EngineTaskErrorSeverity::Temporary) {
96                debug!(target: "engine", "Forkchoice update failed temporarily during reset: {}", err);
97            } else {
98                return Err(EngineTaskErrors::Forkchoice(err).into());
99            }
100        }
101
102        // Find the new safe head's L1 origin and SystemConfig.
103        let origin_block = start
104            .safe
105            .l1_origin
106            .number
107            .saturating_sub(config.channel_timeout(start.safe.block_info.timestamp));
108        let l1_origin_info: BlockInfo = client
109            .l1_provider()
110            .get_block(origin_block.into())
111            .await
112            .map_err(SyncStartError::RpcError)?
113            .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
114            .into_consensus()
115            .into();
116        let l2_safe_block = client
117            .l2_provider()
118            .get_block(start.safe.block_info.hash.into())
119            .full()
120            .await
121            .map_err(SyncStartError::RpcError)?
122            .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
123            .into_consensus()
124            .map_transactions(|t| <Transaction<OpTxEnvelope> as Clone>::clone(&t).into_inner());
125        let system_config = to_system_config(&l2_safe_block, &config)?;
126
127        kona_macros::inc!(counter, Metrics::ENGINE_RESET_COUNT);
128
129        Ok((start.safe, l1_origin_info, system_config))
130    }
131
132    /// Clears the task queue.
133    pub fn clear(&mut self) {
134        self.tasks.clear();
135    }
136
137    /// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns
138    /// an error along the way, it is not popped from the queue (in case it must be retried) and
139    /// the error is returned.
140    pub async fn drain(&mut self) -> Result<(), EngineTaskErrors> {
141        // Drain tasks in order of priority, halting on errors for a retry to be attempted.
142        while let Some(task) = self.tasks.peek() {
143            // Execute the task
144            task.execute(&mut self.state).await?;
145
146            // Update the state and notify the engine actor.
147            self.state_sender.send_replace(self.state);
148
149            // Pop the task from the queue now that it's been executed.
150            self.tasks.pop();
151        }
152
153        Ok(())
154    }
155}
156
157/// An error occurred while attempting to reset the [`Engine`].
158#[derive(Debug, Error)]
159pub enum EngineResetError {
160    /// An error that originated from within an engine task.
161    #[error(transparent)]
162    Task(#[from] EngineTaskErrors),
163    /// An error occurred while traversing the L1 for the sync starting point.
164    #[error(transparent)]
165    SyncStart(#[from] SyncStartError),
166    /// An error occurred while constructing the SystemConfig for the new safe head.
167    #[error(transparent)]
168    SystemConfigConversion(#[from] OpBlockConversionError),
169}