// reifydb_sub_task/subsystem.rs

use std::{
	any::Any,
	sync::{
		Arc,
		atomic::{AtomicBool, Ordering},
	},
	time::Instant,
};

use dashmap::DashMap;
use mpsc::Sender;
use reifydb_core::{
	interface::version::{ComponentType, HasVersion, SystemVersion},
	util::ioc::IocContainer,
};
use reifydb_engine::engine::StandardEngine;
use reifydb_runtime::SharedRuntime;
use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
use reifydb_type::Result;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{info, instrument, warn};

use crate::{
	coordinator,
	coordinator::CoordinatorMessage,
	handle::TaskHandle,
	registry::{TaskEntry, TaskRegistry},
	task::ScheduledTask,
};
30
31/// Task scheduler subsystem
32pub struct TaskSubsystem {
33	/// Whether the subsystem is running
34	running: AtomicBool,
35	/// Handle to interact with the task scheduler
36	handle: Option<TaskHandle>,
37	/// Sender to the coordinator
38	coordinator_tx: Option<Sender<CoordinatorMessage>>,
39	/// Join handle for the coordinator task
40	coordinator_handle: Option<JoinHandle<()>>,
41	/// Shared runtime for spawning tasks
42	runtime: SharedRuntime,
43	/// Database engine for task execution
44	engine: StandardEngine,
45	/// Registry of scheduled tasks
46	registry: TaskRegistry,
47	/// Initial tasks to register on startup
48	initial_tasks: Vec<ScheduledTask>,
49}
50
51impl TaskSubsystem {
52	/// Create a new task subsystem
53	#[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
54	pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
55		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
56		let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
57		let registry = Arc::new(DashMap::new());
58
59		Self {
60			running: AtomicBool::new(false),
61			handle: None,
62			coordinator_tx: None,
63			coordinator_handle: None,
64			runtime,
65			engine,
66			registry,
67			initial_tasks,
68		}
69	}
70
71	/// Get a handle to interact with the task scheduler
72	///
73	/// Returns None if the subsystem is not running
74	pub fn handle(&self) -> Option<TaskHandle> {
75		self.handle.clone()
76	}
77}
78
79impl Subsystem for TaskSubsystem {
80	fn name(&self) -> &'static str {
81		"sub-task"
82	}
83
84	#[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
85	fn start(&mut self) -> Result<()> {
86		if self.running.load(Ordering::Acquire) {
87			// Already running
88			return Ok(());
89		}
90
91		info!("Starting task subsystem");
92
93		// Create coordinator channel
94		let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
95
96		// Register initial tasks in the registry
97		for task in self.initial_tasks.drain(..) {
98			let next_execution = Instant::now() + task.schedule.initial_delay();
99			self.registry.insert(
100				task.id,
101				TaskEntry {
102					task: Arc::new(task),
103					next_execution,
104				},
105			);
106		}
107
108		// Create handle
109		let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
110
111		// Spawn coordinator
112		let registry = self.registry.clone();
113		let runtime = self.runtime.clone();
114		let engine = self.engine.clone();
115
116		let join_handle = self.runtime.spawn(async move {
117			coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
118		});
119
120		// Store handle and coordinator_tx
121		self.handle = Some(handle);
122		self.coordinator_tx = Some(coordinator_tx);
123		self.coordinator_handle = Some(join_handle);
124		self.running.store(true, Ordering::Release);
125
126		info!("Task subsystem started");
127
128		Ok(())
129	}
130
131	#[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
132	fn shutdown(&mut self) -> Result<()> {
133		if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
134			// Already shutdown
135			return Ok(());
136		}
137
138		info!("Shutting down task subsystem");
139
140		// Send shutdown message to coordinator
141		if let Some(coordinator_tx) = self.coordinator_tx.take() {
142			let _ = coordinator_tx.blocking_send(CoordinatorMessage::Shutdown);
143		}
144
145		// Wait for the coordinator task to finish
146		if let Some(join_handle) = self.coordinator_handle.take() {
147			let _ = self.runtime.block_on(join_handle);
148		}
149
150		self.handle = None;
151
152		info!("Task subsystem shut down");
153
154		Ok(())
155	}
156
157	#[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
158	fn is_running(&self) -> bool {
159		self.running.load(Ordering::Acquire)
160	}
161
162	#[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
163	fn health_status(&self) -> HealthStatus {
164		if self.is_running() {
165			HealthStatus::Healthy
166		} else {
167			HealthStatus::Unknown
168		}
169	}
170
171	fn as_any(&self) -> &dyn Any {
172		self
173	}
174
175	fn as_any_mut(&mut self) -> &mut dyn Any {
176		self
177	}
178}
179
180impl HasVersion for TaskSubsystem {
181	fn version(&self) -> SystemVersion {
182		SystemVersion {
183			name: env!("CARGO_PKG_NAME")
184				.strip_prefix("reifydb-")
185				.unwrap_or(env!("CARGO_PKG_NAME"))
186				.to_string(),
187			version: env!("CARGO_PKG_VERSION").to_string(),
188			description: "Periodic task scheduler subsystem".to_string(),
189			r#type: ComponentType::Subsystem,
190		}
191	}
192}