kona_engine/task_queue/
core.rs1use super::EngineTaskExt;
4use crate::{
5 EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, EngineTaskError,
6 EngineTaskErrorSeverity, ForkchoiceTask, Metrics, task_queue::EngineTaskErrors,
7};
8use alloy_provider::Provider;
9use alloy_rpc_types_eth::Transaction;
10use kona_genesis::{RollupConfig, SystemConfig};
11use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config};
12use kona_sources::{SyncStartError, find_starting_forkchoice};
13use op_alloy_consensus::OpTxEnvelope;
14use std::{collections::BinaryHeap, sync::Arc};
15use thiserror::Error;
16use tokio::sync::watch::Sender;
17
18#[derive(Debug)]
32pub struct Engine {
33 state: EngineState,
35 state_sender: Sender<EngineState>,
37 tasks: BinaryHeap<EngineTask>,
39}
40
41impl Engine {
42 pub fn new(initial_state: EngineState, state_sender: Sender<EngineState>) -> Self {
47 Self { state: initial_state, state_sender, tasks: BinaryHeap::default() }
48 }
49
50 pub const fn state(&self) -> &EngineState {
52 &self.state
53 }
54
55 pub fn subscribe(&self) -> tokio::sync::watch::Receiver<EngineState> {
57 self.state_sender.subscribe()
58 }
59
60 pub fn enqueue(&mut self, task: EngineTask) {
62 self.tasks.push(task);
63 }
64
65 pub async fn reset(
69 &mut self,
70 client: Arc<EngineClient>,
71 config: Arc<RollupConfig>,
72 ) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> {
73 self.clear();
75
76 let start =
77 find_starting_forkchoice(&config, client.l1_provider(), client.l2_provider()).await?;
78
79 if let Err(err) = ForkchoiceTask::new(
80 client.clone(),
81 config.clone(),
82 EngineSyncStateUpdate {
83 unsafe_head: Some(start.un_safe),
84 cross_unsafe_head: Some(start.un_safe),
85 local_safe_head: Some(start.safe),
86 safe_head: Some(start.safe),
87 finalized_head: Some(start.finalized),
88 },
89 None,
90 )
91 .execute(&mut self.state)
92 .await
93 {
94 if matches!(err.severity(), EngineTaskErrorSeverity::Temporary) {
96 debug!(target: "engine", "Forkchoice update failed temporarily during reset: {}", err);
97 } else {
98 return Err(EngineTaskErrors::Forkchoice(err).into());
99 }
100 }
101
102 let origin_block = start
104 .safe
105 .l1_origin
106 .number
107 .saturating_sub(config.channel_timeout(start.safe.block_info.timestamp));
108 let l1_origin_info: BlockInfo = client
109 .l1_provider()
110 .get_block(origin_block.into())
111 .await
112 .map_err(SyncStartError::RpcError)?
113 .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
114 .into_consensus()
115 .into();
116 let l2_safe_block = client
117 .l2_provider()
118 .get_block(start.safe.block_info.hash.into())
119 .full()
120 .await
121 .map_err(SyncStartError::RpcError)?
122 .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
123 .into_consensus()
124 .map_transactions(|t| <Transaction<OpTxEnvelope> as Clone>::clone(&t).into_inner());
125 let system_config = to_system_config(&l2_safe_block, &config)?;
126
127 kona_macros::inc!(counter, Metrics::ENGINE_RESET_COUNT);
128
129 Ok((start.safe, l1_origin_info, system_config))
130 }
131
132 pub fn clear(&mut self) {
134 self.tasks.clear();
135 }
136
137 pub async fn drain(&mut self) -> Result<(), EngineTaskErrors> {
141 while let Some(task) = self.tasks.peek() {
143 task.execute(&mut self.state).await?;
145
146 self.state_sender.send_replace(self.state);
148
149 self.tasks.pop();
151 }
152
153 Ok(())
154 }
155}
156
157#[derive(Debug, Error)]
159pub enum EngineResetError {
160 #[error(transparent)]
162 Task(#[from] EngineTaskErrors),
163 #[error(transparent)]
165 SyncStart(#[from] SyncStartError),
166 #[error(transparent)]
168 SystemConfigConversion(#[from] OpBlockConversionError),
169}