// reifydb_sub_worker/scheduler.rs
use std::{
5 cmp,
6 collections::{BinaryHeap, HashMap},
7 sync::{
8 Arc, Mutex,
9 atomic::{AtomicU64, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use reifydb_core::Result;
15use reifydb_engine::StandardEngine;
16use reifydb_sub_api::{BoxedOnceTask, BoxedTask, Priority, TaskContext as CoreTaskContext, TaskHandle};
17
18use crate::task::{InternalTaskContext, PoolTask, ScheduledTask};
19
/// Adapts a recurring [`BoxedTask`] from the public subsystem API so it can
/// run on the worker pool as a [`PoolTask`].
pub(crate) struct SchedulableTaskAdapter {
    // The wrapped public-API task.
    task: BoxedTask,
    // Engine handle cloned into a fresh task context on every execution.
    engine: StandardEngine,
}
25
26impl SchedulableTaskAdapter {
27 pub(crate) fn new(task: BoxedTask, engine: StandardEngine) -> Self {
28 Self {
29 task,
30 engine,
31 }
32 }
33}
34
35impl PoolTask for SchedulableTaskAdapter {
36 fn execute(&self, _ctx: &InternalTaskContext) -> Result<()> {
37 let core_ctx = CoreTaskContext::new(self.engine.clone());
38 self.task.execute(&core_ctx)
39 }
40
41 fn priority(&self) -> Priority {
42 self.task.priority()
43 }
44
45 fn name(&self) -> &str {
46 self.task.name()
47 }
48}
49
/// Adapts a one-shot [`BoxedOnceTask`] to the [`PoolTask`] interface.
///
/// The task is stored behind a `Mutex<Option<…>>` so it can be moved out
/// exactly once during execution, even though `PoolTask::execute` only
/// receives `&self`.
pub(crate) struct OnceTaskAdapter {
    // `Some` until the task has run; taken (left as `None`) on first execute.
    task: Mutex<Option<BoxedOnceTask>>,
    // Engine handle cloned into the task context for the single run.
    engine: StandardEngine,
}
59
60impl OnceTaskAdapter {
61 pub(crate) fn new(task: BoxedOnceTask, engine: StandardEngine) -> Self {
62 Self {
63 task: Mutex::new(Some(task)),
64 engine,
65 }
66 }
67}
68
69impl PoolTask for OnceTaskAdapter {
70 fn execute(&self, _ctx: &InternalTaskContext) -> Result<()> {
71 let task = self.task.lock().unwrap().take();
73 if let Some(task) = task {
74 let core_ctx = CoreTaskContext::new(self.engine.clone());
75 task.execute_once(&core_ctx)
76 } else {
77 panic!("OnceTask already executed");
78 }
79 }
80
81 fn priority(&self) -> Priority {
82 self.task.lock().unwrap().as_ref().map(|t| t.priority()).unwrap_or(Priority::Normal)
83 }
84
85 fn name(&self) -> &str {
86 "once-task"
89 }
90}
91
/// Tracks scheduled tasks and orders them by their next due time.
pub struct TaskScheduler {
    /// Monotonically increasing source of task handle ids.
    next_handle: AtomicU64,

    /// All live scheduled tasks, keyed by handle. Removing an entry here is
    /// how cancellation works; matching heap entries are skipped lazily.
    tasks: HashMap<TaskHandle, ScheduledTask>,

    /// Heap of due-time entries (min-heap via `ScheduledTaskRef`'s reversed
    /// `Ord`). May contain stale entries for cancelled tasks.
    queue: BinaryHeap<ScheduledTaskRef>,
}
103
/// Lightweight heap entry referring back to a task in `TaskScheduler::tasks`.
struct ScheduledTaskRef {
    // Key into `TaskScheduler::tasks`; the entry is stale if the key is gone.
    handle: TaskHandle,
    // When the task next becomes due.
    next_run: Instant,
    // Tie-breaker when two entries share the same due instant.
    priority: Priority,
}
110
111impl PartialEq for ScheduledTaskRef {
112 fn eq(&self, other: &Self) -> bool {
113 self.next_run == other.next_run && self.priority == other.priority
114 }
115}
116
117impl Eq for ScheduledTaskRef {}
118
impl PartialOrd for ScheduledTaskRef {
    /// Delegates to `Ord` so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
124
125impl Ord for ScheduledTaskRef {
126 fn cmp(&self, other: &Self) -> cmp::Ordering {
127 match other.next_run.cmp(&self.next_run) {
129 cmp::Ordering::Equal => {
130 self.priority.cmp(&other.priority)
132 }
133 other => other,
134 }
135 }
136}
137
138impl TaskScheduler {
139 pub fn new() -> Self {
140 Self {
141 next_handle: AtomicU64::new(1),
142 tasks: HashMap::new(),
143 queue: BinaryHeap::new(),
144 }
145 }
146
147 pub fn set_next_handle(&self, handle_id: u64) {
150 self.next_handle.store(handle_id, Ordering::Relaxed);
151 }
152
153 pub fn schedule_every_internal(
155 &mut self,
156 task: Box<dyn PoolTask>,
157 interval: Duration,
158 priority: Priority,
159 ) -> TaskHandle {
160 let handle = TaskHandle::from(self.next_handle.fetch_add(1, Ordering::Relaxed));
161 let next_run = Instant::now() + interval;
162
163 let scheduled = ScheduledTask::new(handle, task, next_run, Some(interval), priority);
164
165 self.queue.push(ScheduledTaskRef {
166 handle,
167 next_run,
168 priority,
169 });
170
171 self.tasks.insert(handle, scheduled);
172 handle
173 }
174
175 pub fn cancel(&mut self, handle: TaskHandle) {
177 self.tasks.remove(&handle);
178 }
181
182 pub fn get_ready_tasks(&mut self) -> Vec<Box<dyn PoolTask>> {
184 let now = Instant::now();
185 let mut ready = Vec::new();
186
187 while let Some(task_ref) = self.queue.peek() {
189 if task_ref.next_run > now {
190 break; }
192
193 let task_ref = self.queue.pop().unwrap();
194
195 if let Some(mut scheduled) = self.tasks.remove(&task_ref.handle) {
198 let shared_task = SharedTask::new(scheduled.task.clone());
199 ready.push(Box::new(shared_task) as Box<dyn PoolTask>);
200
201 if let Some(interval) = scheduled.interval {
202 scheduled.next_run = now + interval;
203
204 self.queue.push(ScheduledTaskRef {
205 handle: task_ref.handle,
206 next_run: scheduled.next_run,
207 priority: scheduled.priority,
208 });
209
210 self.tasks.insert(task_ref.handle, scheduled);
211 }
212 }
213 }
214
215 ready
216 }
217
218 pub fn next_run_time(&self) -> Option<Instant> {
220 self.queue.peek().map(|t| t.next_run)
221 }
222
223 pub fn task_count(&self) -> usize {
225 self.tasks.len()
226 }
227}
228
/// Cheaply cloneable wrapper handed to the pool: cloning the inner `Arc`
/// lets one scheduled task be dispatched repeatedly without re-boxing it.
struct SharedTask(Arc<dyn PoolTask>);

impl SharedTask {
    /// Wraps an already shared task.
    fn new(task: Arc<dyn PoolTask>) -> Self {
        Self(task)
    }
}
237
238impl PoolTask for SharedTask {
239 fn execute(&self, ctx: &InternalTaskContext) -> crate::Result<()> {
240 self.0.execute(ctx)
241 }
242
243 fn priority(&self) -> Priority {
244 self.0.priority()
245 }
246
247 fn name(&self) -> &str {
248 self.0.name()
249 }
250
251 fn can_retry(&self) -> bool {
252 self.0.can_retry()
253 }
254
255 fn max_retries(&self) -> usize {
256 self.0.max_retries()
257 }
258}