reifydb_sub_worker/
task.rs1use std::{
5 cmp::Ordering,
6 sync::Arc,
7 time::{Duration, Instant},
8};
9
10use reifydb_core::Result;
11use reifydb_engine::StandardEngine;
12use reifydb_sub_api::{Priority, TaskHandle};
13
14use crate::tracker::CancellationToken;
15
16pub struct InternalTaskContext {
18 pub cancel_token: Option<CancellationToken>,
20 pub engine: StandardEngine,
22}
23
24impl InternalTaskContext {
25 pub fn is_cancelled(&self) -> bool {
27 self.cancel_token.as_ref().map(|t| t.is_cancelled()).unwrap_or(false)
28 }
29}
30
31pub trait PoolTask: Send + Sync {
33 fn execute(&self, ctx: &InternalTaskContext) -> Result<()>;
35
36 fn priority(&self) -> Priority {
38 Priority::Normal
39 }
40
41 fn name(&self) -> &str {
43 "unnamed_task"
44 }
45
46 fn can_retry(&self) -> bool {
48 false
49 }
50
51 fn max_retries(&self) -> usize {
53 3
54 }
55}
56
57pub(crate) struct ScheduledTask {
59 pub handle: TaskHandle,
60 pub task: Arc<dyn PoolTask>,
61 pub next_run: Instant,
62 pub interval: Option<Duration>,
63 pub priority: Priority,
64}
65
66impl ScheduledTask {
67 pub fn new(
68 handle: TaskHandle,
69 task: Box<dyn PoolTask>,
70 next_run: Instant,
71 interval: Option<Duration>,
72 priority: Priority,
73 ) -> Self {
74 Self {
75 handle,
76 task: Arc::from(task),
77 next_run,
78 interval,
79 priority,
80 }
81 }
82}
83
84pub struct PeriodicTask {
86 inner: Arc<dyn PoolTask>,
87 interval: Duration,
88 priority: Priority,
89}
90
91impl PeriodicTask {
92 pub fn new(task: Arc<dyn PoolTask>, interval: Duration, priority: Priority) -> Self {
93 Self {
94 inner: task,
95 interval,
96 priority,
97 }
98 }
99}
100
101impl PoolTask for PeriodicTask {
102 fn execute(&self, ctx: &InternalTaskContext) -> Result<()> {
103 self.inner.execute(ctx)
104 }
105
106 fn priority(&self) -> Priority {
107 self.priority
108 }
109
110 fn name(&self) -> &str {
111 self.inner.name()
112 }
113}
114
115pub struct PrioritizedTask {
117 pub task: Box<dyn PoolTask>,
118 pub priority: Priority,
119 pub submitted_at: Instant,
120}
121
122impl PrioritizedTask {
123 pub fn new(task: Box<dyn PoolTask>) -> Self {
124 let priority = task.priority();
125 Self {
126 task,
127 priority,
128 submitted_at: Instant::now(),
129 }
130 }
131}
132
133impl PartialEq for PrioritizedTask {
134 fn eq(&self, other: &Self) -> bool {
135 self.priority == other.priority && self.submitted_at == other.submitted_at
136 }
137}
138
139impl Eq for PrioritizedTask {}
140
141impl PartialOrd for PrioritizedTask {
142 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
143 Some(self.cmp(other))
144 }
145}
146
147impl Ord for PrioritizedTask {
148 fn cmp(&self, other: &Self) -> Ordering {
149 match self.priority.cmp(&other.priority) {
151 Ordering::Equal => {
152 other.submitted_at.cmp(&self.submitted_at)
156 }
157 other => other,
158 }
159 }
160}
161
162pub struct InternalClosureTask<F>
164where
165 F: Fn(&InternalTaskContext) -> Result<()> + Send + Sync,
166{
167 name: String,
168 priority: Priority,
169 closure: F,
170}
171
172impl<F> InternalClosureTask<F>
173where
174 F: Fn(&InternalTaskContext) -> Result<()> + Send + Sync,
175{
176 pub fn new(name: impl Into<String>, priority: Priority, closure: F) -> Self {
177 Self {
178 name: name.into(),
179 priority,
180 closure,
181 }
182 }
183}
184
185impl<F> PoolTask for InternalClosureTask<F>
186where
187 F: Fn(&InternalTaskContext) -> Result<()> + Send + Sync,
188{
189 fn execute(&self, ctx: &InternalTaskContext) -> Result<()> {
190 (self.closure)(ctx)
191 }
192
193 fn priority(&self) -> Priority {
194 self.priority
195 }
196
197 fn name(&self) -> &str {
198 &self.name
199 }
200}