kona_engine/task_queue/tasks/forkchoice/
task.rs

1//! A task for the `engine_forkchoiceUpdated` method, with no attributes.
2
3use crate::{
4    EngineClient, EngineForkchoiceVersion, EngineState, EngineTaskExt, ForkchoiceTaskError,
5    Metrics, state::EngineSyncStateUpdate,
6};
7use alloy_provider::ext::EngineApi;
8use alloy_rpc_types_engine::{INVALID_FORK_CHOICE_STATE_ERROR, PayloadId, PayloadStatusEnum};
9use async_trait::async_trait;
10use kona_genesis::RollupConfig;
11use kona_protocol::OpAttributesWithParent;
12use op_alloy_provider::ext::engine::OpEngineApi;
13use std::sync::Arc;
14use tokio::time::Instant;
15
16/// The [`ForkchoiceTask`] executes an `engine_forkchoiceUpdated` call with the current
17/// [`EngineState`]'s forkchoice, and no payload attributes.
18#[derive(Debug, Clone)]
19pub struct ForkchoiceTask {
20    /// The engine client.
21    pub client: Arc<EngineClient>,
22    /// The rollup config.
23    pub rollup: Arc<RollupConfig>,
24    /// Optional payload attributes to be used for the forkchoice update.
25    pub envelope: Option<OpAttributesWithParent>,
26    /// The sync state update to apply to the engine state.
27    pub state_update: EngineSyncStateUpdate,
28}
29
30impl ForkchoiceTask {
31    /// Creates a new [`ForkchoiceTask`].
32    pub const fn new(
33        client: Arc<EngineClient>,
34        rollup: Arc<RollupConfig>,
35        state_update: EngineSyncStateUpdate,
36        payload_attributes: Option<OpAttributesWithParent>,
37    ) -> Self {
38        Self { client, rollup, envelope: payload_attributes, state_update }
39    }
40
41    /// Checks the response of the `engine_forkchoiceUpdated` call, and updates the sync status if
42    /// necessary.
43    fn check_forkchoice_updated_status(
44        state: &mut EngineState,
45        status: &PayloadStatusEnum,
46    ) -> Result<(), ForkchoiceTaskError> {
47        match status {
48            PayloadStatusEnum::Valid => {
49                if !state.el_sync_finished {
50                    info!(
51                        target: "engine",
52                        "Finished execution layer sync."
53                    );
54                    state.el_sync_finished = true;
55                }
56
57                Ok(())
58            }
59            PayloadStatusEnum::Syncing => {
60                debug!(target: "engine", "Forkchoice update failed temporarily: EL is syncing");
61                Err(ForkchoiceTaskError::EngineSyncing)
62            }
63            PayloadStatusEnum::Invalid { validation_error } => {
64                error!(target: "engine", "Forkchoice update failed: {}", validation_error);
65                Err(ForkchoiceTaskError::InvalidPayloadStatus(validation_error.clone()))
66            }
67            s => {
68                // Other codes are never returned by `engine_forkchoiceUpdate`
69                Err(ForkchoiceTaskError::UnexpectedPayloadStatus(s.clone()))
70            }
71        }
72    }
73}
74
75#[async_trait]
76impl EngineTaskExt for ForkchoiceTask {
77    type Output = Option<PayloadId>;
78    type Error = ForkchoiceTaskError;
79
80    async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, ForkchoiceTaskError> {
81        // Apply the sync state update to the engine state.
82        let new_sync_state = state.sync_state.apply_update(self.state_update);
83
84        // Check if a forkchoice update is not needed, return early.
85        // A forkchoice update is not needed if...
86        // 1. The engine state is not default (initial forkchoice state has been emitted), and
87        // 2. The new sync state is the same as the current sync state (no changes to the sync
88        //    state).
89        if state.sync_state != Default::default() &&
90            state.sync_state == new_sync_state &&
91            self.envelope.is_none()
92        {
93            return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded);
94        }
95
96        // Check if the head is behind the finalized head.
97        if new_sync_state.unsafe_head().block_info.number <
98            new_sync_state.finalized_head().block_info.number
99        {
100            return Err(ForkchoiceTaskError::FinalizedAheadOfUnsafe(
101                new_sync_state.unsafe_head().block_info.number,
102                new_sync_state.finalized_head().block_info.number,
103            ));
104        }
105
106        let fcu_time_start = Instant::now();
107
108        // Determine the forkchoice version to use.
109        // Note that if the envelope is not provided, we use the forkchoice version from the
110        // timestamp zero. The version number in `fork_choice_updated_v*`
111        // methods only matters for the payload attributes.
112        let version = EngineForkchoiceVersion::from_cfg(
113            &self.rollup,
114            self.envelope.as_ref().map(|p| p.inner.payload_attributes.timestamp).unwrap_or(0),
115        );
116
117        // TODO(@theochap, `<https://github.com/op-rs/kona/issues/2387>`): we should avoid cloning the payload attributes here.
118        let payload_attributes = self.envelope.as_ref().map(|p| p.inner()).cloned();
119
120        // Send the forkchoice update through the input.
121        let forkchoice = new_sync_state.create_forkchoice_state();
122
123        // Handle the forkchoice update result.
124        let response = match version {
125            EngineForkchoiceVersion::V1 => {
126                self.client
127                    .fork_choice_updated_v1(
128                        forkchoice,
129                        payload_attributes.map(|p| p.payload_attributes),
130                    )
131                    .await
132            }
133            EngineForkchoiceVersion::V2 => {
134                self.client.fork_choice_updated_v2(forkchoice, payload_attributes).await
135            }
136            EngineForkchoiceVersion::V3 => {
137                self.client.fork_choice_updated_v3(forkchoice, payload_attributes).await
138            }
139        };
140
141        let valid_response = response.map_err(|e| {
142            // Fatal forkchoice update error.
143            e.as_error_resp()
144                .and_then(|e| {
145                    (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64)
146                        .then_some(ForkchoiceTaskError::InvalidForkchoiceState)
147                })
148                .unwrap_or_else(|| ForkchoiceTaskError::ForkchoiceUpdateFailed(e))
149        })?;
150
151        // Unexpected forkchoice payload status.
152        // We may be able to recover from this by resetting the engine.
153        Self::check_forkchoice_updated_status(state, &valid_response.payload_status.status)?;
154
155        // Apply the new sync state to the engine state.
156        state.sync_state = new_sync_state;
157
158        // Update metrics.
159        kona_macros::inc!(counter, Metrics::ENGINE_TASK_COUNT, Metrics::FORKCHOICE_TASK_LABEL);
160
161        let fcu_duration = fcu_time_start.elapsed();
162        debug!(
163            target: "engine",
164            fcu_duration = ?fcu_duration,
165            forkchoice = ?forkchoice,
166            response = ?valid_response,
167            "Forkchoice updated"
168        );
169
170        Ok(valid_response.payload_id)
171    }
172}