kona_engine/task_queue/tasks/build/
task.rs

1//! A task for building a new block and importing it.
2use super::BuildTaskError;
3use crate::{
4    EngineClient, EngineGetPayloadVersion, EngineState, EngineTaskExt, ForkchoiceTask,
5    ForkchoiceTaskError, InsertTask,
6    InsertTaskError::{self},
7    Metrics,
8    state::EngineSyncStateUpdate,
9};
10use alloy_rpc_types_engine::{ExecutionPayload, PayloadId};
11use async_trait::async_trait;
12use kona_genesis::RollupConfig;
13use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
14use op_alloy_provider::ext::engine::OpEngineApi;
15use op_alloy_rpc_types_engine::{OpExecutionPayload, OpExecutionPayloadEnvelope};
16use std::{sync::Arc, time::Instant};
17use tokio::sync::mpsc;
18
19/// The [`BuildTask`] is responsible for building new blocks and importing them via the engine API.
20#[derive(Debug, Clone)]
21pub struct BuildTask {
22    /// The engine API client.
23    pub engine: Arc<EngineClient>,
24    /// The [`RollupConfig`].
25    pub cfg: Arc<RollupConfig>,
26    /// The [`OpAttributesWithParent`] to instruct the execution layer to build.
27    pub attributes: OpAttributesWithParent,
28    /// Whether or not the payload was derived, or created by the sequencer.
29    pub is_attributes_derived: bool,
30    /// An optional channel to send the built [`OpExecutionPayloadEnvelope`] to, after the block
31    /// has been built, imported, and canonicalized.
32    pub payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
33}
34
35impl BuildTask {
36    /// Creates a new block building task.
37    pub const fn new(
38        engine: Arc<EngineClient>,
39        cfg: Arc<RollupConfig>,
40        attributes: OpAttributesWithParent,
41        is_attributes_derived: bool,
42        payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
43    ) -> Self {
44        Self { engine, cfg, attributes, is_attributes_derived, payload_tx }
45    }
46
47    /// Fetches the execution payload from the EL.
48    ///
49    /// ## Engine Method Selection
50    /// The method used to fetch the payload from the EL is determined by the payload timestamp. The
51    /// method used to import the payload into the engine is determined by the payload version.
52    ///
53    /// - `engine_getPayloadV2` is used for payloads with a timestamp before the Ecotone fork.
54    /// - `engine_getPayloadV3` is used for payloads with a timestamp after the Ecotone fork.
55    /// - `engine_getPayloadV4` is used for payloads with a timestamp after the Isthmus fork.
56    async fn fetch_payload(
57        &self,
58        cfg: &RollupConfig,
59        engine: &EngineClient,
60        payload_id: PayloadId,
61        payload_attrs: OpAttributesWithParent,
62    ) -> Result<OpExecutionPayloadEnvelope, BuildTaskError> {
63        let payload_timestamp = payload_attrs.inner().payload_attributes.timestamp;
64
65        debug!(
66            target: "engine_builder",
67            payload_id = payload_id.to_string(),
68            l2_time = payload_timestamp,
69            "Inserting payload"
70        );
71
72        let get_payload_version = EngineGetPayloadVersion::from_cfg(cfg, payload_timestamp);
73        let payload_envelope = match get_payload_version {
74            EngineGetPayloadVersion::V4 => {
75                let payload = engine.get_payload_v4(payload_id).await.map_err(|e| {
76                    error!(target: "engine_builder", "Payload fetch failed: {e}");
77                    BuildTaskError::GetPayloadFailed(e)
78                })?;
79
80                OpExecutionPayloadEnvelope {
81                    parent_beacon_block_root: Some(payload.parent_beacon_block_root),
82                    payload: OpExecutionPayload::V4(payload.execution_payload),
83                }
84            }
85            EngineGetPayloadVersion::V3 => {
86                let payload = engine.get_payload_v3(payload_id).await.map_err(|e| {
87                    error!(target: "engine_builder", "Payload fetch failed: {e}");
88                    BuildTaskError::GetPayloadFailed(e)
89                })?;
90
91                OpExecutionPayloadEnvelope {
92                    parent_beacon_block_root: Some(payload.parent_beacon_block_root),
93                    payload: OpExecutionPayload::V3(payload.execution_payload),
94                }
95            }
96            EngineGetPayloadVersion::V2 => {
97                let payload = engine.get_payload_v2(payload_id).await.map_err(|e| {
98                    error!(target: "engine_builder", "Payload fetch failed: {e}");
99                    BuildTaskError::GetPayloadFailed(e)
100                })?;
101
102                OpExecutionPayloadEnvelope {
103                    parent_beacon_block_root: None,
104                    payload: match payload.execution_payload.into_payload() {
105                        ExecutionPayload::V1(payload) => OpExecutionPayload::V1(payload),
106                        ExecutionPayload::V2(payload) => OpExecutionPayload::V2(payload),
107                        _ => unreachable!("the response should be a V1 or V2 payload"),
108                    },
109                }
110            }
111        };
112
113        Ok(payload_envelope)
114    }
115}
116
117#[async_trait]
118impl EngineTaskExt for BuildTask {
119    type Output = ();
120
121    type Error = BuildTaskError;
122
123    async fn execute(&self, state: &mut EngineState) -> Result<(), BuildTaskError> {
124        debug!(
125            target: "engine_builder",
126            txs = self.attributes.inner().transactions.as_ref().map_or(0, |txs| txs.len()),
127            "Starting new build job"
128        );
129
130        // Start the build by sending an FCU call with the current forkchoice and the input
131        // payload attributes.
132        let fcu_start_time = Instant::now();
133        let payload_id = ForkchoiceTask::new(
134            self.engine.clone(),
135            self.cfg.clone(),
136            EngineSyncStateUpdate {
137                unsafe_head: Some(self.attributes.parent),
138                ..Default::default()
139            },
140            Some(self.attributes.clone()),
141        )
142        .execute(state)
143        .await?
144        .ok_or(BuildTaskError::MissingPayloadId)?;
145        let fcu_duration = fcu_start_time.elapsed();
146
147        // Fetch the payload just inserted from the EL and import it into the engine.
148        let block_import_start_time = Instant::now();
149        let new_payload = self
150            .fetch_payload(&self.cfg, &self.engine, payload_id, self.attributes.clone())
151            .await?;
152
153        let new_block_ref = L2BlockInfo::from_payload_and_genesis(
154            new_payload.payload.clone(),
155            self.attributes.inner().payload_attributes.parent_beacon_block_root,
156            &self.cfg.genesis,
157        )
158        .map_err(BuildTaskError::FromBlock)?;
159
160        // Insert the new block into the engine.
161        match InsertTask::new(
162            Arc::clone(&self.engine),
163            self.cfg.clone(),
164            new_payload.clone(),
165            self.is_attributes_derived,
166        )
167        .execute(state)
168        .await
169        {
170            Err(InsertTaskError::ForkchoiceUpdateFailed(
171                ForkchoiceTaskError::InvalidPayloadStatus(e),
172            )) if self.attributes.is_deposits_only() => {
173                error!(target: "engine_builder", error = ?e, "Critical: Deposit-only payload import failed");
174                return Err(BuildTaskError::DepositOnlyPayloadFailed)
175            }
176            // HOLOCENE: Re-attempt payload import with deposits only
177            Err(InsertTaskError::ForkchoiceUpdateFailed(
178                ForkchoiceTaskError::InvalidPayloadStatus(e),
179            )) if self
180                .cfg
181                .is_holocene_active(self.attributes.inner().payload_attributes.timestamp) =>
182            {
183                warn!(target: "engine_builder", error = ?e, "Re-attempting payload import with deposits only.");
184                // HOLOCENE: Re-attempt payload import with deposits only
185                match Self::new(
186                    self.engine.clone(),
187                    self.cfg.clone(),
188                    self.attributes.as_deposits_only(),
189                    self.is_attributes_derived,
190                    self.payload_tx.clone(),
191                )
192                .execute(state)
193                .await
194                {
195                    Ok(_) => {
196                        info!(target: "engine_builder", "Successfully imported deposits-only payload")
197                    }
198                    Err(_) => return Err(BuildTaskError::DepositOnlyPayloadReattemptFailed),
199                }
200                return Err(BuildTaskError::HoloceneInvalidFlush)
201            }
202            Err(e) => {
203                error!(target: "engine_builder", "Payload import failed: {e}");
204                return Err(e.into())
205            }
206            Ok(_) => {
207                info!(target: "engine_builder", "Successfully imported payload")
208            }
209        }
210
211        let block_import_duration = block_import_start_time.elapsed();
212
213        // If a channel was provided, send the built payload envelope to it.
214        if let Some(tx) = &self.payload_tx {
215            tx.send(new_payload).await.map_err(BuildTaskError::MpscSend)?;
216        }
217
218        info!(
219            target: "engine_builder",
220            l2_number = new_block_ref.block_info.number,
221            l2_time = new_block_ref.block_info.timestamp,
222            fcu_duration = ?fcu_duration,
223            block_import_duration = ?block_import_duration,
224            "Built and imported new {} block",
225            if self.is_attributes_derived { "safe" } else { "unsafe" },
226        );
227
228        // Update metrics.
229        kona_macros::inc!(counter, Metrics::ENGINE_TASK_COUNT, Metrics::BUILD_TASK_LABEL);
230
231        Ok(())
232    }
233}