kona_engine/task_queue/tasks/consolidate/
task.rs1use crate::{
4 BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, SynchronizeTask,
5 state::EngineSyncStateUpdate,
6};
7use async_trait::async_trait;
8use kona_genesis::RollupConfig;
9use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
10use std::{sync::Arc, time::Instant};
11
12#[derive(Debug, Clone)]
17pub struct ConsolidateTask {
18 pub client: Arc<EngineClient>,
20 pub cfg: Arc<RollupConfig>,
22 pub attributes: OpAttributesWithParent,
24 pub is_attributes_derived: bool,
26}
27
28impl ConsolidateTask {
29 pub const fn new(
31 client: Arc<EngineClient>,
32 config: Arc<RollupConfig>,
33 attributes: OpAttributesWithParent,
34 is_attributes_derived: bool,
35 ) -> Self {
36 Self { client, cfg: config, attributes, is_attributes_derived }
37 }
38
39 async fn execute_build_task(
42 &self,
43 state: &mut EngineState,
44 ) -> Result<(), ConsolidateTaskError> {
45 let build_task = BuildTask::new(
46 self.client.clone(),
47 self.cfg.clone(),
48 self.attributes.clone(),
49 self.is_attributes_derived,
50 None,
51 );
52 Ok(build_task.execute(state).await?)
53 }
54
55 pub async fn consolidate(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
57 let global_start = Instant::now();
58
59 let block_num = self.attributes.block_number();
61 let fetch_start = Instant::now();
62 let block = match self.client.l2_block_by_label(block_num.into()).await {
63 Ok(Some(block)) => block,
64 Ok(None) => {
65 warn!(target: "engine", "Received `None` block for {}", block_num);
66 return Err(ConsolidateTaskError::MissingUnsafeL2Block(block_num));
67 }
68 Err(_) => {
69 warn!(target: "engine", "Failed to fetch unsafe l2 block for consolidation");
70 return Err(ConsolidateTaskError::FailedToFetchUnsafeL2Block);
71 }
72 };
73 let block_fetch_duration = fetch_start.elapsed();
74
75 let block_hash = block.header.hash;
79 if crate::AttributesMatch::check(&self.cfg, &self.attributes, &block).is_match() {
80 trace!(
81 target: "engine",
82 attributes = ?self.attributes,
83 block_hash = %block_hash,
84 "Consolidating engine state",
85 );
86
87 match L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis) {
88 Ok(block_info) if !self.attributes.is_last_in_span => {
92 let total_duration = global_start.elapsed();
93
94 state.sync_state = state.sync_state.apply_update(EngineSyncStateUpdate {
96 safe_head: Some(block_info),
97 local_safe_head: Some(block_info),
98 ..Default::default()
99 });
100
101 info!(
102 target: "engine",
103 hash = %block_info.block_info.hash,
104 number = block_info.block_info.number,
105 ?total_duration,
106 ?block_fetch_duration,
107 "Updated safe head via L1 consolidation"
108 );
109
110 return Ok(());
111 }
112 Ok(block_info) => {
113 let fcu_start = Instant::now();
114
115 SynchronizeTask::new(
116 Arc::clone(&self.client),
117 self.cfg.clone(),
118 EngineSyncStateUpdate {
119 safe_head: Some(block_info),
120 local_safe_head: Some(block_info),
121 ..Default::default()
122 },
123 )
124 .execute(state)
125 .await
126 .map_err(|e| {
127 warn!(target: "engine", ?e, "Consolidation failed");
128 e
129 })?;
130
131 let fcu_duration = fcu_start.elapsed();
132
133 let total_duration = global_start.elapsed();
134
135 info!(
136 target: "engine",
137 hash = %block_info.block_info.hash,
138 number = block_info.block_info.number,
139 ?total_duration,
140 ?block_fetch_duration,
141 fcu_duration = ?fcu_duration,
142 "Updated safe head via L1 consolidation"
143 );
144
145 return Ok(());
146 }
147 Err(e) => {
148 warn!(target: "engine", ?e, "Failed to construct L2BlockInfo, proceeding to build task");
150 }
151 }
152 }
153
154 debug!(
156 target: "engine",
157 attributes = ?self.attributes,
158 block_hash = %block_hash,
159 "Attributes mismatch! Executing build task to initiate reorg",
160 );
161 self.execute_build_task(state).await
162 }
163}
164
165#[async_trait]
166impl EngineTaskExt for ConsolidateTask {
167 type Output = ();
168
169 type Error = ConsolidateTaskError;
170
171 async fn execute(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
172 if state.sync_state.safe_head().block_info.number <
174 state.sync_state.unsafe_head().block_info.number
175 {
176 self.consolidate(state).await
177 } else {
178 self.execute_build_task(state).await
179 }
180 }
181}