Skip to main content

reifydb_sub_task/
subsystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::Any,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10};
11
12use dashmap::DashMap;
13use mpsc::Sender;
14use reifydb_core::{
15	interface::version::{ComponentType, HasVersion, SystemVersion},
16	util::ioc::IocContainer,
17};
18use reifydb_engine::engine::StandardEngine;
19use reifydb_runtime::SharedRuntime;
20use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
21use reifydb_type::Result;
22use tokio::{sync::mpsc, task::JoinHandle};
23use tracing::{info, instrument};
24
25use crate::{
26	coordinator,
27	coordinator::TaskCoordinatorMessage,
28	handle::TaskHandle,
29	registry::{TaskEntry, TaskRegistry},
30	task::ScheduledTask,
31};
32
33/// Task scheduler subsystem
34pub struct TaskSubsystem {
35	/// Whether the subsystem is running
36	running: AtomicBool,
37	/// Handle to interact with the task scheduler
38	handle: Option<TaskHandle>,
39	/// Sender to the coordinator
40	coordinator_tx: Option<Sender<TaskCoordinatorMessage>>,
41	/// Join handle for the coordinator task
42	coordinator_handle: Option<JoinHandle<()>>,
43	/// Shared runtime for spawning tasks
44	runtime: SharedRuntime,
45	/// Database engine for task execution
46	engine: StandardEngine,
47	/// Registry of scheduled tasks
48	registry: TaskRegistry,
49	/// Initial tasks to register on startup
50	initial_tasks: Vec<ScheduledTask>,
51}
52
53impl TaskSubsystem {
54	/// Create a new task subsystem
55	#[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
56	pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
57		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
58		let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
59		let registry = Arc::new(DashMap::new());
60
61		Self {
62			running: AtomicBool::new(false),
63			handle: None,
64			coordinator_tx: None,
65			coordinator_handle: None,
66			runtime,
67			engine,
68			registry,
69			initial_tasks,
70		}
71	}
72
73	/// Get a handle to interact with the task scheduler
74	///
75	/// Returns None if the subsystem is not running
76	pub fn handle(&self) -> Option<TaskHandle> {
77		self.handle.clone()
78	}
79}
80
81impl Subsystem for TaskSubsystem {
82	fn name(&self) -> &'static str {
83		"sub-task"
84	}
85
86	#[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
87	fn start(&mut self) -> Result<()> {
88		if self.running.load(Ordering::Acquire) {
89			// Already running
90			return Ok(());
91		}
92
93		info!("Starting task subsystem");
94
95		// Create coordinator channel
96		let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
97
98		// Register initial tasks in the registry
99		for task in self.initial_tasks.drain(..) {
100			let next_execution = self.runtime.clock().instant() + task.schedule.initial_delay();
101			self.registry.insert(
102				task.id,
103				TaskEntry {
104					task: Arc::new(task),
105					next_execution,
106				},
107			);
108		}
109
110		// Create handle
111		let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
112
113		// Spawn coordinator
114		let registry = self.registry.clone();
115		let runtime = self.runtime.clone();
116		let engine = self.engine.clone();
117
118		let join_handle = self.runtime.spawn(async move {
119			coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
120		});
121
122		// Store handle and coordinator_tx
123		self.handle = Some(handle);
124		self.coordinator_tx = Some(coordinator_tx);
125		self.coordinator_handle = Some(join_handle);
126		self.running.store(true, Ordering::Release);
127
128		info!("Task subsystem started");
129
130		Ok(())
131	}
132
133	#[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
134	fn shutdown(&mut self) -> Result<()> {
135		if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
136			// Already shutdown
137			return Ok(());
138		}
139
140		info!("Shutting down task subsystem");
141
142		// Send shutdown message to coordinator
143		if let Some(coordinator_tx) = self.coordinator_tx.take() {
144			let _ = coordinator_tx.blocking_send(TaskCoordinatorMessage::Shutdown);
145		}
146
147		// Wait for the coordinator task to finish
148		if let Some(join_handle) = self.coordinator_handle.take() {
149			let _ = self.runtime.block_on(join_handle);
150		}
151
152		self.handle = None;
153
154		info!("Task subsystem shut down");
155
156		Ok(())
157	}
158
159	#[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
160	fn is_running(&self) -> bool {
161		self.running.load(Ordering::Acquire)
162	}
163
164	#[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
165	fn health_status(&self) -> HealthStatus {
166		if self.is_running() {
167			HealthStatus::Healthy
168		} else {
169			HealthStatus::Unknown
170		}
171	}
172
173	fn as_any(&self) -> &dyn Any {
174		self
175	}
176
177	fn as_any_mut(&mut self) -> &mut dyn Any {
178		self
179	}
180}
181
182impl HasVersion for TaskSubsystem {
183	fn version(&self) -> SystemVersion {
184		SystemVersion {
185			name: env!("CARGO_PKG_NAME")
186				.strip_prefix("reifydb-")
187				.unwrap_or(env!("CARGO_PKG_NAME"))
188				.to_string(),
189			version: env!("CARGO_PKG_VERSION").to_string(),
190			description: "Periodic task scheduler subsystem".to_string(),
191			r#type: ComponentType::Subsystem,
192		}
193	}
194}