kona_engine/task_queue/tasks/build/
task.rs1use super::BuildTaskError;
3use crate::{
4 EngineClient, EngineGetPayloadVersion, EngineState, EngineTaskExt, ForkchoiceTask,
5 ForkchoiceTaskError, InsertTask,
6 InsertTaskError::{self},
7 Metrics,
8 state::EngineSyncStateUpdate,
9};
10use alloy_rpc_types_engine::{ExecutionPayload, PayloadId};
11use async_trait::async_trait;
12use kona_genesis::RollupConfig;
13use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
14use op_alloy_provider::ext::engine::OpEngineApi;
15use op_alloy_rpc_types_engine::{OpExecutionPayload, OpExecutionPayloadEnvelope};
16use std::{sync::Arc, time::Instant};
17use tokio::sync::mpsc;
18
19#[derive(Debug, Clone)]
21pub struct BuildTask {
22 pub engine: Arc<EngineClient>,
24 pub cfg: Arc<RollupConfig>,
26 pub attributes: OpAttributesWithParent,
28 pub is_attributes_derived: bool,
30 pub payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
33}
34
35impl BuildTask {
36 pub const fn new(
38 engine: Arc<EngineClient>,
39 cfg: Arc<RollupConfig>,
40 attributes: OpAttributesWithParent,
41 is_attributes_derived: bool,
42 payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
43 ) -> Self {
44 Self { engine, cfg, attributes, is_attributes_derived, payload_tx }
45 }
46
47 async fn fetch_payload(
57 &self,
58 cfg: &RollupConfig,
59 engine: &EngineClient,
60 payload_id: PayloadId,
61 payload_attrs: OpAttributesWithParent,
62 ) -> Result<OpExecutionPayloadEnvelope, BuildTaskError> {
63 let payload_timestamp = payload_attrs.inner().payload_attributes.timestamp;
64
65 debug!(
66 target: "engine_builder",
67 payload_id = payload_id.to_string(),
68 l2_time = payload_timestamp,
69 "Inserting payload"
70 );
71
72 let get_payload_version = EngineGetPayloadVersion::from_cfg(cfg, payload_timestamp);
73 let payload_envelope = match get_payload_version {
74 EngineGetPayloadVersion::V4 => {
75 let payload = engine.get_payload_v4(payload_id).await.map_err(|e| {
76 error!(target: "engine_builder", "Payload fetch failed: {e}");
77 BuildTaskError::GetPayloadFailed(e)
78 })?;
79
80 OpExecutionPayloadEnvelope {
81 parent_beacon_block_root: Some(payload.parent_beacon_block_root),
82 payload: OpExecutionPayload::V4(payload.execution_payload),
83 }
84 }
85 EngineGetPayloadVersion::V3 => {
86 let payload = engine.get_payload_v3(payload_id).await.map_err(|e| {
87 error!(target: "engine_builder", "Payload fetch failed: {e}");
88 BuildTaskError::GetPayloadFailed(e)
89 })?;
90
91 OpExecutionPayloadEnvelope {
92 parent_beacon_block_root: Some(payload.parent_beacon_block_root),
93 payload: OpExecutionPayload::V3(payload.execution_payload),
94 }
95 }
96 EngineGetPayloadVersion::V2 => {
97 let payload = engine.get_payload_v2(payload_id).await.map_err(|e| {
98 error!(target: "engine_builder", "Payload fetch failed: {e}");
99 BuildTaskError::GetPayloadFailed(e)
100 })?;
101
102 OpExecutionPayloadEnvelope {
103 parent_beacon_block_root: None,
104 payload: match payload.execution_payload.into_payload() {
105 ExecutionPayload::V1(payload) => OpExecutionPayload::V1(payload),
106 ExecutionPayload::V2(payload) => OpExecutionPayload::V2(payload),
107 _ => unreachable!("the response should be a V1 or V2 payload"),
108 },
109 }
110 }
111 };
112
113 Ok(payload_envelope)
114 }
115}
116
117#[async_trait]
118impl EngineTaskExt for BuildTask {
119 type Output = ();
120
121 type Error = BuildTaskError;
122
123 async fn execute(&self, state: &mut EngineState) -> Result<(), BuildTaskError> {
124 debug!(
125 target: "engine_builder",
126 txs = self.attributes.inner().transactions.as_ref().map_or(0, |txs| txs.len()),
127 "Starting new build job"
128 );
129
130 let fcu_start_time = Instant::now();
133 let payload_id = ForkchoiceTask::new(
134 self.engine.clone(),
135 self.cfg.clone(),
136 EngineSyncStateUpdate {
137 unsafe_head: Some(self.attributes.parent),
138 ..Default::default()
139 },
140 Some(self.attributes.clone()),
141 )
142 .execute(state)
143 .await?
144 .ok_or(BuildTaskError::MissingPayloadId)?;
145 let fcu_duration = fcu_start_time.elapsed();
146
147 let block_import_start_time = Instant::now();
149 let new_payload = self
150 .fetch_payload(&self.cfg, &self.engine, payload_id, self.attributes.clone())
151 .await?;
152
153 let new_block_ref = L2BlockInfo::from_payload_and_genesis(
154 new_payload.payload.clone(),
155 self.attributes.inner().payload_attributes.parent_beacon_block_root,
156 &self.cfg.genesis,
157 )
158 .map_err(BuildTaskError::FromBlock)?;
159
160 match InsertTask::new(
162 Arc::clone(&self.engine),
163 self.cfg.clone(),
164 new_payload.clone(),
165 self.is_attributes_derived,
166 )
167 .execute(state)
168 .await
169 {
170 Err(InsertTaskError::ForkchoiceUpdateFailed(
171 ForkchoiceTaskError::InvalidPayloadStatus(e),
172 )) if self.attributes.is_deposits_only() => {
173 error!(target: "engine_builder", error = ?e, "Critical: Deposit-only payload import failed");
174 return Err(BuildTaskError::DepositOnlyPayloadFailed)
175 }
176 Err(InsertTaskError::ForkchoiceUpdateFailed(
178 ForkchoiceTaskError::InvalidPayloadStatus(e),
179 )) if self
180 .cfg
181 .is_holocene_active(self.attributes.inner().payload_attributes.timestamp) =>
182 {
183 warn!(target: "engine_builder", error = ?e, "Re-attempting payload import with deposits only.");
184 match Self::new(
186 self.engine.clone(),
187 self.cfg.clone(),
188 self.attributes.as_deposits_only(),
189 self.is_attributes_derived,
190 self.payload_tx.clone(),
191 )
192 .execute(state)
193 .await
194 {
195 Ok(_) => {
196 info!(target: "engine_builder", "Successfully imported deposits-only payload")
197 }
198 Err(_) => return Err(BuildTaskError::DepositOnlyPayloadReattemptFailed),
199 }
200 return Err(BuildTaskError::HoloceneInvalidFlush)
201 }
202 Err(e) => {
203 error!(target: "engine_builder", "Payload import failed: {e}");
204 return Err(e.into())
205 }
206 Ok(_) => {
207 info!(target: "engine_builder", "Successfully imported payload")
208 }
209 }
210
211 let block_import_duration = block_import_start_time.elapsed();
212
213 if let Some(tx) = &self.payload_tx {
215 tx.send(new_payload).await.map_err(BuildTaskError::MpscSend)?;
216 }
217
218 info!(
219 target: "engine_builder",
220 l2_number = new_block_ref.block_info.number,
221 l2_time = new_block_ref.block_info.timestamp,
222 fcu_duration = ?fcu_duration,
223 block_import_duration = ?block_import_duration,
224 "Built and imported new {} block",
225 if self.is_attributes_derived { "safe" } else { "unsafe" },
226 );
227
228 kona_macros::inc!(counter, Metrics::ENGINE_TASK_COUNT, Metrics::BUILD_TASK_LABEL);
230
231 Ok(())
232 }
233}