kona_engine/task_queue/
core.rs1use super::EngineTaskExt;
4use crate::{
5 EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, EngineTaskError,
6 EngineTaskErrorSeverity, Metrics, SynchronizeTask, SynchronizeTaskError,
7 task_queue::EngineTaskErrors,
8};
9use alloy_provider::Provider;
10use alloy_rpc_types_eth::Transaction;
11use kona_genesis::{RollupConfig, SystemConfig};
12use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config};
13use kona_sources::{SyncStartError, find_starting_forkchoice};
14use op_alloy_consensus::OpTxEnvelope;
15use std::{collections::BinaryHeap, sync::Arc};
16use thiserror::Error;
17use tokio::sync::watch::Sender;
18
19#[derive(Debug)]
33pub struct Engine {
34 state: EngineState,
36 state_sender: Sender<EngineState>,
38 task_queue_length: Sender<usize>,
40 tasks: BinaryHeap<EngineTask>,
42}
43
44impl Engine {
45 pub fn new(
47 initial_state: EngineState,
48 state_sender: Sender<EngineState>,
49 task_queue_length: Sender<usize>,
50 ) -> Self {
51 Self { state: initial_state, state_sender, task_queue_length, tasks: BinaryHeap::default() }
52 }
53
54 pub const fn state(&self) -> &EngineState {
56 &self.state
57 }
58
59 pub fn state_subscribe(&self) -> tokio::sync::watch::Receiver<EngineState> {
61 self.state_sender.subscribe()
62 }
63
64 pub fn queue_length_subscribe(&self) -> tokio::sync::watch::Receiver<usize> {
66 self.task_queue_length.subscribe()
67 }
68
69 pub fn enqueue(&mut self, task: EngineTask) {
72 self.tasks.push(task);
73 self.task_queue_length.send_replace(self.tasks.len());
74 }
75
76 pub async fn reset(
80 &mut self,
81 client: Arc<EngineClient>,
82 config: Arc<RollupConfig>,
83 ) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> {
84 self.clear();
86
87 let start =
88 find_starting_forkchoice(&config, client.l1_provider(), client.l2_engine()).await?;
89
90 while let Err(err) = SynchronizeTask::new(
92 client.clone(),
93 config.clone(),
94 EngineSyncStateUpdate {
95 unsafe_head: Some(start.un_safe),
96 cross_unsafe_head: Some(start.un_safe),
97 local_safe_head: Some(start.safe),
98 safe_head: Some(start.safe),
99 finalized_head: Some(start.finalized),
100 },
101 )
102 .execute(&mut self.state)
103 .await
104 {
105 match err.severity() {
106 EngineTaskErrorSeverity::Temporary |
107 EngineTaskErrorSeverity::Flush |
108 EngineTaskErrorSeverity::Reset => {
109 debug!(target: "engine", ?err, "Forkchoice update failed during reset. Trying again...");
110 }
111 EngineTaskErrorSeverity::Critical => {
112 return Err(EngineResetError::Forkchoice(err));
113 }
114 }
115 }
116
117 let origin_block = start
119 .safe
120 .l1_origin
121 .number
122 .saturating_sub(config.channel_timeout(start.safe.block_info.timestamp));
123 let l1_origin_info: BlockInfo = client
124 .l1_provider()
125 .get_block(origin_block.into())
126 .await
127 .map_err(SyncStartError::RpcError)?
128 .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
129 .into_consensus()
130 .into();
131 let l2_safe_block = client
132 .l2_engine()
133 .get_block(start.safe.block_info.hash.into())
134 .full()
135 .await
136 .map_err(SyncStartError::RpcError)?
137 .ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
138 .into_consensus()
139 .map_transactions(|t| <Transaction<OpTxEnvelope> as Clone>::clone(&t).into_inner());
140 let system_config = to_system_config(&l2_safe_block, &config)?;
141
142 kona_macros::inc!(counter, Metrics::ENGINE_RESET_COUNT);
143
144 Ok((start.safe, l1_origin_info, system_config))
145 }
146
147 pub fn clear(&mut self) {
149 self.tasks.clear();
150 }
151
152 pub async fn drain(&mut self) -> Result<(), EngineTaskErrors> {
156 while let Some(task) = self.tasks.peek() {
158 task.execute(&mut self.state).await?;
160
161 self.state_sender.send_replace(self.state);
163
164 self.tasks.pop();
166
167 self.task_queue_length.send_replace(self.tasks.len());
168 }
169
170 Ok(())
171 }
172}
173
174#[derive(Debug, Error)]
176pub enum EngineResetError {
177 #[error(transparent)]
179 Forkchoice(#[from] SynchronizeTaskError),
180 #[error(transparent)]
182 SyncStart(#[from] SyncStartError),
183 #[error(transparent)]
185 SystemConfigConversion(#[from] OpBlockConversionError),
186}