// kona_engine/task_queue/tasks/consolidate/task.rs
use crate::{
4 BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, ForkchoiceTask,
5 Metrics, state::EngineSyncStateUpdate,
6};
7use async_trait::async_trait;
8use kona_genesis::RollupConfig;
9use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
10use std::{sync::Arc, time::Instant};
11
12#[derive(Debug, Clone)]
17pub struct ConsolidateTask {
18 pub client: Arc<EngineClient>,
20 pub cfg: Arc<RollupConfig>,
22 pub attributes: OpAttributesWithParent,
24 pub is_attributes_derived: bool,
26}
27
28impl ConsolidateTask {
29 pub const fn new(
31 client: Arc<EngineClient>,
32 config: Arc<RollupConfig>,
33 attributes: OpAttributesWithParent,
34 is_attributes_derived: bool,
35 ) -> Self {
36 Self { client, cfg: config, attributes, is_attributes_derived }
37 }
38
39 async fn execute_build_task(
42 &self,
43 state: &mut EngineState,
44 ) -> Result<(), ConsolidateTaskError> {
45 let build_task = BuildTask::new(
46 self.client.clone(),
47 self.cfg.clone(),
48 self.attributes.clone(),
49 self.is_attributes_derived,
50 None,
51 );
52 Ok(build_task.execute(state).await?)
53 }
54
55 pub async fn consolidate(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
57 let global_start = Instant::now();
58
59 let block_num = self.attributes.block_number();
61 let fetch_start = Instant::now();
62 let block = match self.client.l2_block_by_label(block_num.into()).await {
63 Ok(Some(block)) => block,
64 Ok(None) => {
65 warn!(target: "engine", "Received `None` block for {}", block_num);
66 return Err(ConsolidateTaskError::MissingUnsafeL2Block(block_num));
67 }
68 Err(_) => {
69 warn!(target: "engine", "Failed to fetch unsafe l2 block for consolidation");
70 return Err(ConsolidateTaskError::FailedToFetchUnsafeL2Block);
71 }
72 };
73 let block_fetch_duration = fetch_start.elapsed();
74
75 let block_hash = block.header.hash;
79 if crate::AttributesMatch::check(&self.cfg, &self.attributes, &block).is_match() {
80 trace!(
81 target: "engine",
82 attributes = ?self.attributes,
83 block_hash = %block_hash,
84 "Consolidating engine state",
85 );
86
87 match L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis) {
88 Ok(block_info) if !self.attributes.is_last_in_span => {
92 let total_duration = global_start.elapsed();
93
94 state.sync_state = state.sync_state.apply_update(EngineSyncStateUpdate {
96 safe_head: Some(block_info),
97 local_safe_head: Some(block_info),
98 ..Default::default()
99 });
100
101 kona_macros::inc!(
103 counter,
104 Metrics::ENGINE_TASK_COUNT,
105 Metrics::CONSOLIDATE_TASK_LABEL
106 );
107
108 info!(
109 target: "engine",
110 hash = %block_info.block_info.hash,
111 number = block_info.block_info.number,
112 ?total_duration,
113 ?block_fetch_duration,
114 "Updated safe head via L1 consolidation"
115 );
116
117 return Ok(());
118 }
119 Ok(block_info) => {
120 let fcu_start = Instant::now();
121
122 ForkchoiceTask::new(
123 Arc::clone(&self.client),
124 self.cfg.clone(),
125 EngineSyncStateUpdate {
126 safe_head: Some(block_info),
127 local_safe_head: Some(block_info),
128 ..Default::default()
129 },
130 None,
131 )
132 .execute(state)
133 .await
134 .map_err(|e| {
135 warn!(target: "engine", ?e, "Consolidation failed");
136 e
137 })?;
138
139 let fcu_duration = fcu_start.elapsed();
140
141 let total_duration = global_start.elapsed();
142
143 kona_macros::inc!(
145 counter,
146 Metrics::ENGINE_TASK_COUNT,
147 Metrics::CONSOLIDATE_TASK_LABEL
148 );
149
150 info!(
151 target: "engine",
152 hash = %block_info.block_info.hash,
153 number = block_info.block_info.number,
154 ?total_duration,
155 ?block_fetch_duration,
156 fcu_duration = ?fcu_duration,
157 "Updated safe head via L1 consolidation"
158 );
159
160 return Ok(());
161 }
162 Err(e) => {
163 warn!(target: "engine", ?e, "Failed to construct L2BlockInfo, proceeding to build task");
165 }
166 }
167 }
168
169 debug!(
171 target: "engine",
172 attributes = ?self.attributes,
173 block_hash = %block_hash,
174 "Attributes mismatch! Executing build task to initiate reorg",
175 );
176 self.execute_build_task(state).await
177 }
178}
179
180#[async_trait]
181impl EngineTaskExt for ConsolidateTask {
182 type Output = ();
183
184 type Error = ConsolidateTaskError;
185
186 async fn execute(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
187 if state.sync_state.safe_head().block_info.number <
189 state.sync_state.unsafe_head().block_info.number
190 {
191 self.consolidate(state).await
192 } else {
193 self.execute_build_task(state).await
194 }
195 }
196}