reifydb_sub_task/
subsystem.rs1use std::{
5 any::Any,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Instant,
11};
12
13use dashmap::DashMap;
14use mpsc::Sender;
15use reifydb_core::{
16 interface::version::{ComponentType, HasVersion, SystemVersion},
17 util::ioc::IocContainer,
18};
19use reifydb_engine::engine::StandardEngine;
20use reifydb_runtime::SharedRuntime;
21use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
22use reifydb_type::Result;
23use tokio::{sync::mpsc, task::JoinHandle};
24use tracing::{info, instrument};
25
26use crate::{
27 coordinator,
28 coordinator::CoordinatorMessage,
29 handle::TaskHandle,
30 registry::{TaskEntry, TaskRegistry},
31 task::ScheduledTask,
32};
33
34pub struct TaskSubsystem {
36 running: AtomicBool,
38 handle: Option<TaskHandle>,
40 coordinator_tx: Option<Sender<CoordinatorMessage>>,
42 coordinator_handle: Option<JoinHandle<()>>,
44 runtime: SharedRuntime,
46 engine: StandardEngine,
48 registry: TaskRegistry,
50 initial_tasks: Vec<ScheduledTask>,
52}
53
54impl TaskSubsystem {
55 #[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
57 pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
58 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
59 let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
60 let registry = Arc::new(DashMap::new());
61
62 Self {
63 running: AtomicBool::new(false),
64 handle: None,
65 coordinator_tx: None,
66 coordinator_handle: None,
67 runtime,
68 engine,
69 registry,
70 initial_tasks,
71 }
72 }
73
74 pub fn handle(&self) -> Option<TaskHandle> {
78 self.handle.clone()
79 }
80}
81
82impl Subsystem for TaskSubsystem {
83 fn name(&self) -> &'static str {
84 "sub-task"
85 }
86
87 #[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
88 fn start(&mut self) -> Result<()> {
89 if self.running.load(Ordering::Acquire) {
90 return Ok(());
92 }
93
94 info!("Starting task subsystem");
95
96 let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
98
99 for task in self.initial_tasks.drain(..) {
101 let next_execution = Instant::now() + task.schedule.initial_delay();
102 self.registry.insert(
103 task.id,
104 TaskEntry {
105 task: Arc::new(task),
106 next_execution,
107 },
108 );
109 }
110
111 let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
113
114 let registry = self.registry.clone();
116 let runtime = self.runtime.clone();
117 let engine = self.engine.clone();
118
119 let join_handle = self.runtime.spawn(async move {
120 coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
121 });
122
123 self.handle = Some(handle);
125 self.coordinator_tx = Some(coordinator_tx);
126 self.coordinator_handle = Some(join_handle);
127 self.running.store(true, Ordering::Release);
128
129 info!("Task subsystem started");
130
131 Ok(())
132 }
133
134 #[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
135 fn shutdown(&mut self) -> Result<()> {
136 if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
137 return Ok(());
139 }
140
141 info!("Shutting down task subsystem");
142
143 if let Some(coordinator_tx) = self.coordinator_tx.take() {
145 let _ = coordinator_tx.blocking_send(CoordinatorMessage::Shutdown);
146 }
147
148 if let Some(join_handle) = self.coordinator_handle.take() {
150 let _ = self.runtime.block_on(join_handle);
151 }
152
153 self.handle = None;
154
155 info!("Task subsystem shut down");
156
157 Ok(())
158 }
159
160 #[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
161 fn is_running(&self) -> bool {
162 self.running.load(Ordering::Acquire)
163 }
164
165 #[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
166 fn health_status(&self) -> HealthStatus {
167 if self.is_running() {
168 HealthStatus::Healthy
169 } else {
170 HealthStatus::Unknown
171 }
172 }
173
174 fn as_any(&self) -> &dyn Any {
175 self
176 }
177
178 fn as_any_mut(&mut self) -> &mut dyn Any {
179 self
180 }
181}
182
183impl HasVersion for TaskSubsystem {
184 fn version(&self) -> SystemVersion {
185 SystemVersion {
186 name: env!("CARGO_PKG_NAME")
187 .strip_prefix("reifydb-")
188 .unwrap_or(env!("CARGO_PKG_NAME"))
189 .to_string(),
190 version: env!("CARGO_PKG_VERSION").to_string(),
191 description: "Periodic task scheduler subsystem".to_string(),
192 r#type: ComponentType::Subsystem,
193 }
194 }
195}