kona_engine/task_queue/tasks/
task.rs

1//! Tasks sent to the [`Engine`] for execution.
2//!
3//! [`Engine`]: crate::Engine
4
5use super::{BuildTask, ConsolidateTask, FinalizeTask, ForkchoiceTask, InsertTask};
6use crate::{
7    BuildTaskError, ConsolidateTaskError, EngineState, FinalizeTaskError, ForkchoiceTaskError,
8    InsertTaskError,
9};
10use async_trait::async_trait;
11use std::cmp::Ordering;
12use thiserror::Error;
13
14/// The severity of an engine task error.
15///
16/// This is used to determine how to handle the error when draining the engine task queue.
17#[derive(Debug, PartialEq, Eq)]
18pub enum EngineTaskErrorSeverity {
19    /// The error is temporary and the task is retried.
20    Temporary,
21    /// The error is critical and is propagated to the engine actor.
22    Critical,
23    /// The error indicates that the engine should be reset.
24    Reset,
25    /// The error indicates that the engine should be flushed.
26    Flush,
27}
28
29/// The interface for an engine task error.
30///
31/// An engine task error should have an associated severity level to specify how to handle the error
32/// when draining the engine task queue.
33pub trait EngineTaskError {
34    /// The severity of the error.
35    fn severity(&self) -> EngineTaskErrorSeverity;
36}
37
38/// The interface for an engine task.
39#[async_trait]
40pub trait EngineTaskExt {
41    /// The output type of the task.
42    type Output;
43
44    /// The error type of the task.
45    type Error: EngineTaskError;
46
47    /// Executes the task, taking a shared lock on the engine state and `self`.
48    async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, Self::Error>;
49}
50
51/// An error that may occur during an [`EngineTask`]'s execution.
52#[derive(Error, Debug)]
53pub enum EngineTaskErrors {
54    /// An error that occurred while updating the forkchoice state.
55    #[error(transparent)]
56    Forkchoice(#[from] ForkchoiceTaskError),
57    /// An error that occurred while inserting a block into the engine.
58    #[error(transparent)]
59    Insert(#[from] InsertTaskError),
60    /// An error that occurred while building a block.
61    #[error(transparent)]
62    Build(#[from] BuildTaskError),
63    /// An error that occurred while consolidating the engine state.
64    #[error(transparent)]
65    Consolidate(#[from] ConsolidateTaskError),
66    /// An error that occurred while finalizing an L2 block.
67    #[error(transparent)]
68    Finalize(#[from] FinalizeTaskError),
69}
70
71impl EngineTaskError for EngineTaskErrors {
72    fn severity(&self) -> EngineTaskErrorSeverity {
73        match self {
74            Self::Forkchoice(inner) => inner.severity(),
75            Self::Insert(inner) => inner.severity(),
76            Self::Build(inner) => inner.severity(),
77            Self::Consolidate(inner) => inner.severity(),
78            Self::Finalize(inner) => inner.severity(),
79        }
80    }
81}
82
83/// Tasks that may be inserted into and executed by the [`Engine`].
84///
85/// [`Engine`]: crate::Engine
86#[derive(Debug, Clone)]
87pub enum EngineTask {
88    /// Perform a `engine_forkchoiceUpdated` call with the current [`EngineState`]'s forkchoice,
89    /// and no payload attributes.
90    ForkchoiceUpdate(ForkchoiceTask),
91    /// Inserts a payload into the execution engine.
92    Insert(InsertTask),
93    /// Builds a new block with the given attributes, and inserts it into the execution engine.
94    Build(BuildTask),
95    /// Performs consolidation on the engine state, reverting to payload attribute processing
96    /// via the [`BuildTask`] if consolidation fails.
97    Consolidate(ConsolidateTask),
98    /// Finalizes an L2 block
99    Finalize(FinalizeTask),
100}
101
102impl EngineTask {
103    /// Executes the task without consuming it.
104    async fn execute_inner(&self, state: &mut EngineState) -> Result<(), EngineTaskErrors> {
105        match self.clone() {
106            Self::ForkchoiceUpdate(task) => task.execute(state).await.map(|_| ())?,
107            Self::Insert(task) => task.execute(state).await?,
108            Self::Build(task) => task.execute(state).await?,
109            Self::Consolidate(task) => task.execute(state).await?,
110            Self::Finalize(task) => task.execute(state).await?,
111        };
112
113        Ok(())
114    }
115}
116
117impl PartialEq for EngineTask {
118    fn eq(&self, other: &Self) -> bool {
119        matches!(
120            (self, other),
121            (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) |
122                (Self::Insert(_), Self::Insert(_)) |
123                (Self::Build(_), Self::Build(_)) |
124                (Self::Consolidate(_), Self::Consolidate(_)) |
125                (Self::Finalize(_), Self::Finalize(_))
126        )
127    }
128}
129
130impl Eq for EngineTask {}
131
132impl PartialOrd for EngineTask {
133    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
134        Some(self.cmp(other))
135    }
136}
137
138impl Ord for EngineTask {
139    fn cmp(&self, other: &Self) -> Ordering {
140        // Order (descending): ForkchoiceUpdate -> BuildBlock -> InsertUnsafe -> Consolidate
141        //
142        // https://specs.optimism.io/protocol/derivation.html#forkchoice-synchronization
143        //
144        // - Outstanding FCUs are processed before anything else.
145        // - Block building jobs are prioritized above InsertUnsafe and Consolidate tasks, to give
146        //   priority to the sequencer.
147        // - InsertUnsafe tasks are prioritized over Consolidate tasks, to ensure that unsafe block
148        //   gossip is imported promptly.
149        // - Consolidate tasks are the lowest priority, as they are only used for advancing the safe
150        //   chain via derivation.
151        match (self, other) {
152            // Same variant cases
153            (Self::Insert(_), Self::Insert(_)) => Ordering::Equal,
154            (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal,
155            (Self::Build(_), Self::Build(_)) => Ordering::Equal,
156            (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) => Ordering::Equal,
157            (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal,
158
159            // Individual ForkchoiceUpdate tasks are the highest priority
160            (Self::ForkchoiceUpdate(_), _) => Ordering::Greater,
161            (_, Self::ForkchoiceUpdate(_)) => Ordering::Less,
162
163            // BuildBlock tasks are prioritized over InsertUnsafe and Consolidate tasks
164            (Self::Build(_), _) => Ordering::Greater,
165            (_, Self::Build(_)) => Ordering::Less,
166
167            // InsertUnsafe tasks are prioritized over Consolidate and Finalize tasks
168            (Self::Insert(_), _) => Ordering::Greater,
169            (_, Self::Insert(_)) => Ordering::Less,
170
171            // Consolidate tasks are prioritized over Finalize tasks
172            (Self::Consolidate(_), _) => Ordering::Greater,
173            (_, Self::Consolidate(_)) => Ordering::Less,
174        }
175    }
176}
177
178#[async_trait]
179impl EngineTaskExt for EngineTask {
180    type Output = ();
181
182    type Error = EngineTaskErrors;
183
184    async fn execute(&self, state: &mut EngineState) -> Result<(), Self::Error> {
185        // Retry the task until it succeeds or a critical error occurs.
186        while let Err(e) = self.execute_inner(state).await {
187            match e.severity() {
188                EngineTaskErrorSeverity::Temporary => {
189                    trace!(target: "engine", "{e}");
190                    continue;
191                }
192                EngineTaskErrorSeverity::Critical => {
193                    error!(target: "engine", "{e}");
194                    return Err(e);
195                }
196                EngineTaskErrorSeverity::Reset => {
197                    warn!(target: "engine", "Engine requested derivation reset");
198                    return Err(e);
199                }
200                EngineTaskErrorSeverity::Flush => {
201                    warn!(target: "engine", "Engine requested derivation flush");
202                    return Err(e);
203                }
204            }
205        }
206
207        Ok(())
208    }
209}