reifydb_sub_worker/
scheduler.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	cmp,
6	collections::{BinaryHeap, HashMap},
7	sync::{
8		Arc, Mutex,
9		atomic::{AtomicU64, Ordering},
10	},
11	time::{Duration, Instant},
12};
13
14use reifydb_core::Result;
15use reifydb_engine::StandardEngine;
16use reifydb_sub_api::{BoxedOnceTask, BoxedTask, Priority, TaskContext as CoreTaskContext, TaskHandle};
17
18use crate::task::{InternalTaskContext, PoolTask, ScheduledTask};
19
/// Adapter from SchedulableTask to PoolTask
///
/// Pairs a user-supplied recurring task with the engine it runs against,
/// exposing it through the pool's internal `PoolTask` interface.
pub(crate) struct SchedulableTaskAdapter {
	// The wrapped recurring task; executed via `PoolTask::execute`.
	task: BoxedTask,
	// Engine handle; cloned into a fresh `CoreTaskContext` per execution.
	engine: StandardEngine,
}
25
26impl SchedulableTaskAdapter {
27	pub(crate) fn new(task: BoxedTask, engine: StandardEngine) -> Self {
28		Self {
29			task,
30			engine,
31		}
32	}
33}
34
35impl PoolTask for SchedulableTaskAdapter {
36	fn execute(&self, _ctx: &InternalTaskContext) -> Result<()> {
37		let core_ctx = CoreTaskContext::new(self.engine.clone());
38		self.task.execute(&core_ctx)
39	}
40
41	fn priority(&self) -> Priority {
42		self.task.priority()
43	}
44
45	fn name(&self) -> &str {
46		self.task.name()
47	}
48}
49
/// Adapter from OnceTask to PoolTask
///
/// This adapter wraps a BoxedOnceTask in a Mutex so it can be
/// executed once through the PoolTask interface. After execution,
/// the task is consumed.
pub(crate) struct OnceTaskAdapter {
	// `Some` until the first `execute`, `None` afterwards; the Mutex
	// lets the consuming take() happen through a `&self` receiver.
	task: Mutex<Option<BoxedOnceTask>>,
	// Engine handle; cloned into the `CoreTaskContext` for the single run.
	engine: StandardEngine,
}
59
60impl OnceTaskAdapter {
61	pub(crate) fn new(task: BoxedOnceTask, engine: StandardEngine) -> Self {
62		Self {
63			task: Mutex::new(Some(task)),
64			engine,
65		}
66	}
67}
68
69impl PoolTask for OnceTaskAdapter {
70	fn execute(&self, _ctx: &InternalTaskContext) -> Result<()> {
71		// Take the task out of the Mutex. This can only be done once.
72		let task = self.task.lock().unwrap().take();
73		if let Some(task) = task {
74			let core_ctx = CoreTaskContext::new(self.engine.clone());
75			task.execute_once(&core_ctx)
76		} else {
77			panic!("OnceTask already executed");
78		}
79	}
80
81	fn priority(&self) -> Priority {
82		self.task.lock().unwrap().as_ref().map(|t| t.priority()).unwrap_or(Priority::Normal)
83	}
84
85	fn name(&self) -> &str {
86		// Name is called before execute, so task should still be present
87		// Return a generic name since we can't borrow from Mutex
88		"once-task"
89	}
90}
91
/// Manages scheduled and periodic tasks
///
/// Due dates live in a max-heap (`queue`) whose ordering is reversed on
/// time (see `Ord for ScheduledTaskRef`), so the soonest task surfaces
/// first. Cancellation is lazy: entries are removed from `tasks` but
/// stay in `queue` until they are popped and found missing.
pub struct TaskScheduler {
	/// Next task handle ID
	next_handle: AtomicU64,

	/// All scheduled tasks by handle
	tasks: HashMap<TaskHandle, ScheduledTask>,

