// kona_engine/task_queue/core.rs

//! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s.

use super::EngineTaskExt;
use crate::{
    EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, EngineTaskError,
    EngineTaskErrorSeverity, Metrics, SynchronizeTask, SynchronizeTaskError,
    task_queue::EngineTaskErrors,
};
use alloy_provider::Provider;
use alloy_rpc_types_eth::Transaction;
use kona_genesis::{RollupConfig, SystemConfig};
use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config};
use kona_sources::{SyncStartError, find_starting_forkchoice};
use op_alloy_consensus::OpTxEnvelope;
use std::{collections::BinaryHeap, sync::Arc};
use thiserror::Error;
use tokio::sync::watch::Sender;

/// The [`Engine`] task queue.
///
/// Tasks of a shared [`EngineTask`] variant are processed in FIFO order, providing synchronization
/// guarantees for the L2 execution layer and other actors. A priority queue, ordered by
/// [`EngineTask`]'s [`Ord`] implementation, is used to prioritize tasks executed by the
/// [`Engine::drain`] method.
///
/// Because tasks are executed one at a time, they are considered to be atomic operations over the
/// [`EngineState`], and are given exclusive access to the engine state during execution.
///
/// Tasks within the queue are also considered fallible. If they fail with a temporary error,
/// they are not popped from the queue, the error is returned, and they are retried on the
/// next call to [`Engine::drain`].
#[derive(Debug)]
pub struct Engine {
    /// The state of the engine, mutated exclusively by executing tasks.
    state: EngineState,
    /// A watch-channel sender used to notify the engine actor of state changes.
    state_sender: Sender<EngineState>,
    /// A watch-channel sender used to notify the engine actor of task queue length changes.
    task_queue_length: Sender<usize>,
    /// The pending tasks, ordered by [`EngineTask`]'s [`Ord`] implementation.
    tasks: BinaryHeap<EngineTask>,
}

44impl Engine {
45    /// Creates a new [`Engine`] with an empty task queue and the passed initial [`EngineState`].
46    pub fn new(
47        initial_state: EngineState,
48        state_sender: Sender<EngineState>,
49        task_queue_length: Sender<usize>,
50    ) -> Self {
51        Self { state: initial_state, state_sender, task_queue_length, tasks: BinaryHeap::default() }
52    }
53
54    /// Returns a reference to the inner [`EngineState`].
55    pub const fn state(&self) -> &EngineState {
56        &self.state
57    }
58
59    /// Returns a receiver that can be used to listen to engine state updates.
60    pub fn state_subscribe(&self) -> tokio::sync::watch::Receiver<EngineState> {
61        self.state_sender.subscribe()
62    }
63
64    /// Returns a receiver that can be used to listen to engine queue length updates.
65    pub fn queue_length_subscribe(&self) -> tokio::sync::watch::Receiver<usize> {
66        self.task_queue_length.subscribe()
67    }
68
69    /// Enqueues a new [`EngineTask`] for execution.
70    /// Updates the queue length and notifies listeners of the change.
71    pub fn enqueue(&mut self, task: EngineTask) {
72        self.tasks.push(task);
73        self.task_queue_length.send_replace(self.tasks.len());
74    }
75
76    /// Resets the engine by finding a plausible sync starting point via
77    /// [`find_starting_forkchoice`]. The state will be updated to the starting point, and a
78    /// forkchoice update will be enqueued in order to reorg the execution layer.
79    pub async fn reset(
80        &mut self,
81        client: Arc<EngineClient>,
82        config: Arc<RollupConfig>,
83    ) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> {
84        // Clear any outstanding tasks to prepare for the reset.
85        self.clear();
86
87        let start =
88            find_starting_forkchoice(&config, client.l1_provider(), client.l2_engine()).await?;
89
90        // Retry to synchronize the engine until we succeeds or a critical error occurs.
91        while let Err(err) = SynchronizeTask::new(
92            client.clone(),
93            config.clone(),
94            EngineSyncStateUpdate {
95                unsafe_head: Some(start.un_safe),
96                cross_unsafe_head: Some(start.un_safe),
97                local_safe_head: Some(start.safe),
98                safe_head: Some(start.safe),
99                finalized_head: Some(start.finalized),
100            },
101        )
102        .execute(&mut self.state)
103        .await
104        {
105            match err.severity() {
106                EngineTaskErrorSeverity::Temporary |
107                EngineTaskErrorSeverity::Flush |
108                EngineTaskErrorSeverity::Reset => {
109                    debug!(target: "engine", ?err, "Forkchoice update failed during reset. Trying again...");
110                }
111                EngineTaskErrorSeverity::Critical => {
112                    return Err(EngineResetError::Forkchoice(err));
113                }
114            }
115        }
116
117        // Find the new safe head's L1 origin and SystemConfig.
118        let origin_block = start
119            .safe
120            .l1_origin
121            .number
122            .saturating_sub(config.channel_timeout(start.safe.block_info.timestamp));
123        let l1_origin_info: BlockInfo = client
124            .l1_provider()
125            .get_block(origin_block.into())
126            .await
127            .map_err(SyncStartError::RpcError)?
128            .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
129            .into_consensus()
130            .into();
131        let l2_safe_block = client
132            .l2_engine()
133            .get_block(start.safe.block_info.hash.into())
134            .full()
135            .await
136            .map_err(SyncStartError::RpcError)?
137            .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
138            .into_consensus()
139            .map_transactions(|t| <Transaction<OpTxEnvelope> as Clone>::clone(&t).into_inner());
140        let system_config = to_system_config(&l2_safe_block, &config)?;
141
142        kona_macros::inc!(counter, Metrics::ENGINE_RESET_COUNT);
143
144        Ok((start.safe, l1_origin_info, system_config))
145    }
146
147    /// Clears the task queue.
148    pub fn clear(&mut self) {
149        self.tasks.clear();
150    }
151
152    /// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns
153    /// an error along the way, it is not popped from the queue (in case it must be retried) and
154    /// the error is returned.
155    pub async fn drain(&mut self) -> Result<(), EngineTaskErrors> {
156        // Drain tasks in order of priority, halting on errors for a retry to be attempted.
157        while let Some(task) = self.tasks.peek() {
158            // Execute the task
159            task.execute(&mut self.state).await?;
160
161            // Update the state and notify the engine actor.
162            self.state_sender.send_replace(self.state);
163
164            // Pop the task from the queue now that it's been executed.
165            self.tasks.pop();
166
167            self.task_queue_length.send_replace(self.tasks.len());
168        }
169
170        Ok(())
171    }
172}
173
/// An error occurred while attempting to reset the [`Engine`].
#[derive(Debug, Error)]
pub enum EngineResetError {
    /// An error that occurred while updating the forkchoice state during the reset.
    #[error(transparent)]
    Forkchoice(#[from] SynchronizeTaskError),
    /// An error occurred while traversing the L1 for the sync starting point.
    #[error(transparent)]
    SyncStart(#[from] SyncStartError),
    /// An error occurred while constructing the [`SystemConfig`] for the new safe head.
    #[error(transparent)]
    SystemConfigConversion(#[from] OpBlockConversionError),
}