kona_engine/task_queue/tasks/
task.rs1use super::{BuildTask, ConsolidateTask, FinalizeTask, ForkchoiceTask, InsertTask};
6use crate::{
7 BuildTaskError, ConsolidateTaskError, EngineState, FinalizeTaskError, ForkchoiceTaskError,
8 InsertTaskError,
9};
10use async_trait::async_trait;
11use std::cmp::Ordering;
12use thiserror::Error;
13
14#[derive(Debug, PartialEq, Eq)]
18pub enum EngineTaskErrorSeverity {
19 Temporary,
21 Critical,
23 Reset,
25 Flush,
27}
28
29pub trait EngineTaskError {
34 fn severity(&self) -> EngineTaskErrorSeverity;
36}
37
38#[async_trait]
40pub trait EngineTaskExt {
41 type Output;
43
44 type Error: EngineTaskError;
46
47 async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, Self::Error>;
49}
50
51#[derive(Error, Debug)]
53pub enum EngineTaskErrors {
54 #[error(transparent)]
56 Forkchoice(#[from] ForkchoiceTaskError),
57 #[error(transparent)]
59 Insert(#[from] InsertTaskError),
60 #[error(transparent)]
62 Build(#[from] BuildTaskError),
63 #[error(transparent)]
65 Consolidate(#[from] ConsolidateTaskError),
66 #[error(transparent)]
68 Finalize(#[from] FinalizeTaskError),
69}
70
71impl EngineTaskError for EngineTaskErrors {
72 fn severity(&self) -> EngineTaskErrorSeverity {
73 match self {
74 Self::Forkchoice(inner) => inner.severity(),
75 Self::Insert(inner) => inner.severity(),
76 Self::Build(inner) => inner.severity(),
77 Self::Consolidate(inner) => inner.severity(),
78 Self::Finalize(inner) => inner.severity(),
79 }
80 }
81}
82
83#[derive(Debug, Clone)]
87pub enum EngineTask {
88 ForkchoiceUpdate(ForkchoiceTask),
91 Insert(InsertTask),
93 Build(BuildTask),
95 Consolidate(ConsolidateTask),
98 Finalize(FinalizeTask),
100}
101
102impl EngineTask {
103 async fn execute_inner(&self, state: &mut EngineState) -> Result<(), EngineTaskErrors> {
105 match self.clone() {
106 Self::ForkchoiceUpdate(task) => task.execute(state).await.map(|_| ())?,
107 Self::Insert(task) => task.execute(state).await?,
108 Self::Build(task) => task.execute(state).await?,
109 Self::Consolidate(task) => task.execute(state).await?,
110 Self::Finalize(task) => task.execute(state).await?,
111 };
112
113 Ok(())
114 }
115}
116
117impl PartialEq for EngineTask {
118 fn eq(&self, other: &Self) -> bool {
119 matches!(
120 (self, other),
121 (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) |
122 (Self::Insert(_), Self::Insert(_)) |
123 (Self::Build(_), Self::Build(_)) |
124 (Self::Consolidate(_), Self::Consolidate(_)) |
125 (Self::Finalize(_), Self::Finalize(_))
126 )
127 }
128}
129
130impl Eq for EngineTask {}
131
132impl PartialOrd for EngineTask {
133 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
134 Some(self.cmp(other))
135 }
136}
137
138impl Ord for EngineTask {
139 fn cmp(&self, other: &Self) -> Ordering {
140 match (self, other) {
152 (Self::Insert(_), Self::Insert(_)) => Ordering::Equal,
154 (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal,
155 (Self::Build(_), Self::Build(_)) => Ordering::Equal,
156 (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) => Ordering::Equal,
157 (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal,
158
159 (Self::ForkchoiceUpdate(_), _) => Ordering::Greater,
161 (_, Self::ForkchoiceUpdate(_)) => Ordering::Less,
162
163 (Self::Build(_), _) => Ordering::Greater,
165 (_, Self::Build(_)) => Ordering::Less,
166
167 (Self::Insert(_), _) => Ordering::Greater,
169 (_, Self::Insert(_)) => Ordering::Less,
170
171 (Self::Consolidate(_), _) => Ordering::Greater,
173 (_, Self::Consolidate(_)) => Ordering::Less,
174 }
175 }
176}
177
178#[async_trait]
179impl EngineTaskExt for EngineTask {
180 type Output = ();
181
182 type Error = EngineTaskErrors;
183
184 async fn execute(&self, state: &mut EngineState) -> Result<(), Self::Error> {
185 while let Err(e) = self.execute_inner(state).await {
187 match e.severity() {
188 EngineTaskErrorSeverity::Temporary => {
189 trace!(target: "engine", "{e}");
190 continue;
191 }
192 EngineTaskErrorSeverity::Critical => {
193 error!(target: "engine", "{e}");
194 return Err(e);
195 }
196 EngineTaskErrorSeverity::Reset => {
197 warn!(target: "engine", "Engine requested derivation reset");
198 return Err(e);
199 }
200 EngineTaskErrorSeverity::Flush => {
201 warn!(target: "engine", "Engine requested derivation flush");
202 return Err(e);
203 }
204 }
205 }
206
207 Ok(())
208 }
209}