reifydb_sub_api/
worker.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Worker interface for centralized task management
5//!
6//! This module provides the interface for a global worker pool that can be used
7//! by various components to schedule and manage background tasks.
8
9use std::{
10	fmt::{self, Display, Formatter},
11	ops::Deref,
12	sync::Arc,
13	time::Duration,
14};
15
16use reifydb_engine::StandardEngine;
17
/// Scheduling priority carried by a task.
///
/// Variants are declared in ascending order, so the derived `Ord` gives
/// `Low < Normal < High`; the explicit discriminants follow the same order.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
	/// Lowest priority (discriminant `0`).
	Low = 0,
	/// Middle priority (discriminant `1`).
	Normal = 1,
	/// Highest priority (discriminant `2`).
	High = 2,
}
24
/// Handle to a scheduled task in the worker pool.
///
/// A transparent newtype over `u64` that converts to and from the raw value,
/// dereferences to it, compares against it, and displays exactly like it.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskHandle(pub u64);

impl Display for TaskHandle {
	/// Forwards to the `u64` formatter, so width and fill flags still apply.
	fn fmt(&self, f: &mut Formatter) -> fmt::Result {
		let TaskHandle(raw) = self;
		<u64 as Display>::fmt(raw, f)
	}
}

impl Deref for TaskHandle {
	type Target = u64;

	/// Exposes the wrapped raw value by reference.
	fn deref(&self) -> &Self::Target {
		let TaskHandle(raw) = self;
		raw
	}
}

impl PartialEq<u64> for TaskHandle {
	/// Compares the wrapped value with a bare `u64`.
	fn eq(&self, other: &u64) -> bool {
		self.0 == *other
	}
}

impl From<TaskHandle> for u64 {
	/// Unwraps the handle into its raw value.
	fn from(value: TaskHandle) -> Self {
		let TaskHandle(raw) = value;
		raw
	}
}

