reifydb_sub_worker/
task.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	cmp::Ordering,
6	sync::Arc,
7	time::{Duration, Instant},
8};
9
10use reifydb_core::Result;
11use reifydb_engine::StandardEngine;
12use reifydb_sub_api::{Priority, TaskHandle};
13
14use crate::tracker::CancellationToken;
15
16/// Internal context provided to tasks during execution
17pub struct InternalTaskContext {
18	/// Cancellation token for this task
19	pub cancel_token: Option<CancellationToken>,
20	/// Engine for database access
21	pub engine: StandardEngine,
22}
23
24impl InternalTaskContext {
25	/// Check if task has been cancelled
26	pub fn is_cancelled(&self) -> bool {
27		self.cancel_token.as_ref().map(|t| t.is_cancelled()).unwrap_or(false)
28	}
29}
30
31/// Trait for tasks that can be executed by the worker pool
32pub trait PoolTask: Send + Sync {
33	/// Execute the task
34	fn execute(&self, ctx: &InternalTaskContext) -> Result<()>;
35
36	/// Get the priority of this task
37	fn priority(&self) -> Priority {
38		Priority::Normal
39	}
40
41	/// Get a name/description for this task for debugging
42	fn name(&self) -> &str {
43		"unnamed_task"
44	}
45
46	/// Whether this task can be retried on failure
47	fn can_retry(&self) -> bool {
48		false
49	}
50
51	/// Maximum number of retries if can_retry is true
52	fn max_retries(&self) -> usize {
53		3
54	}
55}
56
57/// Internal representation of a scheduled task
58pub(crate) struct ScheduledTask {
59	pub handle: TaskHandle,
60	pub task: Arc<dyn PoolTask>,
61	pub next_run: Instant,
62	pub interval: Option<Duration>,
63	pub priority: Priority,
64}
65
66impl ScheduledTask {
67	pub fn new(
68		handle: TaskHandle,
69		task: Box<dyn PoolTask>,
70		next_run: Instant,
71		interval: Option<Duration>,
72		priority: Priority,
73	) -> Self {
74		Self {
75			handle,
76			task: Arc::from(task),
77			next_run,
78			interval,
79			priority,
80		}
81	}
82}
83
84/// Wrapper for periodic tasks
85pub struct PeriodicTask {
86	inner: Arc<dyn PoolTask>,
87	interval: Duration,
88	priority: Priority,
89}
90
91impl PeriodicTask {
92	pub fn new(task: Arc<dyn PoolTask>, interval: Duration, priority: Priority) -> Self {
93		Self {
94			inner: task,
95			interval,
96			priority,
97		}
98	}
99}
100
101impl PoolTask for PeriodicTask {
102	fn execute(&self, ctx: &InternalTaskContext) -> Result<()> {
103		self.inner.execute(ctx)
104	}
105
106	fn priority(&self) -> Priority {
107		self.priority
108	}
109
110	fn name(&self) -> &str {
111		self.inner.name()
112	}
113}
114
115/// A prioritized task wrapper for the queue
116pub struct PrioritizedTask {
117	pub task: Box<dyn PoolTask>,
118	pub priority: Priority,
119	pub submitted_at: Instant,
120}
121
122impl PrioritizedTask {
123	pub fn new(task: Box<dyn PoolTask>) -> Self {
124		let priority = task.priority();
125		Self {
126			task,
127			priority,
128			submitted_at: Instant::now(),
129		}
130	}
131}
132
133impl PartialEq for PrioritizedTask {
134	fn eq(&self, other: &Self) -> bool {
135		self.priority == other.priority && self.submitted_at == other.submitted_at
136	}
137}
138
139impl Eq for PrioritizedTask {}
140
141impl PartialOrd for PrioritizedTask {
142	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
143		Some(self.cmp(other))
144	}
145}
146
147impl Ord for PrioritizedTask {
148	fn cmp(&self, other: &Self) -> Ordering {
149		// Higher priority first (High=2 should be greater than Low=0)
150		match self.priority.cmp(&other.priority) {
151			Ordering::Equal => {
152				// Earlier submitted first (FIFO within same
153				// priority) Reverse this so earlier tasks
154				// are "greater" and pop first
155				other.submitted_at.cmp(&self.submitted_at)
156			}
157			other => other,
158		}
159	}
160}
161
162/// Simple closure-based implementation for PoolTask (internal use)
163pub struct InternalClosureTask<F>
164where
165	F: Fn(&InternalTaskContext) -> Result<()> + Send + Sync,
166{
167	name: String,
168	priority: Priority,
169	closure: F,
170}
171
172impl<F> InternalClosureTask<F>
173where
174	F: Fn(&InternalTaskContext) -> Result<()> + Send + Sync,
175{
176	pub fn new(name: impl Into<String>, priority: Priority, closure: F) -> Self {
177		Self {
178			name: name.into(),
179			priority,
180			closure,
181		}
182	}
183}
184
185impl<F> PoolTask for InternalClosureTask<F>
186where
187	F: Fn(&InternalTaskContext) -> Result<()> + Send + Sync,
188{
189	fn execute(&self, ctx: &InternalTaskContext) -> Result<()> {
190		(self.closure)(ctx)
191	}
192
193	fn priority(&self) -> Priority {
194		self.priority
195	}
196
197	fn name(&self) -> &str {
198		&self.name
199	}
200}