Skip to main content

reifydb_sub_task/
subsystem.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
3
use std::{
	any::Any,
	sync::{
		Arc,
		atomic::{AtomicBool, Ordering},
	},
	time::Instant,
};

use dashmap::DashMap;
use mpsc::Sender;
use reifydb_core::{
	interface::version::{ComponentType, HasVersion, SystemVersion},
	util::ioc::IocContainer,
};
use reifydb_engine::engine::StandardEngine;
use reifydb_runtime::SharedRuntime;
use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
use reifydb_type::Result;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{info, instrument, warn};

use crate::{
	coordinator,
	coordinator::CoordinatorMessage,
	handle::TaskHandle,
	registry::{TaskEntry, TaskRegistry},
	task::ScheduledTask,
};
33
34/// Task scheduler subsystem
35pub struct TaskSubsystem {
36	/// Whether the subsystem is running
37	running: AtomicBool,
38	/// Handle to interact with the task scheduler
39	handle: Option<TaskHandle>,
40	/// Sender to the coordinator
41	coordinator_tx: Option<Sender<CoordinatorMessage>>,
42	/// Join handle for the coordinator task
43	coordinator_handle: Option<JoinHandle<()>>,
44	/// Shared runtime for spawning tasks
45	runtime: SharedRuntime,
46	/// Database engine for task execution
47	engine: StandardEngine,
48	/// Registry of scheduled tasks
49	registry: TaskRegistry,
50	/// Initial tasks to register on startup
51	initial_tasks: Vec<ScheduledTask>,
52}
53
54impl TaskSubsystem {
55	/// Create a new task subsystem
56	#[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
57	pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
58		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
59		let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
60		let registry = Arc::new(DashMap::new());
61
62		Self {
63			running: AtomicBool::new(false),
64			handle: None,
65			coordinator_tx: None,
66			coordinator_handle: None,
67			runtime,
68			engine,
69			registry,
70			initial_tasks,
71		}
72	}
73
74	/// Get a handle to interact with the task scheduler
75	///
76	/// Returns None if the subsystem is not running
77	pub fn handle(&self) -> Option<TaskHandle> {
78		self.handle.clone()
79	}
80}
81
82impl Subsystem for TaskSubsystem {
83	fn name(&self) -> &'static str {
84		"sub-task"
85	}
86
87	#[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
88	fn start(&mut self) -> Result<()> {
89		if self.running.load(Ordering::Acquire) {
90			// Already running
91			return Ok(());
92		}
93
94		info!("Starting task subsystem");
95
96		// Create coordinator channel
97		let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
98
99		// Register initial tasks in the registry
100		for task in self.initial_tasks.drain(..) {
101			let next_execution = Instant::now() + task.schedule.initial_delay();
102			self.registry.insert(
103				task.id,
104				TaskEntry {
105					task: Arc::new(task),
106					next_execution,
107				},
108			);
109		}
110
111		// Create handle
112		let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
113
114		// Spawn coordinator
115		let registry = self.registry.clone();
116		let runtime = self.runtime.clone();
117		let engine = self.engine.clone();
118
119		let join_handle = self.runtime.spawn(async move {
120			coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
121		});
122
123		// Store handle and coordinator_tx
124		self.handle = Some(handle);
125		self.coordinator_tx = Some(coordinator_tx);
126		self.coordinator_handle = Some(join_handle);
127		self.running.store(true, Ordering::Release);
128
129		info!("Task subsystem started");
130
131		Ok(())
132	}
133
134	#[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
135	fn shutdown(&mut self) -> Result<()> {
136		if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
137			// Already shutdown
138			return Ok(());
139		}
140
141		info!("Shutting down task subsystem");
142
143		// Send shutdown message to coordinator
144		if let Some(coordinator_tx) = self.coordinator_tx.take() {
145			let _ = coordinator_tx.blocking_send(CoordinatorMessage::Shutdown);
146		}
147
148		// Wait for the coordinator task to finish
149		if let Some(join_handle) = self.coordinator_handle.take() {
150			let _ = self.runtime.block_on(join_handle);
151		}
152
153		self.handle = None;
154
155		info!("Task subsystem shut down");
156
157		Ok(())
158	}
159
160	#[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
161	fn is_running(&self) -> bool {
162		self.running.load(Ordering::Acquire)
163	}
164
165	#[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
166	fn health_status(&self) -> HealthStatus {
167		if self.is_running() {
168			HealthStatus::Healthy
169		} else {
170			HealthStatus::Unknown
171		}
172	}
173
174	fn as_any(&self) -> &dyn Any {
175		self
176	}
177
178	fn as_any_mut(&mut self) -> &mut dyn Any {
179		self
180	}
181}
182
183impl HasVersion for TaskSubsystem {
184	fn version(&self) -> SystemVersion {
185		SystemVersion {
186			name: env!("CARGO_PKG_NAME")
187				.strip_prefix("reifydb-")
188				.unwrap_or(env!("CARGO_PKG_NAME"))
189				.to_string(),
190			version: env!("CARGO_PKG_VERSION").to_string(),
191			description: "Periodic task scheduler subsystem".to_string(),
192			r#type: ComponentType::Subsystem,
193		}
194	}
195}