Skip to main content

reifydb_sub_task/
subsystem.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use std::{
5	any::Any,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	thread,
11};
12
13use dashmap::DashMap;
14use mpsc::Sender;
15use reifydb_core::{
16	interface::version::{ComponentType, HasVersion, SystemVersion},
17	util::ioc::IocContainer,
18};
19use reifydb_engine::engine::StandardEngine;
20use reifydb_runtime::SharedRuntime;
21use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
22use reifydb_value::Result;
23use tokio::{sync::mpsc, task::JoinHandle};
24use tracing::{info, instrument};
25
26use crate::{
27	coordinator,
28	coordinator::TaskCoordinatorMessage,
29	handle::TaskHandle,
30	registry::{TaskEntry, TaskRegistry},
31	task::ScheduledTask,
32};
33
34pub struct TaskSubsystem {
35	running: AtomicBool,
36
37	handle: Option<TaskHandle>,
38
39	coordinator_tx: Option<Sender<TaskCoordinatorMessage>>,
40
41	coordinator_handle: Option<JoinHandle<()>>,
42
43	runtime: SharedRuntime,
44
45	engine: StandardEngine,
46
47	registry: TaskRegistry,
48
49	initial_tasks: Vec<ScheduledTask>,
50}
51
52impl TaskSubsystem {
53	#[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
54	pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
55		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
56		let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
57		let registry = Arc::new(DashMap::new());
58
59		Self {
60			running: AtomicBool::new(false),
61			handle: None,
62			coordinator_tx: None,
63			coordinator_handle: None,
64			runtime,
65			engine,
66			registry,
67			initial_tasks,
68		}
69	}
70
71	pub fn handle(&self) -> Option<TaskHandle> {
72		self.handle.clone()
73	}
74}
75
76impl Subsystem for TaskSubsystem {
77	fn name(&self) -> &'static str {
78		"sub-task"
79	}
80
81	#[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
82	fn start(&mut self) -> Result<()> {
83		if self.running.load(Ordering::Acquire) {
84			return Ok(());
85		}
86
87		info!("Starting task subsystem");
88
89		let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
90
91		for task in self.initial_tasks.drain(..) {
92			let next_execution = self.runtime.clock().instant() + task.schedule.initial_delay();
93			self.registry.insert(
94				task.id,
95				TaskEntry {
96					task: Arc::new(task),
97					next_execution,
98				},
99			);
100		}
101
102		let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
103
104		let registry = self.registry.clone();
105		let runtime = self.runtime.clone();
106		let engine = self.engine.clone();
107
108		let join_handle = self.runtime.spawn(async move {
109			coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
110		});
111
112		self.handle = Some(handle);
113		self.coordinator_tx = Some(coordinator_tx);
114		self.coordinator_handle = Some(join_handle);
115		self.running.store(true, Ordering::Release);
116
117		info!("Task subsystem started");
118
119		Ok(())
120	}
121
122	#[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
123	fn shutdown(&mut self) -> Result<()> {
124		if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
125			return Ok(());
126		}
127
128		info!("Shutting down task subsystem");
129
130		let coordinator_tx = self.coordinator_tx.take();
131		let coordinator_handle = self.coordinator_handle.take();
132		let runtime = self.runtime.clone();
133		let worker = thread::spawn(move || {
134			if let Some(coordinator_tx) = coordinator_tx {
135				let _ = coordinator_tx.blocking_send(TaskCoordinatorMessage::Shutdown);
136			}
137			if let Some(join_handle) = coordinator_handle {
138				let _ = runtime.block_on(join_handle);
139			}
140		});
141		let _ = worker.join();
142
143		self.handle = None;
144
145		info!("Task subsystem shut down");
146
147		Ok(())
148	}
149
150	#[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
151	fn is_running(&self) -> bool {
152		self.running.load(Ordering::Acquire)
153	}
154
155	#[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
156	fn health_status(&self) -> HealthStatus {
157		if self.is_running() {
158			HealthStatus::Healthy
159		} else {
160			HealthStatus::Unknown
161		}
162	}
163
164	fn as_any(&self) -> &dyn Any {
165		self
166	}
167
168	fn as_any_mut(&mut self) -> &mut dyn Any {
169		self
170	}
171}
172
173impl HasVersion for TaskSubsystem {
174	fn version(&self) -> SystemVersion {
175		SystemVersion {
176			name: env!("CARGO_PKG_NAME")
177				.strip_prefix("reifydb-")
178				.unwrap_or(env!("CARGO_PKG_NAME"))
179				.to_string(),
180			version: env!("CARGO_PKG_VERSION").to_string(),
181			description: "Periodic task scheduler subsystem".to_string(),
182			r#type: ComponentType::Subsystem,
183		}
184	}
185}