reifydb_sub_worker/
client.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Channel-based client for scheduler communication
5
6use std::{
7	collections::VecDeque,
8	sync::{
9		Arc, Mutex,
10		atomic::{AtomicBool, AtomicU64, Ordering},
11		mpsc::{self, Sender},
12	},
13	time::Duration,
14};
15
16use diagnostic::shutdown;
17use reifydb_core::Result;
18use reifydb_sub_api::{BoxedOnceTask, BoxedTask, Priority, Scheduler, TaskHandle};
19use reifydb_type::{diagnostic, err, internal_err};
20
21/// Request types for scheduler operations
22pub enum SchedulerRequest {
23	ScheduleEvery {
24		task: BoxedTask,
25		interval: Duration,
26	},
27	Submit {
28		task: BoxedOnceTask,
29		priority: Priority,
30	},
31	Cancel {
32		handle: TaskHandle,
33	},
34}
35
36/// Response types for scheduler operations
37pub enum SchedulerResponse {
38	TaskScheduled(TaskHandle),
39	TaskSubmitted,
40	TaskCancelled,
41	Error(String),
42}
43
44/// Client for communicating with the worker subsystem's scheduler
45pub struct SchedulerClient {
46	sender: Sender<(SchedulerRequest, Sender<SchedulerResponse>)>,
47	pending_requests: Arc<Mutex<VecDeque<(SchedulerRequest, Sender<SchedulerResponse>)>>>,
48	next_handle: Arc<AtomicU64>,
49	running: Arc<AtomicBool>,
50}
51
52impl SchedulerClient {
53	pub fn new(
54		sender: Sender<(SchedulerRequest, Sender<SchedulerResponse>)>,
55		pending_requests: Arc<Mutex<VecDeque<(SchedulerRequest, Sender<SchedulerResponse>)>>>,
56		next_handle: Arc<AtomicU64>,
57		running: Arc<AtomicBool>,
58	) -> Self {
59		Self {
60			sender,
61			pending_requests,
62			next_handle,
63			running,
64		}
65	}
66}
67
68impl Scheduler for SchedulerClient {
69	fn every(&self, interval: Duration, task: BoxedTask) -> reifydb_core::Result<TaskHandle> {
70		if !self.running.load(Ordering::Relaxed) {
71			let handle = TaskHandle::from(self.next_handle.fetch_add(1, Ordering::Relaxed));
72
73			let (response_tx, _response_rx) = mpsc::channel();
74
75			let request = SchedulerRequest::ScheduleEvery {
76				task,
77				interval,
78			};
79
80			{
81				let mut pending = self.pending_requests.lock().unwrap();
82				pending.push_back((request, response_tx));
83			}
84
85			return Ok(handle);
86		}
87
88		let (response_tx, response_rx) = mpsc::channel();
89		let request = SchedulerRequest::ScheduleEvery {
90			task,
91			interval,
92		};
93
94		if self.sender.send((request, response_tx)).is_err() {
95			return err!(shutdown("Scheduler"));
96		}
97
98		let response = match response_rx.recv() {
99			Ok(resp) => resp,
100			Err(_) => return err!(shutdown("Scheduler")),
101		};
102
103		match response {
104			SchedulerResponse::TaskScheduled(handle) => Ok(handle),
105			SchedulerResponse::Error(msg) => {
106				internal_err!(msg)
107			}
108			_ => internal_err!("Unexpected response from scheduler"),
109		}
110	}
111
112	fn once(&self, task: BoxedOnceTask) -> reifydb_core::Result<()> {
113		let (response_tx, response_rx) = mpsc::channel();
114
115		let priority = task.priority();
116		let request = SchedulerRequest::Submit {
117			task,
118			priority,
119		};
120
121		if self.sender.send((request, response_tx)).is_err() {
122			return err!(shutdown("Scheduler"));
123		}
124
125		let response = match response_rx.recv() {
126			Ok(resp) => resp,
127			Err(_) => return err!(shutdown("Scheduler")),
128		};
129
130		match response {
131			SchedulerResponse::TaskSubmitted => Ok(()),
132			SchedulerResponse::Error(msg) => {
133				internal_err!(msg)
134			}
135			_ => internal_err!("Unexpected response from scheduler"),
136		}
137	}
138
139	fn cancel(&self, handle: TaskHandle) -> Result<()> {
140		let (response_tx, response_rx) = mpsc::channel();
141
142		let request = SchedulerRequest::Cancel {
143			handle,
144		};
145
146		if self.sender.send((request, response_tx)).is_err() {
147			return err!(shutdown("Scheduler"));
148		}
149
150		let response = match response_rx.recv() {
151			Ok(resp) => resp,
152			Err(_) => return err!(shutdown("Scheduler")),
153		};
154
155		match response {
156			SchedulerResponse::TaskCancelled => Ok(()),
157			SchedulerResponse::Error(msg) => {
158				internal_err!(msg)
159			}
160			_ => internal_err!("Unexpected response from scheduler"),
161		}
162	}
163}