kona_engine/task_queue/tasks/build/
task.rs

1//! A task for building a new block and importing it.
2use super::BuildTaskError;
3use crate::{
4    EngineClient, EngineForkchoiceVersion, EngineGetPayloadVersion, EngineState, EngineTaskExt,
5    InsertTask,
6    InsertTaskError::{self},
7    state::EngineSyncStateUpdate,
8    task_queue::tasks::build::error::EngineBuildError,
9};
10use alloy_rpc_types_engine::{ExecutionPayload, PayloadId, PayloadStatusEnum};
11use async_trait::async_trait;
12use kona_genesis::RollupConfig;
13use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
14use op_alloy_provider::ext::engine::OpEngineApi;
15use op_alloy_rpc_types_engine::{OpExecutionPayload, OpExecutionPayloadEnvelope};
16use std::{sync::Arc, time::Instant};
17use tokio::sync::mpsc;
18
19/// Task for building new blocks with automatic forkchoice synchronization.
20///
21/// The [`BuildTask`] handles the complete block building workflow, including:
22///
23/// 1. **Automatic Forkchoice Updates**: Performs initial `engine_forkchoiceUpdated` call with
24///    payload attributes to initiate block building on the execution layer
25/// 2. **Payload Construction**: Retrieves the built payload using `engine_getPayload`
26/// 3. **Block Import**: Imports the payload using [`InsertTask`] for canonicalization
27///
28/// ## Forkchoice Integration
29///
30/// Unlike previous versions where forkchoice updates required separate tasks,
31/// `BuildTask` now handles forkchoice synchronization automatically as part of
32/// the block building process. This eliminates the need for explicit forkchoice
33/// management and ensures atomic block building operations.
34///
35/// ## Error Handling
36///
37/// The task uses [`EngineBuildError`] for build-specific failures during the forkchoice
38/// update phase, and delegates to [`InsertTaskError`] for payload import failures.
39///
40/// [`InsertTask`]: crate::InsertTask
41/// [`EngineBuildError`]: crate::EngineBuildError
42/// [`InsertTaskError`]: crate::InsertTaskError
43#[derive(Debug, Clone)]
44pub struct BuildTask {
45    /// The engine API client.
46    pub engine: Arc<EngineClient>,
47    /// The [`RollupConfig`].
48    pub cfg: Arc<RollupConfig>,
49    /// The [`OpAttributesWithParent`] to instruct the execution layer to build.
50    pub attributes: OpAttributesWithParent,
51    /// Whether or not the payload was derived, or created by the sequencer.
52    pub is_attributes_derived: bool,
53    /// An optional channel to send the built [`OpExecutionPayloadEnvelope`] to, after the block
54    /// has been built, imported, and canonicalized.
55    pub payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
56}
57
58impl BuildTask {
59    /// Creates a new block building task.
60    pub const fn new(
61        engine: Arc<EngineClient>,
62        cfg: Arc<RollupConfig>,
63        attributes: OpAttributesWithParent,
64        is_attributes_derived: bool,
65        payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
66    ) -> Self {
67        Self { engine, cfg, attributes, is_attributes_derived, payload_tx }
68    }
69
70    /// Starts the block building process by sending an initial `engine_forkchoiceUpdate` call with
71    /// the payload attributes to build.
72    ///
73    /// ## Observed [PayloadStatusEnum] Variants
74    /// The `engine_forkchoiceUpdate` payload statuses that this function observes are below. Any
75    /// other [PayloadStatusEnum] variant is considered a failure.
76    ///
77    /// ### Success (`VALID`)
78    /// If the build is successful, the [PayloadId] is returned for sealing and the external
79    /// actor is notified of the successful forkchoice update.
80    ///
81    /// ### Failure (`INVALID`)
82    /// If the forkchoice update fails, the external actor is notified of the failure.
83    ///
84    /// ### Syncing (`SYNCING`)
85    /// If the EL is syncing, the payload attributes are buffered and the function returns early.
86    /// This is a temporary state, and the function should be called again later.
87    async fn start_build(
88        &self,
89        state: &EngineState,
90        engine_client: &EngineClient,
91        attributes_envelope: OpAttributesWithParent,
92    ) -> Result<PayloadId, BuildTaskError> {
93        debug!(
94            target: "engine_builder",
95            txs = attributes_envelope.inner.transactions.as_ref().map_or(0, |txs| txs.len()),
96            "Starting new build job"
97        );
98
99        // Sanity check if the head is behind the finalized head. If it is, this is a critical
100        // error.
101        if state.sync_state.unsafe_head().block_info.number <
102            state.sync_state.finalized_head().block_info.number
103        {
104            return Err(BuildTaskError::EngineBuildError(EngineBuildError::FinalizedAheadOfUnsafe(
105                state.sync_state.unsafe_head().block_info.number,
106                state.sync_state.finalized_head().block_info.number,
107            )));
108        }
109
110        // When inserting a payload, we advertise the parent's unsafe head as the current unsafe
111        // head to build on top of.
112        let new_forkchoice = state
113            .sync_state
114            .apply_update(EngineSyncStateUpdate {
115                unsafe_head: Some(attributes_envelope.parent),
116                ..Default::default()
117            })
118            .create_forkchoice_state();
119
120        let forkchoice_version = EngineForkchoiceVersion::from_cfg(
121            &self.cfg,
122            attributes_envelope.inner.payload_attributes.timestamp,
123        );
124        let update = match forkchoice_version {
125            EngineForkchoiceVersion::V3 => {
126                engine_client
127                    .fork_choice_updated_v3(new_forkchoice, Some(attributes_envelope.inner))
128                    .await
129            }
130            EngineForkchoiceVersion::V2 => {
131                engine_client
132                    .fork_choice_updated_v2(new_forkchoice, Some(attributes_envelope.inner))
133                    .await
134            }
135        }
136        .map_err(|e| {
137            error!(target: "engine_builder", "Forkchoice update failed: {}", e);
138            BuildTaskError::EngineBuildError(EngineBuildError::AttributesInsertionFailed(e))
139        })?;
140
141        match update.payload_status.status {
142            PayloadStatusEnum::Valid => {
143                debug!(
144                    target: "engine_builder",
145                    unsafe_hash = new_forkchoice.head_block_hash.to_string(),
146                    safe_hash = new_forkchoice.safe_block_hash.to_string(),
147                    finalized_hash = new_forkchoice.finalized_block_hash.to_string(),
148                    "Forkchoice update with attributes successful"
149                );
150            }
151            PayloadStatusEnum::Invalid { validation_error } => {
152                error!(target: "engine_builder", "Forkchoice update failed: {}", validation_error);
153                return Err(BuildTaskError::EngineBuildError(EngineBuildError::InvalidPayload(
154                    validation_error,
155                )));
156            }
157            PayloadStatusEnum::Syncing => {
158                warn!(target: "engine_builder", "Forkchoice update failed temporarily: EL is syncing");
159                return Err(BuildTaskError::EngineBuildError(EngineBuildError::EngineSyncing));
160            }
161            s => {
162                // Other codes are never returned by `engine_forkchoiceUpdate`
163                return Err(BuildTaskError::EngineBuildError(
164                    EngineBuildError::UnexpectedPayloadStatus(s),
165                ));
166            }
167        }
168
169        // Fetch the payload ID from the FCU. If no payload ID was returned, something went wrong -
170        // the block building job on the EL should have been initiated.
171        update
172            .payload_id
173            .ok_or(BuildTaskError::EngineBuildError(EngineBuildError::MissingPayloadId))
174    }
175
176    /// Fetches the execution payload from the EL.
177    ///
178    /// ## Engine Method Selection
179    /// The method used to fetch the payload from the EL is determined by the payload timestamp. The
180    /// method used to import the payload into the engine is determined by the payload version.
181    ///
182    /// - `engine_getPayloadV2` is used for payloads with a timestamp before the Ecotone fork.
183    /// - `engine_getPayloadV3` is used for payloads with a timestamp after the Ecotone fork.
184    /// - `engine_getPayloadV4` is used for payloads with a timestamp after the Isthmus fork.
185    async fn fetch_payload(
186        &self,
187        cfg: &RollupConfig,
188        engine: &EngineClient,
189        payload_id: PayloadId,
190        payload_attrs: OpAttributesWithParent,
191    ) -> Result<OpExecutionPayloadEnvelope, BuildTaskError> {
192        let payload_timestamp = payload_attrs.inner().payload_attributes.timestamp;
193
194        debug!(
195            target: "engine_builder",
196            payload_id = payload_id.to_string(),
197            l2_time = payload_timestamp,
198            "Inserting payload"
199        );
200
201        let get_payload_version = EngineGetPayloadVersion::from_cfg(cfg, payload_timestamp);
202        let payload_envelope = match get_payload_version {
203            EngineGetPayloadVersion::V4 => {
204                let payload = engine.get_payload_v4(payload_id).await.map_err(|e| {
205                    error!(target: "engine_builder", "Payload fetch failed: {e}");
206                    BuildTaskError::GetPayloadFailed(e)
207                })?;
208
209                OpExecutionPayloadEnvelope {
210                    parent_beacon_block_root: Some(payload.parent_beacon_block_root),
211                    execution_payload: OpExecutionPayload::V4(payload.execution_payload),
212                }
213            }
214            EngineGetPayloadVersion::V3 => {
215                let payload = engine.get_payload_v3(payload_id).await.map_err(|e| {
216                    error!(target: "engine_builder", "Payload fetch failed: {e}");
217                    BuildTaskError::GetPayloadFailed(e)
218                })?;
219
220                OpExecutionPayloadEnvelope {
221                    parent_beacon_block_root: Some(payload.parent_beacon_block_root),
222                    execution_payload: OpExecutionPayload::V3(payload.execution_payload),
223                }
224            }
225            EngineGetPayloadVersion::V2 => {
226                let payload = engine.get_payload_v2(payload_id).await.map_err(|e| {
227                    error!(target: "engine_builder", "Payload fetch failed: {e}");
228                    BuildTaskError::GetPayloadFailed(e)
229                })?;
230
231                OpExecutionPayloadEnvelope {
232                    parent_beacon_block_root: None,
233                    execution_payload: match payload.execution_payload.into_payload() {
234                        ExecutionPayload::V1(payload) => OpExecutionPayload::V1(payload),
235                        ExecutionPayload::V2(payload) => OpExecutionPayload::V2(payload),
236                        _ => unreachable!("the response should be a V1 or V2 payload"),
237                    },
238                }
239            }
240        };
241
242        Ok(payload_envelope)
243    }
244}
245
246#[async_trait]
247impl EngineTaskExt for BuildTask {
248    type Output = ();
249
250    type Error = BuildTaskError;
251
252    async fn execute(&self, state: &mut EngineState) -> Result<(), BuildTaskError> {
253        debug!(
254            target: "engine_builder",
255            txs = self.attributes.inner().transactions.as_ref().map_or(0, |txs| txs.len()),
256            "Starting new build job"
257        );
258
259        // Start the build by sending an FCU call with the current forkchoice and the input
260        // payload attributes.
261        let fcu_start_time = Instant::now();
262        let payload_id = self.start_build(state, &self.engine, self.attributes.clone()).await?;
263
264        let fcu_duration = fcu_start_time.elapsed();
265
266        // Fetch the payload just inserted from the EL and import it into the engine.
267        let block_import_start_time = Instant::now();
268        let new_payload = self
269            .fetch_payload(&self.cfg, &self.engine, payload_id, self.attributes.clone())
270            .await?;
271
272        let new_block_ref = L2BlockInfo::from_payload_and_genesis(
273            new_payload.execution_payload.clone(),
274            self.attributes.inner().payload_attributes.parent_beacon_block_root,
275            &self.cfg.genesis,
276        )
277        .map_err(BuildTaskError::FromBlock)?;
278
279        // Insert the new block into the engine.
280        match InsertTask::new(
281            Arc::clone(&self.engine),
282            self.cfg.clone(),
283            new_payload.clone(),
284            self.is_attributes_derived,
285        )
286        .execute(state)
287        .await
288        {
289            Err(InsertTaskError::UnexpectedPayloadStatus(e))
290                if self.attributes.is_deposits_only() =>
291            {
292                error!(target: "engine_builder", error = ?e, "Critical: Deposit-only payload import failed");
293                return Err(BuildTaskError::DepositOnlyPayloadFailed)
294            }
295            // HOLOCENE: Re-attempt payload import with deposits only
296            Err(InsertTaskError::UnexpectedPayloadStatus(e))
297                if self
298                    .cfg
299                    .is_holocene_active(self.attributes.inner().payload_attributes.timestamp) =>
300            {
301                warn!(target: "engine_builder", error = ?e, "Re-attempting payload import with deposits only.");
302                // HOLOCENE: Re-attempt payload import with deposits only
303                match Self::new(
304                    self.engine.clone(),
305                    self.cfg.clone(),
306                    self.attributes.as_deposits_only(),
307                    self.is_attributes_derived,
308                    self.payload_tx.clone(),
309                )
310                .execute(state)
311                .await
312                {
313                    Ok(_) => {
314                        info!(target: "engine_builder", "Successfully imported deposits-only payload")
315                    }
316                    Err(_) => return Err(BuildTaskError::DepositOnlyPayloadReattemptFailed),
317                }
318                return Err(BuildTaskError::HoloceneInvalidFlush)
319            }
320            Err(e) => {
321                error!(target: "engine_builder", "Payload import failed: {e}");
322                return Err(e.into())
323            }
324            Ok(_) => {
325                info!(target: "engine_builder", "Successfully imported payload")
326            }
327        }
328
329        let block_import_duration = block_import_start_time.elapsed();
330
331        // If a channel was provided, send the built payload envelope to it.
332        if let Some(tx) = &self.payload_tx {
333            tx.send(new_payload).await.map_err(BuildTaskError::MpscSend)?;
334        }
335
336        info!(
337            target: "engine_builder",
338            l2_number = new_block_ref.block_info.number,
339            l2_time = new_block_ref.block_info.timestamp,
340            fcu_duration = ?fcu_duration,
341            block_import_duration = ?block_import_duration,
342            "Built and imported new {} block",
343            if self.is_attributes_derived { "safe" } else { "unsafe" },
344        );
345
346        Ok(())
347    }
348}