impl From<u64> for TaskHandle {
	/// Wraps a raw value in a handle.
	fn from(value: u64) -> Self {
		Self(value)
	}
}
61
62#[derive(Clone)]
63pub struct TaskContext {
64	engine: StandardEngine,
65}
66
67impl TaskContext {
68	pub fn new(engine: StandardEngine) -> Self {
69		Self {
70			engine,
71		}
72	}
73
74	pub fn engine(&self) -> &StandardEngine {
75		&self.engine
76	}
77}
78
79pub trait SchedulableTask: Send + Sync {
80	fn execute(&self, ctx: &TaskContext) -> reifydb_core::Result<()>;
81	fn name(&self) -> &str;
82	fn priority(&self) -> Priority;
83}
84
85pub type BoxedTask = Box<dyn SchedulableTask>;
86
87/// Trait for one-time tasks that consume themselves on execution
88///
89/// Unlike SchedulableTask which uses `&self` and can be executed multiple times,
90/// OnceTask takes ownership via `Box<Self>` and is consumed on execution.
91/// This allows tasks to own and transfer data without wrapping in Arc/Mutex.
92pub trait OnceTask: Send + Sync {
93	fn execute_once(self: Box<Self>, ctx: &TaskContext) -> reifydb_core::Result<()>;
94	fn name(&self) -> &str;
95	fn priority(&self) -> Priority;
96}
97
98pub type BoxedOnceTask = Box<dyn OnceTask>;
99
100/// Adapter to convert a FnOnce closure into an OnceTask
101pub struct OnceClosureTask<F>
102where
103	F: FnOnce(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
104{
105	name: String,
106	priority: Priority,
107	task: Option<F>, // Wrapped in Option to enable taking
108}
109
110impl<F> OnceClosureTask<F>
111where
112	F: FnOnce(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
113{
114	pub fn new(name: impl Into<String>, priority: Priority, task: F) -> Self {
115		Self {
116			name: name.into(),
117			priority,
118			task: Some(task),
119		}
120	}
121}
122
123impl<F> OnceTask for OnceClosureTask<F>
124where
125	F: FnOnce(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
126{
127	fn execute_once(mut self: Box<Self>, ctx: &TaskContext) -> reifydb_core::Result<()> {
128		let task = self.task.take().expect("Task already executed");
129		task(ctx)
130	}
131
132	fn name(&self) -> &str {
133		&self.name
134	}
135
136	fn priority(&self) -> Priority {
137		self.priority
138	}
139}
140
141/// Adapter to convert a closure into a SchedulableTask
142pub struct ClosureTask<F>
143where
144	F: Fn(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
145{
146	name: String,
147	priority: Priority,
148	task: F,
149}
150
151impl<F> ClosureTask<F>
152where
153	F: Fn(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
154{
155	pub fn new(name: impl Into<String>, priority: Priority, task: F) -> Self {
156		Self {
157			name: name.into(),
158			priority,
159			task,
160		}
161	}
162}
163
164impl<F> SchedulableTask for ClosureTask<F>
165where
166	F: Fn(&TaskContext) -> reifydb_core::Result<()> + Send + Sync,
167{
168	fn execute(&self, ctx: &TaskContext) -> reifydb_core::Result<()> {
169		(self.task)(ctx)
170	}
171
172	fn name(&self) -> &str {
173		&self.name
174	}
175
176	fn priority(&self) -> Priority {
177		self.priority
178	}
179}
180
/// Macro for creating tasks with less boilerplate
///
/// Expands to a `Box` holding a `ClosureTask`. When omitted, the name is
/// `"unnamed"` and the priority is `Priority::Normal`.
///
/// # Arm selection
///
/// * The bare tokens `Low`, `Normal` and `High` are matched literally and
///   mapped to `$crate::Priority::…`, so the caller does not need to import
///   the variants. These arms are listed before the generic expression arms
///   for that reason.
/// * A name is only recognised positionally in the two-argument and
///   bare-priority forms when it is a literal. In a two-argument call whose
///   first argument is neither a literal nor one of the bare priority tokens,
///   that argument is treated as the priority expression.
/// * The three-argument fallback takes `name, priority, closure` as arbitrary
///   expressions.
///
/// The expansion refers to `::std::boxed::Box` by absolute path so it does
/// not depend on what `Box` names at the call site.
///
/// # Examples
///
/// ```ignore
/// // Minimal - task named "unnamed" with Normal priority
/// let task = task!(|ctx| {
///     // task body
///     Ok(())
/// });
///
/// // With name only (Normal priority)
/// let task = task!("my_task", |ctx| {
///     // task body
///     Ok(())
/// });
///
/// // With name and priority
/// let task = task!("my_task", High, |ctx| {
///     // task body
///     Ok(())
/// });
///
/// // With priority only (task named "unnamed")
/// let task = task!(Low, |ctx| {
///     // task body
///     Ok(())
/// });
///
/// // With move semantics (works with all patterns)
/// let captured = 42;
/// let task = task!("my_task", move |ctx| {
///     println!("Captured: {}", captured);
///     Ok(())
/// });
/// ```
#[macro_export]
macro_rules! task {
	// Pattern: just closure (unnamed task with Normal priority)
	($closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
	};

	// Pattern: Priority literal (Low/Normal/High), closure - unnamed task
	(Low, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::Low, $closure))
	};
	(Normal, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
	};
	(High, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new("unnamed", $crate::Priority::High, $closure))
	};

	// Pattern: name (string literal), closure (Normal priority)
	($name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $crate::Priority::Normal, $closure))
	};

	// Pattern: Priority literal, name (string literal), closure
	(Low, $name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $crate::Priority::Low, $closure))
	};
	(Normal, $name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $crate::Priority::Normal, $closure))
	};
	(High, $name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $crate::Priority::High, $closure))
	};

	// Pattern: name (string literal), Priority literal, closure
	($name:literal, Low, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $crate::Priority::Low, $closure))
	};
	($name:literal, Normal, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $crate::Priority::Normal, $closure))
	};
	($name:literal, High, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $crate::Priority::High, $closure))
	};

	// Pattern: Priority value (expr), closure - for when Priority is imported
	($priority:expr, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new("unnamed", $priority, $closure))
	};

	// Pattern: name (expr), Priority value (expr), closure
	($name:expr, $priority:expr, $closure:expr) => {
		::std::boxed::Box::new($crate::ClosureTask::new($name, $priority, $closure))
	};
}
272
/// Macro for creating one-time tasks with FnOnce semantics
///
/// Unlike `task!` which creates reusable tasks with `Fn` closures,
/// `task_once!` creates tasks that consume themselves and can move
/// captured variables without Arc/Mutex wrappers.
///
/// Expands to a `Box` holding an `OnceClosureTask`. When omitted, the name is
/// `"unnamed"` and the priority is `Priority::Normal`.
///
/// # Arm selection
///
/// * The bare tokens `Low`, `Normal` and `High` are matched literally and
///   mapped to `$crate::Priority::…`, so the caller does not need to import
///   the variants. These arms are listed before the generic expression arms
///   for that reason.
/// * In a two-argument call whose first argument is neither a literal nor one
///   of the bare priority tokens, that argument is treated as the priority
///   expression.
/// * The three-argument fallback takes `name, priority, closure` as arbitrary
///   expressions.
///
/// The expansion refers to `::std::boxed::Box` by absolute path so it does
/// not depend on what `Box` names at the call site.
///
/// # Examples
///
/// ```ignore
/// // Minimal - task named "unnamed" with Normal priority
/// let task = task_once!(|ctx| {
///     // task body
///     Ok(())
/// });
///
/// // With name and priority
/// let task = task_once!("my_task", High, move |ctx| {
///     // Can move values naturally
///     Ok(())
/// });
/// ```
#[macro_export]
macro_rules! task_once {
	// Pattern: just closure (unnamed task with Normal priority)
	($closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
	};

	// Pattern: Priority literal (Low/Normal/High), closure - unnamed task
	(Low, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::Low, $closure))
	};
	(Normal, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::Normal, $closure))
	};
	(High, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new("unnamed", $crate::Priority::High, $closure))
	};

	// Pattern: name (string literal), closure (Normal priority)
	($name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Normal, $closure))
	};

	// Pattern: Priority literal, name (string literal), closure
	(Low, $name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Low, $closure))
	};
	(Normal, $name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Normal, $closure))
	};
	(High, $name:literal, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $crate::Priority::High, $closure))
	};

	// Pattern: name (string literal), Priority literal, closure
	($name:literal, Low, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Low, $closure))
	};
	($name:literal, Normal, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $crate::Priority::Normal, $closure))
	};
	($name:literal, High, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $crate::Priority::High, $closure))
	};

	// Pattern: Priority value (expr), closure - for when Priority is imported
	($priority:expr, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new("unnamed", $priority, $closure))
	};

	// Pattern: name (expr), Priority value (expr), closure
	($name:expr, $priority:expr, $closure:expr) => {
		::std::boxed::Box::new($crate::OnceClosureTask::new($name, $priority, $closure))
	};
}
349
350pub trait Scheduler: Send + Sync {
351	/// Schedule a task to run at fixed intervals
352	///
353	/// The task will be scheduled to run every `interval` duration.
354	/// The next execution time is calculated when the task is picked up
355	/// for execution (not when it completes). This means if a task takes
356	/// longer than its interval, multiple instances may be queued.
357	fn every(&self, interval: Duration, task: BoxedTask) -> reifydb_core::Result<TaskHandle>;
358
359	/// Submit a one-time task for immediate execution with FnOnce semantics
360	///
361	/// The task will be queued for execution based on its priority and consumed on execution.
362	/// This allows tasks to move captured values without requiring Arc/Mutex wrappers.
363	/// This method returns immediately without waiting for execution.
364	///
365	/// # Arguments
366	/// * `task` - The one-time task to execute
367	///
368	/// # Returns
369	/// Ok(()) if task was successfully queued, Err if queue is full or worker pool is shut down
370	fn once(&self, task: BoxedOnceTask) -> reifydb_core::Result<()>;
371
372	/// Cancel a scheduled task
373	fn cancel(&self, handle: TaskHandle) -> reifydb_core::Result<()>;
374}
375
376/// Wrapper type for registering Scheduler in IocContainer
377///
378/// Since IocContainer uses TypeId-based resolution, trait objects like
379/// `Arc<dyn Scheduler>` cannot be registered directly. This newtype wrapper
380/// provides a concrete type that can be registered and resolved.
381#[derive(Clone)]
382pub struct SchedulerService(pub Arc<dyn Scheduler>);
383
384impl Deref for SchedulerService {
385	type Target = Arc<dyn Scheduler>;
386
387	fn deref(&self) -> &Self::Target {
388		&self.0
389	}
390}
391
#[cfg(test)]
mod tests {
	use super::*;
	use crate::Priority::{High, Low, Normal};

