kona_engine/task_queue/tasks/synchronize/task.rs
//! A task that calls the `engine_forkchoiceUpdated` method without payload attributes.
2
3use crate::{
4 EngineClient, EngineState, EngineTaskExt, SynchronizeTaskError, state::EngineSyncStateUpdate,
5};
6use alloy_rpc_types_engine::{INVALID_FORK_CHOICE_STATE_ERROR, PayloadStatusEnum};
7use async_trait::async_trait;
8use kona_genesis::RollupConfig;
9use op_alloy_provider::ext::engine::OpEngineApi;
10use std::sync::Arc;
11use tokio::time::Instant;
12
13/// Internal task for execution layer forkchoice synchronization.
14///
15/// The [`SynchronizeTask`] performs `engine_forkchoiceUpdated` calls to synchronize
16/// the execution layer's forkchoice state with the rollup node's view. This task
17/// operates without payload attributes and is primarily used internally by other
18/// engine tasks rather than being directly enqueued by users.
19///
20/// ## Usage Patterns
21///
22/// - **Internal Synchronization**: Called by [`InsertTask`], [`ConsolidateTask`], and
23/// [`FinalizeTask`]
24/// - **Engine Reset**: Used during engine resets to establish initial forkchoice state
25/// - **Safe Head Updates**: Synchronizes safe and finalized head changes
26///
27/// ## Automatic Integration
28///
29/// Unlike the legacy `ForkchoiceTask`, forkchoice updates during block building are now
30/// handled automatically within [`BuildTask`], eliminating the need for explicit
31/// forkchoice management in most user scenarios.
32///
33/// [`InsertTask`]: crate::InsertTask
34/// [`ConsolidateTask`]: crate::ConsolidateTask
35/// [`FinalizeTask`]: crate::FinalizeTask
36/// [`BuildTask`]: crate::BuildTask
37#[derive(Debug, Clone)]
38pub struct SynchronizeTask {
39 /// The engine client.
40 pub client: Arc<EngineClient>,
41 /// The rollup config.
42 pub rollup: Arc<RollupConfig>,
43 /// The sync state update to apply to the engine state.
44 pub state_update: EngineSyncStateUpdate,
45}
46
47impl SynchronizeTask {
48 /// Creates a new [`SynchronizeTask`].
49 pub const fn new(
50 client: Arc<EngineClient>,
51 rollup: Arc<RollupConfig>,
52 state_update: EngineSyncStateUpdate,
53 ) -> Self {
54 Self { client, rollup, state_update }
55 }
56
57 /// Checks the response of the `engine_forkchoiceUpdated` call, and updates the sync status if
58 /// necessary.
59 fn check_forkchoice_updated_status(
60 &self,
61 state: &mut EngineState,
62 status: &PayloadStatusEnum,
63 ) -> Result<(), SynchronizeTaskError> {
64 match status {
65 PayloadStatusEnum::Valid => {
66 if !state.el_sync_finished {
67 info!(
68 target: "engine",
69 "Finished execution layer sync."
70 );
71 state.el_sync_finished = true;
72 }
73
74 Ok(())
75 }
76 PayloadStatusEnum::Syncing => {
77 // If we're not building a new payload, we're driving EL sync.
78 debug!(target: "engine", "Attempting to update forkchoice state while EL syncing");
79 Ok(())
80 }
81 s => {
82 // Other codes are not expected.
83 Err(SynchronizeTaskError::UnexpectedPayloadStatus(s.clone()))
84 }
85 }
86 }
87}
88
89#[async_trait]
90impl EngineTaskExt for SynchronizeTask {
91 type Output = ();
92 type Error = SynchronizeTaskError;
93
94 async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, SynchronizeTaskError> {
95 // Apply the sync state update to the engine state.
96 let new_sync_state = state.sync_state.apply_update(self.state_update);
97
98 // Check if a forkchoice update is not needed, return early.
99 // A forkchoice update is not needed if...
100 // 1. The engine state is not default (initial forkchoice state has been emitted), and
101 // 2. The new sync state is the same as the current sync state (no changes to the sync
102 // state).
103 //
104 // NOTE:
105 // We shouldn't retry the synchronize task there. Since the `sync_state` is only updated
106 // inside the `SynchronizeTask` (except inside the ConsolidateTask, when the block is not
107 // the last in the batch) - the engine will get stuck retrying the `SynchronizeTask`
108 if state.sync_state != Default::default() && state.sync_state == new_sync_state {
109 debug!(target: "engine", ?new_sync_state, "No forkchoice update needed");
110 return Ok(());
111 }
112
113 // Check if the head is behind the finalized head.
114 if new_sync_state.unsafe_head().block_info.number <
115 new_sync_state.finalized_head().block_info.number
116 {
117 return Err(SynchronizeTaskError::FinalizedAheadOfUnsafe(
118 new_sync_state.unsafe_head().block_info.number,
119 new_sync_state.finalized_head().block_info.number,
120 ));
121 }
122
123 let fcu_time_start = Instant::now();
124
125 // Send the forkchoice update through the input.
126 let forkchoice = new_sync_state.create_forkchoice_state();
127
128 // Handle the forkchoice update result.
129 // NOTE: it doesn't matter which version we use here, because we're not sending any
130 // payload attributes. The forkchoice updated call is version agnostic if no payload
131 // attributes are provided.
132 let response = self.client.fork_choice_updated_v3(forkchoice, None).await;
133
134 let valid_response = response.map_err(|e| {
135 // Fatal forkchoice update error.
136 e.as_error_resp()
137 .and_then(|e| {
138 (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64)
139 .then_some(SynchronizeTaskError::InvalidForkchoiceState)
140 })
141 .unwrap_or_else(|| SynchronizeTaskError::ForkchoiceUpdateFailed(e))
142 })?;
143
144 self.check_forkchoice_updated_status(state, &valid_response.payload_status.status)?;
145
146 // Apply the new sync state to the engine state.
147 state.sync_state = new_sync_state;
148
149 let fcu_duration = fcu_time_start.elapsed();
150 debug!(
151 target: "engine",
152 fcu_duration = ?fcu_duration,
153 forkchoice = ?forkchoice,
154 response = ?valid_response,
155 "Forkchoice updated"
156 );
157
158 Ok(())
159 }
160}