kona_engine/task_queue/tasks/build/
task.rs1use super::BuildTaskError;
3use crate::{
4 EngineClient, EngineForkchoiceVersion, EngineGetPayloadVersion, EngineState, EngineTaskExt,
5 InsertTask,
6 InsertTaskError::{self},
7 state::EngineSyncStateUpdate,
8 task_queue::tasks::build::error::EngineBuildError,
9};
10use alloy_rpc_types_engine::{ExecutionPayload, PayloadId, PayloadStatusEnum};
11use async_trait::async_trait;
12use kona_genesis::RollupConfig;
13use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
14use op_alloy_provider::ext::engine::OpEngineApi;
15use op_alloy_rpc_types_engine::{OpExecutionPayload, OpExecutionPayloadEnvelope};
16use std::{sync::Arc, time::Instant};
17use tokio::sync::mpsc;
18
19#[derive(Debug, Clone)]
44pub struct BuildTask {
45 pub engine: Arc<EngineClient>,
47 pub cfg: Arc<RollupConfig>,
49 pub attributes: OpAttributesWithParent,
51 pub is_attributes_derived: bool,
53 pub payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
56}
57
58impl BuildTask {
59 pub const fn new(
61 engine: Arc<EngineClient>,
62 cfg: Arc<RollupConfig>,
63 attributes: OpAttributesWithParent,
64 is_attributes_derived: bool,
65 payload_tx: Option<mpsc::Sender<OpExecutionPayloadEnvelope>>,
66 ) -> Self {
67 Self { engine, cfg, attributes, is_attributes_derived, payload_tx }
68 }
69
70 async fn start_build(
88 &self,
89 state: &EngineState,
90 engine_client: &EngineClient,
91 attributes_envelope: OpAttributesWithParent,
92 ) -> Result<PayloadId, BuildTaskError> {
93 debug!(
94 target: "engine_builder",
95 txs = attributes_envelope.inner.transactions.as_ref().map_or(0, |txs| txs.len()),
96 "Starting new build job"
97 );
98
99 if state.sync_state.unsafe_head().block_info.number <
102 state.sync_state.finalized_head().block_info.number
103 {
104 return Err(BuildTaskError::EngineBuildError(EngineBuildError::FinalizedAheadOfUnsafe(
105 state.sync_state.unsafe_head().block_info.number,
106 state.sync_state.finalized_head().block_info.number,
107 )));
108 }
109
110 let new_forkchoice = state
113 .sync_state
114 .apply_update(EngineSyncStateUpdate {
115 unsafe_head: Some(attributes_envelope.parent),
116 ..Default::default()
117 })
118 .create_forkchoice_state();
119
120 let forkchoice_version = EngineForkchoiceVersion::from_cfg(
121 &self.cfg,
122 attributes_envelope.inner.payload_attributes.timestamp,
123 );
124 let update = match forkchoice_version {
125 EngineForkchoiceVersion::V3 => {
126 engine_client
127 .fork_choice_updated_v3(new_forkchoice, Some(attributes_envelope.inner))
128 .await
129 }
130 EngineForkchoiceVersion::V2 => {
131 engine_client
132 .fork_choice_updated_v2(new_forkchoice, Some(attributes_envelope.inner))
133 .await
134 }
135 }
136 .map_err(|e| {
137 error!(target: "engine_builder", "Forkchoice update failed: {}", e);
138 BuildTaskError::EngineBuildError(EngineBuildError::AttributesInsertionFailed(e))
139 })?;
140
141 match update.payload_status.status {
142 PayloadStatusEnum::Valid => {
143 debug!(
144 target: "engine_builder",
145 unsafe_hash = new_forkchoice.head_block_hash.to_string(),
146 safe_hash = new_forkchoice.safe_block_hash.to_string(),
147 finalized_hash = new_forkchoice.finalized_block_hash.to_string(),
148 "Forkchoice update with attributes successful"
149 );
150 }
151 PayloadStatusEnum::Invalid { validation_error } => {
152 error!(target: "engine_builder", "Forkchoice update failed: {}", validation_error);
153 return Err(BuildTaskError::EngineBuildError(EngineBuildError::InvalidPayload(
154 validation_error,
155 )));
156 }
157 PayloadStatusEnum::Syncing => {
158 warn!(target: "engine_builder", "Forkchoice update failed temporarily: EL is syncing");
159 return Err(BuildTaskError::EngineBuildError(EngineBuildError::EngineSyncing));
160 }
161 s => {
162 return Err(BuildTaskError::EngineBuildError(
164 EngineBuildError::UnexpectedPayloadStatus(s),
165 ));
166 }
167 }
168
169 update
172 .payload_id
173 .ok_or(BuildTaskError::EngineBuildError(EngineBuildError::MissingPayloadId))
174 }
175
176 async fn fetch_payload(
186 &self,
187 cfg: &RollupConfig,
188 engine: &EngineClient,
189 payload_id: PayloadId,
190 payload_attrs: OpAttributesWithParent,
191 ) -> Result<OpExecutionPayloadEnvelope, BuildTaskError> {
192 let payload_timestamp = payload_attrs.inner().payload_attributes.timestamp;
193
194 debug!(
195 target: "engine_builder",
196 payload_id = payload_id.to_string(),
197 l2_time = payload_timestamp,
198 "Inserting payload"
199 );
200
201 let get_payload_version = EngineGetPayloadVersion::from_cfg(cfg, payload_timestamp);
202 let payload_envelope = match get_payload_version {
203 EngineGetPayloadVersion::V4 => {
204 let payload = engine.get_payload_v4(payload_id).await.map_err(|e| {
205 error!(target: "engine_builder", "Payload fetch failed: {e}");
206 BuildTaskError::GetPayloadFailed(e)
207 })?;
208
209 OpExecutionPayloadEnvelope {
210 parent_beacon_block_root: Some(payload.parent_beacon_block_root),
211 execution_payload: OpExecutionPayload::V4(payload.execution_payload),
212 }
213 }
214 EngineGetPayloadVersion::V3 => {
215 let payload = engine.get_payload_v3(payload_id).await.map_err(|e| {
216 error!(target: "engine_builder", "Payload fetch failed: {e}");
217 BuildTaskError::GetPayloadFailed(e)
218 })?;
219
220 OpExecutionPayloadEnvelope {
221 parent_beacon_block_root: Some(payload.parent_beacon_block_root),
222 execution_payload: OpExecutionPayload::V3(payload.execution_payload),
223 }
224 }
225 EngineGetPayloadVersion::V2 => {
226 let payload = engine.get_payload_v2(payload_id).await.map_err(|e| {
227 error!(target: "engine_builder", "Payload fetch failed: {e}");
228 BuildTaskError::GetPayloadFailed(e)
229 })?;
230
231 OpExecutionPayloadEnvelope {
232 parent_beacon_block_root: None,
233 execution_payload: match payload.execution_payload.into_payload() {
234 ExecutionPayload::V1(payload) => OpExecutionPayload::V1(payload),
235 ExecutionPayload::V2(payload) => OpExecutionPayload::V2(payload),
236 _ => unreachable!("the response should be a V1 or V2 payload"),
237 },
238 }
239 }
240 };
241
242 Ok(payload_envelope)
243 }
244}
245
246#[async_trait]
247impl EngineTaskExt for BuildTask {
248 type Output = ();
249
250 type Error = BuildTaskError;
251
252 async fn execute(&self, state: &mut EngineState) -> Result<(), BuildTaskError> {
253 debug!(
254 target: "engine_builder",
255 txs = self.attributes.inner().transactions.as_ref().map_or(0, |txs| txs.len()),
256 "Starting new build job"
257 );
258
259 let fcu_start_time = Instant::now();
262 let payload_id = self.start_build(state, &self.engine, self.attributes.clone()).await?;
263
264 let fcu_duration = fcu_start_time.elapsed();
265
266 let block_import_start_time = Instant::now();
268 let new_payload = self
269 .fetch_payload(&self.cfg, &self.engine, payload_id, self.attributes.clone())
270 .await?;
271
272 let new_block_ref = L2BlockInfo::from_payload_and_genesis(
273 new_payload.execution_payload.clone(),
274 self.attributes.inner().payload_attributes.parent_beacon_block_root,
275 &self.cfg.genesis,
276 )
277 .map_err(BuildTaskError::FromBlock)?;
278
279 match InsertTask::new(
281 Arc::clone(&self.engine),
282 self.cfg.clone(),
283 new_payload.clone(),
284 self.is_attributes_derived,
285 )
286 .execute(state)
287 .await
288 {
289 Err(InsertTaskError::UnexpectedPayloadStatus(e))
290 if self.attributes.is_deposits_only() =>
291 {
292 error!(target: "engine_builder", error = ?e, "Critical: Deposit-only payload import failed");
293 return Err(BuildTaskError::DepositOnlyPayloadFailed)
294 }
295 Err(InsertTaskError::UnexpectedPayloadStatus(e))
297 if self
298 .cfg
299 .is_holocene_active(self.attributes.inner().payload_attributes.timestamp) =>
300 {
301 warn!(target: "engine_builder", error = ?e, "Re-attempting payload import with deposits only.");
302 match Self::new(
304 self.engine.clone(),
305 self.cfg.clone(),
306 self.attributes.as_deposits_only(),
307 self.is_attributes_derived,
308 self.payload_tx.clone(),
309 )
310 .execute(state)
311 .await
312 {
313 Ok(_) => {
314 info!(target: "engine_builder", "Successfully imported deposits-only payload")
315 }
316 Err(_) => return Err(BuildTaskError::DepositOnlyPayloadReattemptFailed),
317 }
318 return Err(BuildTaskError::HoloceneInvalidFlush)
319 }
320 Err(e) => {
321 error!(target: "engine_builder", "Payload import failed: {e}");
322 return Err(e.into())
323 }
324 Ok(_) => {
325 info!(target: "engine_builder", "Successfully imported payload")
326 }
327 }
328
329 let block_import_duration = block_import_start_time.elapsed();
330
331 if let Some(tx) = &self.payload_tx {
333 tx.send(new_payload).await.map_err(BuildTaskError::MpscSend)?;
334 }
335
336 info!(
337 target: "engine_builder",
338 l2_number = new_block_ref.block_info.number,
339 l2_time = new_block_ref.block_info.timestamp,
340 fcu_duration = ?fcu_duration,
341 block_import_duration = ?block_import_duration,
342 "Built and imported new {} block",
343 if self.is_attributes_derived { "safe" } else { "unsafe" },
344 );
345
346 Ok(())
347 }
348}