kona_engine/task_queue/tasks/
task.rs

1//! Tasks sent to the [`Engine`] for execution.
2//!
3//! [`Engine`]: crate::Engine
4
5use super::{BuildTask, ConsolidateTask, FinalizeTask, InsertTask};
6use crate::{
7    BuildTaskError, ConsolidateTaskError, EngineState, FinalizeTaskError, InsertTaskError,
8};
9use async_trait::async_trait;
10use derive_more::Display;
11use std::cmp::Ordering;
12use thiserror::Error;
13
14/// The severity of an engine task error.
15///
16/// This is used to determine how to handle the error when draining the engine task queue.
17#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
18pub enum EngineTaskErrorSeverity {
19    /// The error is temporary and the task is retried.
20    #[display("temporary")]
21    Temporary,
22    /// The error is critical and is propagated to the engine actor.
23    #[display("critical")]
24    Critical,
25    /// The error indicates that the engine should be reset.
26    #[display("reset")]
27    Reset,
28    /// The error indicates that the engine should be flushed.
29    #[display("flush")]
30    Flush,
31}
32
33/// The interface for an engine task error.
34///
35/// An engine task error should have an associated severity level to specify how to handle the error
36/// when draining the engine task queue.
37pub trait EngineTaskError {
38    /// The severity of the error.
39    fn severity(&self) -> EngineTaskErrorSeverity;
40}
41
42/// The interface for an engine task.
43#[async_trait]
44pub trait EngineTaskExt {
45    /// The output type of the task.
46    type Output;
47
48    /// The error type of the task.
49    type Error: EngineTaskError;
50
51    /// Executes the task, taking a shared lock on the engine state and `self`.
52    async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, Self::Error>;
53}
54
55/// An error that may occur during an [`EngineTask`]'s execution.
56#[derive(Error, Debug)]
57pub enum EngineTaskErrors {
58    /// An error that occurred while inserting a block into the engine.
59    #[error(transparent)]
60    Insert(#[from] InsertTaskError),
61    /// An error that occurred while building a block.
62    #[error(transparent)]
63    Build(#[from] BuildTaskError),
64    /// An error that occurred while consolidating the engine state.
65    #[error(transparent)]
66    Consolidate(#[from] ConsolidateTaskError),
67    /// An error that occurred while finalizing an L2 block.
68    #[error(transparent)]
69    Finalize(#[from] FinalizeTaskError),
70}
71
72impl EngineTaskError for EngineTaskErrors {
73    fn severity(&self) -> EngineTaskErrorSeverity {
74        match self {
75            Self::Insert(inner) => inner.severity(),
76            Self::Build(inner) => inner.severity(),
77            Self::Consolidate(inner) => inner.severity(),
78            Self::Finalize(inner) => inner.severity(),
79        }
80    }
81}
82
83/// Tasks that may be inserted into and executed by the [`Engine`].
84///
85/// [`Engine`]: crate::Engine
86#[derive(Debug, Clone)]
87pub enum EngineTask {
88    /// Inserts a payload into the execution engine.
89    Insert(InsertTask),
90    /// Builds a new block with the given attributes, and inserts it into the execution engine.
91    Build(BuildTask),
92    /// Performs consolidation on the engine state, reverting to payload attribute processing
93    /// via the [`BuildTask`] if consolidation fails.
94    Consolidate(ConsolidateTask),
95    /// Finalizes an L2 block
96    Finalize(FinalizeTask),
97}
98
99impl EngineTask {
100    /// Executes the task without consuming it.
101    async fn execute_inner(&self, state: &mut EngineState) -> Result<(), EngineTaskErrors> {
102        match self.clone() {
103            Self::Insert(task) => task.execute(state).await?,
104            Self::Build(task) => task.execute(state).await?,
105            Self::Consolidate(task) => task.execute(state).await?,
106            Self::Finalize(task) => task.execute(state).await?,
107        };
108
109        Ok(())
110    }
111
112    const fn task_metrics_label(&self) -> &'static str {
113        match self {
114            Self::Insert(_) => crate::Metrics::INSERT_TASK_LABEL,
115            Self::Consolidate(_) => crate::Metrics::CONSOLIDATE_TASK_LABEL,
116            Self::Build(_) => crate::Metrics::BUILD_TASK_LABEL,
117            Self::Finalize(_) => crate::Metrics::FINALIZE_TASK_LABEL,
118        }
119    }
120}
121
122impl PartialEq for EngineTask {
123    fn eq(&self, other: &Self) -> bool {
124        matches!(
125            (self, other),
126            (Self::Insert(_), Self::Insert(_)) |
127                (Self::Build(_), Self::Build(_)) |
128                (Self::Consolidate(_), Self::Consolidate(_)) |
129                (Self::Finalize(_), Self::Finalize(_))
130        )
131    }
132}
133
134impl Eq for EngineTask {}
135
136impl PartialOrd for EngineTask {
137    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
138        Some(self.cmp(other))
139    }
140}
141
142impl Ord for EngineTask {
143    fn cmp(&self, other: &Self) -> Ordering {
144        // Order (descending): BuildBlock -> InsertUnsafe -> Consolidate -> Finalize
145        //
146        // https://specs.optimism.io/protocol/derivation.html#forkchoice-synchronization
147        //
148        // - Block building jobs are prioritized above all other tasks, to give priority to the
149        //   sequencer. BuildTask handles forkchoice updates automatically.
150        // - InsertUnsafe tasks are prioritized over Consolidate tasks, to ensure that unsafe block
151        //   gossip is imported promptly.
152        // - Consolidate tasks are prioritized over Finalize tasks, as they advance the safe chain
153        //   via derivation.
154        // - Finalize tasks have the lowest priority, as they only update finalized status.
155        match (self, other) {
156            // Same variant cases
157            (Self::Insert(_), Self::Insert(_)) => Ordering::Equal,
158            (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal,
159            (Self::Build(_), Self::Build(_)) => Ordering::Equal,
160            (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal,
161
162            // BuildBlock tasks are prioritized over InsertUnsafe and Consolidate tasks
163            (Self::Build(_), _) => Ordering::Greater,
164            (_, Self::Build(_)) => Ordering::Less,
165
166            // InsertUnsafe tasks are prioritized over Consolidate and Finalize tasks
167            (Self::Insert(_), _) => Ordering::Greater,
168            (_, Self::Insert(_)) => Ordering::Less,
169
170            // Consolidate tasks are prioritized over Finalize tasks
171            (Self::Consolidate(_), _) => Ordering::Greater,
172            (_, Self::Consolidate(_)) => Ordering::Less,
173        }
174    }
175}
176
177#[async_trait]
178impl EngineTaskExt for EngineTask {
179    type Output = ();
180
181    type Error = EngineTaskErrors;
182
183    async fn execute(&self, state: &mut EngineState) -> Result<(), Self::Error> {
184        // Retry the task until it succeeds or a critical error occurs.
185        while let Err(e) = self.execute_inner(state).await {
186            let severity = e.severity();
187
188            kona_macros::inc!(
189                counter,
190                crate::Metrics::ENGINE_TASK_FAILURE,
191                self.task_metrics_label() => severity.to_string()
192            );
193
194            match severity {
195                EngineTaskErrorSeverity::Temporary => {
196                    trace!(target: "engine", "{e}");
197                    continue;
198                }
199                EngineTaskErrorSeverity::Critical => {
200                    error!(target: "engine", "{e}");
201                    return Err(e);
202                }
203                EngineTaskErrorSeverity::Reset => {
204                    warn!(target: "engine", "Engine requested derivation reset");
205                    return Err(e);
206                }
207                EngineTaskErrorSeverity::Flush => {
208                    warn!(target: "engine", "Engine requested derivation flush");
209                    return Err(e);
210                }
211            }
212        }
213
214        kona_macros::inc!(counter, crate::Metrics::ENGINE_TASK_SUCCESS, self.task_metrics_label());
215
216        Ok(())
217    }
218}