	/// Checks the name and priority reported by a task built with `task!`.
	fn assert_meta(task: &dyn SchedulableTask, expected_name: &str, expected_priority: Priority) {
		assert_eq!(task.name(), expected_name);
		assert_eq!(task.priority(), expected_priority);
	}

	#[test]
	fn test_task_macro_minimal() {
		// Closure only: falls back to the "unnamed" name and Normal priority
		let task: BoxedTask = task!(|_ctx| { Ok(()) });

		assert_meta(&*task, "unnamed", Normal);
	}

	#[test]
	fn test_task_macro_with_name() {
		// Name and closure: priority falls back to Normal
		let task: BoxedTask = task!("test_task", |_ctx| { Ok(()) });

		assert_meta(&*task, "test_task", Normal);
	}

	#[test]
	fn test_task_macro_with_priority() {
		// Priority and closure: name falls back to "unnamed"
		let task: BoxedTask = task!(High, |_ctx| { Ok(()) });

		assert_meta(&*task, "unnamed", High);
	}

	#[test]
	fn test_task_macro_priority_name() {
		// Priority given before the name
		let task: BoxedTask = task!(Low, "priority_first", |_ctx| { Ok(()) });

		assert_meta(&*task, "priority_first", Low);
	}

	#[test]
	fn test_task_macro_name_priority() {
		// Name given before the priority
		let task: BoxedTask = task!("name_first", High, |_ctx| { Ok(()) });

		assert_meta(&*task, "name_first", High);
	}

	#[test]
	fn test_task_macro_with_move_closure() {
		// A `move` closure that captures a local value
		let captured_value = 42;
		let task: BoxedTask = task!("move_task", move |_ctx| {
			// Touch the captured value so the capture is real
			let _val = captured_value;
			Ok(())
		});

		assert_meta(&*task, "move_task", Normal);
	}

	#[test]
	fn test_task_macro_all_priorities() {
		// One task per bare priority token
		let low_task: BoxedTask = task!(Low, |_ctx| { Ok(()) });
		let normal_task: BoxedTask = task!(Normal, |_ctx| { Ok(()) });
		let high_task: BoxedTask = task!(High, |_ctx| { Ok(()) });

		for (task, expected) in [(&low_task, Low), (&normal_task, Normal), (&high_task, High)] {
			assert_eq!(task.priority(), expected);
		}
	}
}
467}