kona_engine/task_queue/tasks/consolidate/
task.rs

1//! A task to consolidate the engine state.
2
3use crate::{
4    BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, SynchronizeTask,
5    state::EngineSyncStateUpdate,
6};
7use async_trait::async_trait;
8use kona_genesis::RollupConfig;
9use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
10use std::{sync::Arc, time::Instant};
11
12/// The [`ConsolidateTask`] attempts to consolidate the engine state
13/// using the specified payload attributes and the oldest unsafe head.
14///
15/// If consolidation fails, payload attributes processing is attempted using the [`BuildTask`].
16#[derive(Debug, Clone)]
17pub struct ConsolidateTask {
18    /// The engine client.
19    pub client: Arc<EngineClient>,
20    /// The [`RollupConfig`].
21    pub cfg: Arc<RollupConfig>,
22    /// The [`OpAttributesWithParent`] to instruct the execution layer to build.
23    pub attributes: OpAttributesWithParent,
24    /// Whether or not the payload was derived, or created by the sequencer.
25    pub is_attributes_derived: bool,
26}
27
28impl ConsolidateTask {
29    /// Creates a new [`ConsolidateTask`].
30    pub const fn new(
31        client: Arc<EngineClient>,
32        config: Arc<RollupConfig>,
33        attributes: OpAttributesWithParent,
34        is_attributes_derived: bool,
35    ) -> Self {
36        Self { client, cfg: config, attributes, is_attributes_derived }
37    }
38
39    /// Executes a new [`BuildTask`].
40    /// This is used when the [`ConsolidateTask`] fails to consolidate the engine state.
41    async fn execute_build_task(
42        &self,
43        state: &mut EngineState,
44    ) -> Result<(), ConsolidateTaskError> {
45        let build_task = BuildTask::new(
46            self.client.clone(),
47            self.cfg.clone(),
48            self.attributes.clone(),
49            self.is_attributes_derived,
50            None,
51        );
52        Ok(build_task.execute(state).await?)
53    }
54
55    /// Attempts consolidation on the engine state.
56    pub async fn consolidate(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
57        let global_start = Instant::now();
58
59        // Fetch the unsafe l2 block after the attributes parent.
60        let block_num = self.attributes.block_number();
61        let fetch_start = Instant::now();
62        let block = match self.client.l2_block_by_label(block_num.into()).await {
63            Ok(Some(block)) => block,
64            Ok(None) => {
65                warn!(target: "engine", "Received `None` block for {}", block_num);
66                return Err(ConsolidateTaskError::MissingUnsafeL2Block(block_num));
67            }
68            Err(_) => {
69                warn!(target: "engine", "Failed to fetch unsafe l2 block for consolidation");
70                return Err(ConsolidateTaskError::FailedToFetchUnsafeL2Block);
71            }
72        };
73        let block_fetch_duration = fetch_start.elapsed();
74
75        // Attempt to consolidate the unsafe head.
76        // If this is successful, the forkchoice change synchronizes.
77        // Otherwise, the attributes need to be processed.
78        let block_hash = block.header.hash;
79        if crate::AttributesMatch::check(&self.cfg, &self.attributes, &block).is_match() {
80            trace!(
81                target: "engine",
82                attributes = ?self.attributes,
83                block_hash = %block_hash,
84                "Consolidating engine state",
85            );
86
87            match L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis) {
88                // Only issue a forkchoice update if the attributes are the last in the span
89                // batch. This is an optimization to avoid sending a FCU
90                // call for every block in the span batch.
91                Ok(block_info) if !self.attributes.is_last_in_span => {
92                    let total_duration = global_start.elapsed();
93
94                    // Apply a transient update to the safe head.
95                    state.sync_state = state.sync_state.apply_update(EngineSyncStateUpdate {
96                        safe_head: Some(block_info),
97                        local_safe_head: Some(block_info),
98                        ..Default::default()
99                    });
100
101                    info!(
102                        target: "engine",
103                        hash = %block_info.block_info.hash,
104                        number = block_info.block_info.number,
105                        ?total_duration,
106                        ?block_fetch_duration,
107                        "Updated safe head via L1 consolidation"
108                    );
109
110                    return Ok(());
111                }
112                Ok(block_info) => {
113                    let fcu_start = Instant::now();
114
115                    SynchronizeTask::new(
116                        Arc::clone(&self.client),
117                        self.cfg.clone(),
118                        EngineSyncStateUpdate {
119                            safe_head: Some(block_info),
120                            local_safe_head: Some(block_info),
121                            ..Default::default()
122                        },
123                    )
124                    .execute(state)
125                    .await
126                    .map_err(|e| {
127                        warn!(target: "engine", ?e, "Consolidation failed");
128                        e
129                    })?;
130
131                    let fcu_duration = fcu_start.elapsed();
132
133                    let total_duration = global_start.elapsed();
134
135                    info!(
136                        target: "engine",
137                        hash = %block_info.block_info.hash,
138                        number = block_info.block_info.number,
139                        ?total_duration,
140                        ?block_fetch_duration,
141                        fcu_duration = ?fcu_duration,
142                        "Updated safe head via L1 consolidation"
143                    );
144
145                    return Ok(());
146                }
147                Err(e) => {
148                    // Continue on to build the block since we failed to construct the block info.
149                    warn!(target: "engine", ?e, "Failed to construct L2BlockInfo, proceeding to build task");
150                }
151            }
152        }
153
154        // Otherwise, the attributes need to be processed.
155        debug!(
156            target: "engine",
157            attributes = ?self.attributes,
158            block_hash = %block_hash,
159            "Attributes mismatch! Executing build task to initiate reorg",
160        );
161        self.execute_build_task(state).await
162    }
163}
164
165#[async_trait]
166impl EngineTaskExt for ConsolidateTask {
167    type Output = ();
168
169    type Error = ConsolidateTaskError;
170
171    async fn execute(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> {
172        // Skip to building the payload attributes if consolidation is not needed.
173        if state.sync_state.safe_head().block_info.number <
174            state.sync_state.unsafe_head().block_info.number
175        {
176            self.consolidate(state).await
177        } else {
178            self.execute_build_task(state).await
179        }
180    }
181}