	/// Priority queue of tasks by next run time
	queue: BinaryHeap<ScheduledTaskRef>,
}
103
/// Reference to a scheduled task in the priority queue
///
/// Lightweight heap entry: the full `ScheduledTask` stays in the
/// scheduler's `tasks` map and is looked up by `handle` when this
/// entry is popped.
struct ScheduledTaskRef {
	// Key into `TaskScheduler::tasks`.
	handle: TaskHandle,
	// Instant at which the task becomes ready.
	next_run: Instant,
	// Tie-breaker when two entries share the same `next_run`.
	priority: Priority,
}
110
111impl PartialEq for ScheduledTaskRef {
112	fn eq(&self, other: &Self) -> bool {
113		self.next_run == other.next_run && self.priority == other.priority
114	}
115}
116
117impl Eq for ScheduledTaskRef {}
118
119impl PartialOrd for ScheduledTaskRef {
120	fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
121		Some(self.cmp(other))
122	}
123}
124
125impl Ord for ScheduledTaskRef {
126	fn cmp(&self, other: &Self) -> cmp::Ordering {
127		// Earlier time first
128		match other.next_run.cmp(&self.next_run) {
129			cmp::Ordering::Equal => {
130				// Higher priority first
131				self.priority.cmp(&other.priority)
132			}
133			other => other,
134		}
135	}
136}
137
138impl TaskScheduler {
139	pub fn new() -> Self {
140		Self {
141			next_handle: AtomicU64::new(1),
142			tasks: HashMap::new(),
143			queue: BinaryHeap::new(),
144		}
145	}
146
147	/// Set the next handle ID to use (for synchronization with
148	/// pre-generated handles)
149	pub fn set_next_handle(&self, handle_id: u64) {
150		self.next_handle.store(handle_id, Ordering::Relaxed);
151	}
152
153	/// Schedule a task to run at fixed intervals (internal implementation)
154	pub fn schedule_every_internal(
155		&mut self,
156		task: Box<dyn PoolTask>,
157		interval: Duration,
158		priority: Priority,
159	) -> TaskHandle {
160		let handle = TaskHandle::from(self.next_handle.fetch_add(1, Ordering::Relaxed));
161		let next_run = Instant::now() + interval;
162
163		let scheduled = ScheduledTask::new(handle, task, next_run, Some(interval), priority);
164
165		self.queue.push(ScheduledTaskRef {
166			handle,
167			next_run,
168			priority,
169		});
170
171		self.tasks.insert(handle, scheduled);
172		handle
173	}
174
175	/// Cancel a scheduled task
176	pub fn cancel(&mut self, handle: TaskHandle) {
177		self.tasks.remove(&handle);
178		// Note: The task reference remains in the queue but will be
179		// ignored when popped since it's no longer in the tasks map
180	}
181
182	/// Get all tasks that are ready to run
183	pub fn get_ready_tasks(&mut self) -> Vec<Box<dyn PoolTask>> {
184		let now = Instant::now();
185		let mut ready = Vec::new();
186
187		// Pop all tasks that are ready
188		while let Some(task_ref) = self.queue.peek() {
189			if task_ref.next_run > now {
190				break; // No more tasks ready
191			}
192
193			let task_ref = self.queue.pop().unwrap();
194
195			// Check if task still exists (might have been
196			// cancelled)
197			if let Some(mut scheduled) = self.tasks.remove(&task_ref.handle) {
198				let shared_task = SharedTask::new(scheduled.task.clone());
199				ready.push(Box::new(shared_task) as Box<dyn PoolTask>);
200
201				if let Some(interval) = scheduled.interval {
202					scheduled.next_run = now + interval;
203
204					self.queue.push(ScheduledTaskRef {
205						handle: task_ref.handle,
206						next_run: scheduled.next_run,
207						priority: scheduled.priority,
208					});
209
210					self.tasks.insert(task_ref.handle, scheduled);
211				}
212			}
213		}
214
215		ready
216	}
217
218	/// Get the next scheduled run time
219	pub fn next_run_time(&self) -> Option<Instant> {
220		self.queue.peek().map(|t| t.next_run)
221	}
222
223	/// Get number of scheduled tasks
224	pub fn task_count(&self) -> usize {
225		self.tasks.len()
226	}
227}
228
/// Wrapper to share tasks safely across threads
///
/// Newtype over `Arc<dyn PoolTask>` so a reference-counted task can be
/// handed to the pool as an owned `Box<dyn PoolTask>` (see
/// `get_ready_tasks`), keeping the original alive for future re-runs.
struct SharedTask(Arc<dyn PoolTask>);
231
232impl SharedTask {
233	fn new(task: Arc<dyn PoolTask>) -> Self {
234		Self(task)
235	}
236}
237
238impl PoolTask for SharedTask {
239	fn execute(&self, ctx: &InternalTaskContext) -> crate::Result<()> {
240		self.0.execute(ctx)
241	}
242
243	fn priority(&self) -> Priority {
244		self.0.priority()
245	}
246
247	fn name(&self) -> &str {
248		self.0.name()
249	}
250
251	fn can_retry(&self) -> bool {
252		self.0.can_retry()
253	}
254
255	fn max_retries(&self) -> usize {
256		self.0.max_retries()
257	}
258}