kona_engine/task_queue/tasks/forkchoice/
task.rs1use crate::{
4 EngineClient, EngineForkchoiceVersion, EngineState, EngineTaskExt, ForkchoiceTaskError,
5 Metrics, state::EngineSyncStateUpdate,
6};
7use alloy_provider::ext::EngineApi;
8use alloy_rpc_types_engine::{INVALID_FORK_CHOICE_STATE_ERROR, PayloadId, PayloadStatusEnum};
9use async_trait::async_trait;
10use kona_genesis::RollupConfig;
11use kona_protocol::OpAttributesWithParent;
12use op_alloy_provider::ext::engine::OpEngineApi;
13use std::sync::Arc;
14use tokio::time::Instant;
15
16#[derive(Debug, Clone)]
19pub struct ForkchoiceTask {
20 pub client: Arc<EngineClient>,
22 pub rollup: Arc<RollupConfig>,
24 pub envelope: Option<OpAttributesWithParent>,
26 pub state_update: EngineSyncStateUpdate,
28}
29
30impl ForkchoiceTask {
31 pub const fn new(
33 client: Arc<EngineClient>,
34 rollup: Arc<RollupConfig>,
35 state_update: EngineSyncStateUpdate,
36 payload_attributes: Option<OpAttributesWithParent>,
37 ) -> Self {
38 Self { client, rollup, envelope: payload_attributes, state_update }
39 }
40
41 fn check_forkchoice_updated_status(
44 state: &mut EngineState,
45 status: &PayloadStatusEnum,
46 ) -> Result<(), ForkchoiceTaskError> {
47 match status {
48 PayloadStatusEnum::Valid => {
49 if !state.el_sync_finished {
50 info!(
51 target: "engine",
52 "Finished execution layer sync."
53 );
54 state.el_sync_finished = true;
55 }
56
57 Ok(())
58 }
59 PayloadStatusEnum::Syncing => {
60 debug!(target: "engine", "Forkchoice update failed temporarily: EL is syncing");
61 Err(ForkchoiceTaskError::EngineSyncing)
62 }
63 PayloadStatusEnum::Invalid { validation_error } => {
64 error!(target: "engine", "Forkchoice update failed: {}", validation_error);
65 Err(ForkchoiceTaskError::InvalidPayloadStatus(validation_error.clone()))
66 }
67 s => {
68 Err(ForkchoiceTaskError::UnexpectedPayloadStatus(s.clone()))
70 }
71 }
72 }
73}
74
75#[async_trait]
76impl EngineTaskExt for ForkchoiceTask {
77 type Output = Option<PayloadId>;
78 type Error = ForkchoiceTaskError;
79
80 async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, ForkchoiceTaskError> {
81 let new_sync_state = state.sync_state.apply_update(self.state_update);
83
84 if state.sync_state != Default::default() &&
90 state.sync_state == new_sync_state &&
91 self.envelope.is_none()
92 {
93 return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded);
94 }
95
96 if new_sync_state.unsafe_head().block_info.number <
98 new_sync_state.finalized_head().block_info.number
99 {
100 return Err(ForkchoiceTaskError::FinalizedAheadOfUnsafe(
101 new_sync_state.unsafe_head().block_info.number,
102 new_sync_state.finalized_head().block_info.number,
103 ));
104 }
105
106 let fcu_time_start = Instant::now();
107
108 let version = EngineForkchoiceVersion::from_cfg(
113 &self.rollup,
114 self.envelope.as_ref().map(|p| p.inner.payload_attributes.timestamp).unwrap_or(0),
115 );
116
117 let payload_attributes = self.envelope.as_ref().map(|p| p.inner()).cloned();
119
120 let forkchoice = new_sync_state.create_forkchoice_state();
122
123 let response = match version {
125 EngineForkchoiceVersion::V1 => {
126 self.client
127 .fork_choice_updated_v1(
128 forkchoice,
129 payload_attributes.map(|p| p.payload_attributes),
130 )
131 .await
132 }
133 EngineForkchoiceVersion::V2 => {
134 self.client.fork_choice_updated_v2(forkchoice, payload_attributes).await
135 }
136 EngineForkchoiceVersion::V3 => {
137 self.client.fork_choice_updated_v3(forkchoice, payload_attributes).await
138 }
139 };
140
141 let valid_response = response.map_err(|e| {
142 e.as_error_resp()
144 .and_then(|e| {
145 (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64)
146 .then_some(ForkchoiceTaskError::InvalidForkchoiceState)
147 })
148 .unwrap_or_else(|| ForkchoiceTaskError::ForkchoiceUpdateFailed(e))
149 })?;
150
151 Self::check_forkchoice_updated_status(state, &valid_response.payload_status.status)?;
154
155 state.sync_state = new_sync_state;
157
158 kona_macros::inc!(counter, Metrics::ENGINE_TASK_COUNT, Metrics::FORKCHOICE_TASK_LABEL);
160
161 let fcu_duration = fcu_time_start.elapsed();
162 debug!(
163 target: "engine",
164 fcu_duration = ?fcu_duration,
165 forkchoice = ?forkchoice,
166 response = ?valid_response,
167 "Forkchoice updated"
168 );
169
170 Ok(valid_response.payload_id)
171 }
172}