Skip to main content

reifydb_sub_task/
subsystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::Any,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10};
11
12use dashmap::DashMap;
13use mpsc::Sender;
14use reifydb_core::{
15	interface::version::{ComponentType, HasVersion, SystemVersion},
16	util::ioc::IocContainer,
17};
18use reifydb_engine::engine::StandardEngine;
19use reifydb_runtime::SharedRuntime;
20use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
21use reifydb_type::Result;
22use tokio::{sync::mpsc, task::JoinHandle};
23use tracing::{info, instrument};
24
25use crate::{
26	coordinator,
27	coordinator::TaskCoordinatorMessage,
28	handle::TaskHandle,
29	registry::{TaskEntry, TaskRegistry},
30	task::ScheduledTask,
31};
32
33pub struct TaskSubsystem {
34	running: AtomicBool,
35
36	handle: Option<TaskHandle>,
37
38	coordinator_tx: Option<Sender<TaskCoordinatorMessage>>,
39
40	coordinator_handle: Option<JoinHandle<()>>,
41
42	runtime: SharedRuntime,
43
44	engine: StandardEngine,
45
46	registry: TaskRegistry,
47
48	initial_tasks: Vec<ScheduledTask>,
49}
50
51impl TaskSubsystem {
52	#[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
53	pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
54		let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
55		let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
56		let registry = Arc::new(DashMap::new());
57
58		Self {
59			running: AtomicBool::new(false),
60			handle: None,
61			coordinator_tx: None,
62			coordinator_handle: None,
63			runtime,
64			engine,
65			registry,
66			initial_tasks,
67		}
68	}
69
70	pub fn handle(&self) -> Option<TaskHandle> {
71		self.handle.clone()
72	}
73}
74
75impl Subsystem for TaskSubsystem {
76	fn name(&self) -> &'static str {
77		"sub-task"
78	}
79
80	#[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
81	fn start(&mut self) -> Result<()> {
82		if self.running.load(Ordering::Acquire) {
83			return Ok(());
84		}
85
86		info!("Starting task subsystem");
87
88		let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
89
90		for task in self.initial_tasks.drain(..) {
91			let next_execution = self.runtime.clock().instant() + task.schedule.initial_delay();
92			self.registry.insert(
93				task.id,
94				TaskEntry {
95					task: Arc::new(task),
96					next_execution,
97				},
98			);
99		}
100
101		let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
102
103		let registry = self.registry.clone();
104		let runtime = self.runtime.clone();
105		let engine = self.engine.clone();
106
107		let join_handle = self.runtime.spawn(async move {
108			coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
109		});
110
111		self.handle = Some(handle);
112		self.coordinator_tx = Some(coordinator_tx);
113		self.coordinator_handle = Some(join_handle);
114		self.running.store(true, Ordering::Release);
115
116		info!("Task subsystem started");
117
118		Ok(())
119	}
120
121	#[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
122	fn shutdown(&mut self) -> Result<()> {
123		if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
124			return Ok(());
125		}
126
127		info!("Shutting down task subsystem");
128
129		if let Some(coordinator_tx) = self.coordinator_tx.take() {
130			let _ = coordinator_tx.blocking_send(TaskCoordinatorMessage::Shutdown);
131		}
132
133		if let Some(join_handle) = self.coordinator_handle.take() {
134			let _ = self.runtime.block_on(join_handle);
135		}
136
137		self.handle = None;
138
139		info!("Task subsystem shut down");
140
141		Ok(())
142	}
143
144	#[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
145	fn is_running(&self) -> bool {
146		self.running.load(Ordering::Acquire)
147	}
148
149	#[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
150	fn health_status(&self) -> HealthStatus {
151		if self.is_running() {
152			HealthStatus::Healthy
153		} else {
154			HealthStatus::Unknown
155		}
156	}
157
158	fn as_any(&self) -> &dyn Any {
159		self
160	}
161
162	fn as_any_mut(&mut self) -> &mut dyn Any {
163		self
164	}
165}
166
167impl HasVersion for TaskSubsystem {
168	fn version(&self) -> SystemVersion {
169		SystemVersion {
170			name: env!("CARGO_PKG_NAME")
171				.strip_prefix("reifydb-")
172				.unwrap_or(env!("CARGO_PKG_NAME"))
173				.to_string(),
174			version: env!("CARGO_PKG_VERSION").to_string(),
175			description: "Periodic task scheduler subsystem".to_string(),
176			r#type: ComponentType::Subsystem,
177		}
178	}
179}