reifydb_sub_worker/
client.rs1use std::{
7 collections::VecDeque,
8 sync::{
9 Arc, Mutex,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11 mpsc::{self, Sender},
12 },
13 time::Duration,
14};
15
16use diagnostic::shutdown;
17use reifydb_core::Result;
18use reifydb_sub_api::{BoxedOnceTask, BoxedTask, Priority, Scheduler, TaskHandle};
19use reifydb_type::{diagnostic, err, internal_err};
20
21pub enum SchedulerRequest {
23 ScheduleEvery {
24 task: BoxedTask,
25 interval: Duration,
26 },
27 Submit {
28 task: BoxedOnceTask,
29 priority: Priority,
30 },
31 Cancel {
32 handle: TaskHandle,
33 },
34}
35
36pub enum SchedulerResponse {
38 TaskScheduled(TaskHandle),
39 TaskSubmitted,
40 TaskCancelled,
41 Error(String),
42}
43
44pub struct SchedulerClient {
46 sender: Sender<(SchedulerRequest, Sender<SchedulerResponse>)>,
47 pending_requests: Arc<Mutex<VecDeque<(SchedulerRequest, Sender<SchedulerResponse>)>>>,
48 next_handle: Arc<AtomicU64>,
49 running: Arc<AtomicBool>,
50}
51
52impl SchedulerClient {
53 pub fn new(
54 sender: Sender<(SchedulerRequest, Sender<SchedulerResponse>)>,
55 pending_requests: Arc<Mutex<VecDeque<(SchedulerRequest, Sender<SchedulerResponse>)>>>,
56 next_handle: Arc<AtomicU64>,
57 running: Arc<AtomicBool>,
58 ) -> Self {
59 Self {
60 sender,
61 pending_requests,
62 next_handle,
63 running,
64 }
65 }
66}
67
68impl Scheduler for SchedulerClient {
69 fn every(&self, interval: Duration, task: BoxedTask) -> reifydb_core::Result<TaskHandle> {
70 if !self.running.load(Ordering::Relaxed) {
71 let handle = TaskHandle::from(self.next_handle.fetch_add(1, Ordering::Relaxed));
72
73 let (response_tx, _response_rx) = mpsc::channel();
74
75 let request = SchedulerRequest::ScheduleEvery {
76 task,
77 interval,
78 };
79
80 {
81 let mut pending = self.pending_requests.lock().unwrap();
82 pending.push_back((request, response_tx));
83 }
84
85 return Ok(handle);
86 }
87
88 let (response_tx, response_rx) = mpsc::channel();
89 let request = SchedulerRequest::ScheduleEvery {
90 task,
91 interval,
92 };
93
94 if self.sender.send((request, response_tx)).is_err() {
95 return err!(shutdown("Scheduler"));
96 }
97
98 let response = match response_rx.recv() {
99 Ok(resp) => resp,
100 Err(_) => return err!(shutdown("Scheduler")),
101 };
102
103 match response {
104 SchedulerResponse::TaskScheduled(handle) => Ok(handle),
105 SchedulerResponse::Error(msg) => {
106 internal_err!(msg)
107 }
108 _ => internal_err!("Unexpected response from scheduler"),
109 }
110 }
111
112 fn once(&self, task: BoxedOnceTask) -> reifydb_core::Result<()> {
113 let (response_tx, response_rx) = mpsc::channel();
114
115 let priority = task.priority();
116 let request = SchedulerRequest::Submit {
117 task,
118 priority,
119 };
120
121 if self.sender.send((request, response_tx)).is_err() {
122 return err!(shutdown("Scheduler"));
123 }
124
125 let response = match response_rx.recv() {
126 Ok(resp) => resp,
127 Err(_) => return err!(shutdown("Scheduler")),
128 };
129
130 match response {
131 SchedulerResponse::TaskSubmitted => Ok(()),
132 SchedulerResponse::Error(msg) => {
133 internal_err!(msg)
134 }
135 _ => internal_err!("Unexpected response from scheduler"),
136 }
137 }
138
139 fn cancel(&self, handle: TaskHandle) -> Result<()> {
140 let (response_tx, response_rx) = mpsc::channel();
141
142 let request = SchedulerRequest::Cancel {
143 handle,
144 };
145
146 if self.sender.send((request, response_tx)).is_err() {
147 return err!(shutdown("Scheduler"));
148 }
149
150 let response = match response_rx.recv() {
151 Ok(resp) => resp,
152 Err(_) => return err!(shutdown("Scheduler")),
153 };
154
155 match response {
156 SchedulerResponse::TaskCancelled => Ok(()),
157 SchedulerResponse::Error(msg) => {
158 internal_err!(msg)
159 }
160 _ => internal_err!("Unexpected response from scheduler"),
161 }
162 }
163}