kona_engine/task_queue/tasks/consolidate/
task.rs

1//! A task to consolidate the engine state.
2
3use crate::{
4    BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, ForkchoiceTask,
5    Metrics, state::EngineSyncStateUpdate,
6};
7use async_trait::async_trait;
8use kona_genesis::RollupConfig;
9use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
10use std::{sync::Arc, time::Instant};
11
12/// The [`ConsolidateTask`] attempts to consolidate the engine state
13/// using the specified payload attributes and the oldest unsafe head.
14///
15/// If consolidation fails, payload attributes processing is attempted using the [`BuildTask`].
16#[derive(Debug, Clone)]
17pub struct ConsolidateTask {
18    /// The engine client.
19    pub client: Arc<EngineClient>,
20    /// The [`RollupConfig`].
21    pub cfg: Arc<RollupConfig>,
22    /// The [`OpAttributesWithParent`] to instruct the execution layer to build.
23    pub attributes: OpAttributesWithParent,
24    /// Whether or not the payload was derived, or created by the sequencer.
25    pub is_attributes_derived: bool,
26}
27
28impl ConsolidateTask {
29    /// Creates a new [`ConsolidateTask`].
30    pub const fn new(
31        client: Arc<EngineClient>,
32        config: Arc<RollupConfig>,
33        attributes: OpAttributesWithParent,
34        is_attributes_derived: bool,
35    ) -> Self {
36        Self { client, cfg: config, attributes, is_attributes_derived }
37    }
38
39    /// Executes a new [`BuildTask`].
40    /// This is used when the [`ConsolidateTask`] fails to consolidate the engine state.
41    async fn execute_build_task(
42        &self,
43        state: &mut EngineState,
44    ) -> Result<(), ConsolidateTaskError> {
45        let build_task = BuildTask::new(
46            self.client.clone(),
47            self.cfg.clone(),
48            self.attributes.clone(),
49            self.is_attributes_derived,
50            None,
51        );
52        Ok(build_task.execute(state).await?)
53    }
54
55    /// Attempts consolidation on the engine state.
56    pub async fn consolidate(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
57        let global_start = Instant::now();
58
59        // Fetch the unsafe l2 block after the attributes parent.
60        let block_num = self.attributes.block_number();
61        let fetch_start = Instant::now();
62        let block = match self.client.l2_block_by_label(block_num.into()).await {
63            Ok(Some(block)) => block,
64            Ok(None) => {
65                warn!(target: "engine", "Received `None` block for {}", block_num);
66                return Err(ConsolidateTaskError::MissingUnsafeL2Block(block_num));
67            }
68            Err(_) => {
69                warn!(target: "engine", "Failed to fetch unsafe l2 block for consolidation");
70                return Err(ConsolidateTaskError::FailedToFetchUnsafeL2Block);
71            }
72        };
73        let block_fetch_duration = fetch_start.elapsed();
74
75        // Attempt to consolidate the unsafe head.
76        // If this is successful, the forkchoice change synchronizes.
77        // Otherwise, the attributes need to be processed.
78        let block_hash = block.header.hash;
79        if crate::AttributesMatch::check(&self.cfg, &self.attributes, &block).is_match() {
80            trace!(
81                target: "engine",
82                attributes = ?self.attributes,
83                block_hash = %block_hash,
84                "Consolidating engine state",
85            );
86
87            match L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis) {
88                // Only issue a forkchoice update if the attributes are the last in the span
89                // batch. This is an optimization to avoid sending a FCU
90                // call for every block in the span batch.
91                Ok(block_info) if !self.attributes.is_last_in_span => {
92                    let total_duration = global_start.elapsed();
93
94                    // Apply a transient update to the safe head.
95                    state.sync_state = state.sync_state.apply_update(EngineSyncStateUpdate {
96                        safe_head: Some(block_info),
97                        local_safe_head: Some(block_info),
98                        ..Default::default()
99                    });
100
101                    // Update metrics.
102                    kona_macros::inc!(
103                        counter,
104                        Metrics::ENGINE_TASK_COUNT,
105                        Metrics::CONSOLIDATE_TASK_LABEL
106                    );
107
108                    info!(
109                        target: "engine",
110                        hash = %block_info.block_info.hash,
111                        number = block_info.block_info.number,
112                        ?total_duration,
113                        ?block_fetch_duration,
114                        "Updated safe head via L1 consolidation"
115                    );
116
117                    return Ok(());
118                }
119                Ok(block_info) => {
120                    let fcu_start = Instant::now();
121
122                    ForkchoiceTask::new(
123                        Arc::clone(&self.client),
124                        self.cfg.clone(),
125                        EngineSyncStateUpdate {
126                            safe_head: Some(block_info),
127                            local_safe_head: Some(block_info),
128                            ..Default::default()
129                        },
130                        None,
131                    )
132                    .execute(state)
133                    .await
134                    .map_err(|e| {
135                        warn!(target: "engine", ?e, "Consolidation failed");
136                        e
137                    })?;
138
139                    let fcu_duration = fcu_start.elapsed();
140
141                    let total_duration = global_start.elapsed();
142
143                    // Update metrics.
144                    kona_macros::inc!(
145                        counter,
146                        Metrics::ENGINE_TASK_COUNT,
147                        Metrics::CONSOLIDATE_TASK_LABEL
148                    );
149
150                    info!(
151                        target: "engine",
152                        hash = %block_info.block_info.hash,
153                        number = block_info.block_info.number,
154                        ?total_duration,
155                        ?block_fetch_duration,
156                        fcu_duration = ?fcu_duration,
157                        "Updated safe head via L1 consolidation"
158                    );
159
160                    return Ok(());
161                }
162                Err(e) => {
163                    // Continue on to build the block since we failed to construct the block info.
164                    warn!(target: "engine", ?e, "Failed to construct L2BlockInfo, proceeding to build task");
165                }
166            }
167        }
168
169        // Otherwise, the attributes need to be processed.
170        debug!(
171            target: "engine",
172            attributes = ?self.attributes,
173            block_hash = %block_hash,
174            "Attributes mismatch! Executing build task to initiate reorg",
175        );
176        self.execute_build_task(state).await
177    }
178}
179
180#[async_trait]
181impl EngineTaskExt for ConsolidateTask {
182    type Output = ();
183
184    type Error = ConsolidateTaskError;
185
186    async fn execute(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
187        // Skip to building the payload attributes if consolidation is not needed.
188        if state.sync_state.safe_head().block_info.number <
189            state.sync_state.unsafe_head().block_info.number
190        {
191            self.consolidate(state).await
192        } else {
193            self.execute_build_task(state).await
194        }
195    }
196}