kona_engine/task_queue/tasks/synchronize/
task.rs

1//! A task for the `engine_forkchoiceUpdated` method, with no attributes.
2
3use crate::{
4    EngineClient, EngineState, EngineTaskExt, SynchronizeTaskError, state::EngineSyncStateUpdate,
5};
6use alloy_rpc_types_engine::{INVALID_FORK_CHOICE_STATE_ERROR, PayloadStatusEnum};
7use async_trait::async_trait;
8use kona_genesis::RollupConfig;
9use op_alloy_provider::ext::engine::OpEngineApi;
10use std::sync::Arc;
11use tokio::time::Instant;
12
13/// Internal task for execution layer forkchoice synchronization.
14///
15/// The [`SynchronizeTask`] performs `engine_forkchoiceUpdated` calls to synchronize
16/// the execution layer's forkchoice state with the rollup node's view. This task
17/// operates without payload attributes and is primarily used internally by other
18/// engine tasks rather than being directly enqueued by users.
19///
20/// ## Usage Patterns
21///
22/// - **Internal Synchronization**: Called by [`InsertTask`], [`ConsolidateTask`], and
23///   [`FinalizeTask`]
24/// - **Engine Reset**: Used during engine resets to establish initial forkchoice state
25/// - **Safe Head Updates**: Synchronizes safe and finalized head changes
26///
27/// ## Automatic Integration
28///
29/// Unlike the legacy `ForkchoiceTask`, forkchoice updates during block building are now
30/// handled automatically within [`BuildTask`], eliminating the need for explicit
31/// forkchoice management in most user scenarios.
32///
33/// [`InsertTask`]: crate::InsertTask
34/// [`ConsolidateTask`]: crate::ConsolidateTask  
35/// [`FinalizeTask`]: crate::FinalizeTask
36/// [`BuildTask`]: crate::BuildTask
37#[derive(Debug, Clone)]
38pub struct SynchronizeTask {
39    /// The engine client.
40    pub client: Arc<EngineClient>,
41    /// The rollup config.
42    pub rollup: Arc<RollupConfig>,
43    /// The sync state update to apply to the engine state.
44    pub state_update: EngineSyncStateUpdate,
45}
46
47impl SynchronizeTask {
48    /// Creates a new [`SynchronizeTask`].
49    pub const fn new(
50        client: Arc<EngineClient>,
51        rollup: Arc<RollupConfig>,
52        state_update: EngineSyncStateUpdate,
53    ) -> Self {
54        Self { client, rollup, state_update }
55    }
56
57    /// Checks the response of the `engine_forkchoiceUpdated` call, and updates the sync status if
58    /// necessary.
59    fn check_forkchoice_updated_status(
60        &self,
61        state: &mut EngineState,
62        status: &PayloadStatusEnum,
63    ) -> Result<(), SynchronizeTaskError> {
64        match status {
65            PayloadStatusEnum::Valid => {
66                if !state.el_sync_finished {
67                    info!(
68                        target: "engine",
69                        "Finished execution layer sync."
70                    );
71                    state.el_sync_finished = true;
72                }
73
74                Ok(())
75            }
76            PayloadStatusEnum::Syncing => {
77                // If we're not building a new payload, we're driving EL sync.
78                debug!(target: "engine", "Attempting to update forkchoice state while EL syncing");
79                Ok(())
80            }
81            s => {
82                // Other codes are not expected.
83                Err(SynchronizeTaskError::UnexpectedPayloadStatus(s.clone()))
84            }
85        }
86    }
87}
88
89#[async_trait]
90impl EngineTaskExt for SynchronizeTask {
91    type Output = ();
92    type Error = SynchronizeTaskError;
93
94    async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, SynchronizeTaskError> {
95        // Apply the sync state update to the engine state.
96        let new_sync_state = state.sync_state.apply_update(self.state_update);
97
98        // Check if a forkchoice update is not needed, return early.
99        // A forkchoice update is not needed if...
100        // 1. The engine state is not default (initial forkchoice state has been emitted), and
101        // 2. The new sync state is the same as the current sync state (no changes to the sync
102        //    state).
103        //
104        // NOTE:
105        // We shouldn't retry the synchronize task there. Since the `sync_state` is only updated
106        // inside the `SynchronizeTask` (except inside the ConsolidateTask, when the block is not
107        // the last in the batch) - the engine will get stuck retrying the `SynchronizeTask`
108        if state.sync_state != Default::default() && state.sync_state == new_sync_state {
109            debug!(target: "engine", ?new_sync_state, "No forkchoice update needed");
110            return Ok(());
111        }
112
113        // Check if the head is behind the finalized head.
114        if new_sync_state.unsafe_head().block_info.number <
115            new_sync_state.finalized_head().block_info.number
116        {
117            return Err(SynchronizeTaskError::FinalizedAheadOfUnsafe(
118                new_sync_state.unsafe_head().block_info.number,
119                new_sync_state.finalized_head().block_info.number,
120            ));
121        }
122
123        let fcu_time_start = Instant::now();
124
125        // Send the forkchoice update through the input.
126        let forkchoice = new_sync_state.create_forkchoice_state();
127
128        // Handle the forkchoice update result.
129        // NOTE: it doesn't matter which version we use here, because we're not sending any
130        // payload attributes. The forkchoice updated call is version agnostic if no payload
131        // attributes are provided.
132        let response = self.client.fork_choice_updated_v3(forkchoice, None).await;
133
134        let valid_response = response.map_err(|e| {
135            // Fatal forkchoice update error.
136            e.as_error_resp()
137                .and_then(|e| {
138                    (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64)
139                        .then_some(SynchronizeTaskError::InvalidForkchoiceState)
140                })
141                .unwrap_or_else(|| SynchronizeTaskError::ForkchoiceUpdateFailed(e))
142        })?;
143
144        self.check_forkchoice_updated_status(state, &valid_response.payload_status.status)?;
145
146        // Apply the new sync state to the engine state.
147        state.sync_state = new_sync_state;
148
149        let fcu_duration = fcu_time_start.elapsed();
150        debug!(
151            target: "engine",
152            fcu_duration = ?fcu_duration,
153            forkchoice = ?forkchoice,
154            response = ?valid_response,
155            "Forkchoice updated"
156        );
157
158        Ok(())
159    }
160}