kona_engine/task_queue/tasks/
task.rs1use super::{BuildTask, ConsolidateTask, FinalizeTask, InsertTask};
6use crate::{
7 BuildTaskError, ConsolidateTaskError, EngineState, FinalizeTaskError, InsertTaskError,
8};
9use async_trait::async_trait;
10use derive_more::Display;
11use std::cmp::Ordering;
12use thiserror::Error;
13
14#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
18pub enum EngineTaskErrorSeverity {
19 #[display("temporary")]
21 Temporary,
22 #[display("critical")]
24 Critical,
25 #[display("reset")]
27 Reset,
28 #[display("flush")]
30 Flush,
31}
32
33pub trait EngineTaskError {
38 fn severity(&self) -> EngineTaskErrorSeverity;
40}
41
42#[async_trait]
44pub trait EngineTaskExt {
45 type Output;
47
48 type Error: EngineTaskError;
50
51 async fn execute(&self, state: &mut EngineState) -> Result<Self::Output, Self::Error>;
53}
54
55#[derive(Error, Debug)]
57pub enum EngineTaskErrors {
58 #[error(transparent)]
60 Insert(#[from] InsertTaskError),
61 #[error(transparent)]
63 Build(#[from] BuildTaskError),
64 #[error(transparent)]
66 Consolidate(#[from] ConsolidateTaskError),
67 #[error(transparent)]
69 Finalize(#[from] FinalizeTaskError),
70}
71
72impl EngineTaskError for EngineTaskErrors {
73 fn severity(&self) -> EngineTaskErrorSeverity {
74 match self {
75 Self::Insert(inner) => inner.severity(),
76 Self::Build(inner) => inner.severity(),
77 Self::Consolidate(inner) => inner.severity(),
78 Self::Finalize(inner) => inner.severity(),
79 }
80 }
81}
82
83#[derive(Debug, Clone)]
87pub enum EngineTask {
88 Insert(InsertTask),
90 Build(BuildTask),
92 Consolidate(ConsolidateTask),
95 Finalize(FinalizeTask),
97}
98
99impl EngineTask {
100 async fn execute_inner(&self, state: &mut EngineState) -> Result<(), EngineTaskErrors> {
102 match self.clone() {
103 Self::Insert(task) => task.execute(state).await?,
104 Self::Build(task) => task.execute(state).await?,
105 Self::Consolidate(task) => task.execute(state).await?,
106 Self::Finalize(task) => task.execute(state).await?,
107 };
108
109 Ok(())
110 }
111
112 const fn task_metrics_label(&self) -> &'static str {
113 match self {
114 Self::Insert(_) => crate::Metrics::INSERT_TASK_LABEL,
115 Self::Consolidate(_) => crate::Metrics::CONSOLIDATE_TASK_LABEL,
116 Self::Build(_) => crate::Metrics::BUILD_TASK_LABEL,
117 Self::Finalize(_) => crate::Metrics::FINALIZE_TASK_LABEL,
118 }
119 }
120}
121
122impl PartialEq for EngineTask {
123 fn eq(&self, other: &Self) -> bool {
124 matches!(
125 (self, other),
126 (Self::Insert(_), Self::Insert(_)) |
127 (Self::Build(_), Self::Build(_)) |
128 (Self::Consolidate(_), Self::Consolidate(_)) |
129 (Self::Finalize(_), Self::Finalize(_))
130 )
131 }
132}
133
134impl Eq for EngineTask {}
135
136impl PartialOrd for EngineTask {
137 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
138 Some(self.cmp(other))
139 }
140}
141
142impl Ord for EngineTask {
143 fn cmp(&self, other: &Self) -> Ordering {
144 match (self, other) {
156 (Self::Insert(_), Self::Insert(_)) => Ordering::Equal,
158 (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal,
159 (Self::Build(_), Self::Build(_)) => Ordering::Equal,
160 (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal,
161
162 (Self::Build(_), _) => Ordering::Greater,
164 (_, Self::Build(_)) => Ordering::Less,
165
166 (Self::Insert(_), _) => Ordering::Greater,
168 (_, Self::Insert(_)) => Ordering::Less,
169
170 (Self::Consolidate(_), _) => Ordering::Greater,
172 (_, Self::Consolidate(_)) => Ordering::Less,
173 }
174 }
175}
176
177#[async_trait]
178impl EngineTaskExt for EngineTask {
179 type Output = ();
180
181 type Error = EngineTaskErrors;
182
183 async fn execute(&self, state: &mut EngineState) -> Result<(), Self::Error> {
184 while let Err(e) = self.execute_inner(state).await {
186 let severity = e.severity();
187
188 kona_macros::inc!(
189 counter,
190 crate::Metrics::ENGINE_TASK_FAILURE,
191 self.task_metrics_label() => severity.to_string()
192 );
193
194 match severity {
195 EngineTaskErrorSeverity::Temporary => {
196 trace!(target: "engine", "{e}");
197 continue;
198 }
199 EngineTaskErrorSeverity::Critical => {
200 error!(target: "engine", "{e}");
201 return Err(e);
202 }
203 EngineTaskErrorSeverity::Reset => {
204 warn!(target: "engine", "Engine requested derivation reset");
205 return Err(e);
206 }
207 EngineTaskErrorSeverity::Flush => {
208 warn!(target: "engine", "Engine requested derivation flush");
209 return Err(e);
210 }
211 }
212 }
213
214 kona_macros::inc!(counter, crate::Metrics::ENGINE_TASK_SUCCESS, self.task_metrics_label());
215
216 Ok(())
217 }
218}