reifydb_sub_task/
subsystem.rs1use std::{
2 any::Any,
3 sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering},
6 },
7 time::Instant,
8};
9
10use dashmap::DashMap;
11use mpsc::Sender;
12use reifydb_core::{
13 interface::version::{ComponentType, HasVersion, SystemVersion},
14 util::ioc::IocContainer,
15};
16use reifydb_engine::engine::StandardEngine;
17use reifydb_runtime::SharedRuntime;
18use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
19use reifydb_type::Result;
20use tokio::{sync::mpsc, task::JoinHandle};
21use tracing::{info, instrument};
22
23use crate::{
24 coordinator,
25 coordinator::CoordinatorMessage,
26 handle::TaskHandle,
27 registry::{TaskEntry, TaskRegistry},
28 task::ScheduledTask,
29};
30
31pub struct TaskSubsystem {
33 running: AtomicBool,
35 handle: Option<TaskHandle>,
37 coordinator_tx: Option<Sender<CoordinatorMessage>>,
39 coordinator_handle: Option<JoinHandle<()>>,
41 runtime: SharedRuntime,
43 engine: StandardEngine,
45 registry: TaskRegistry,
47 initial_tasks: Vec<ScheduledTask>,
49}
50
51impl TaskSubsystem {
52 #[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
54 pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
55 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
56 let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
57 let registry = Arc::new(DashMap::new());
58
59 Self {
60 running: AtomicBool::new(false),
61 handle: None,
62 coordinator_tx: None,
63 coordinator_handle: None,
64 runtime,
65 engine,
66 registry,
67 initial_tasks,
68 }
69 }
70
71 pub fn handle(&self) -> Option<TaskHandle> {
75 self.handle.clone()
76 }
77}
78
79impl Subsystem for TaskSubsystem {
80 fn name(&self) -> &'static str {
81 "sub-task"
82 }
83
84 #[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
85 fn start(&mut self) -> Result<()> {
86 if self.running.load(Ordering::Acquire) {
87 return Ok(());
89 }
90
91 info!("Starting task subsystem");
92
93 let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
95
96 for task in self.initial_tasks.drain(..) {
98 let next_execution = Instant::now() + task.schedule.initial_delay();
99 self.registry.insert(
100 task.id,
101 TaskEntry {
102 task: Arc::new(task),
103 next_execution,
104 },
105 );
106 }
107
108 let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
110
111 let registry = self.registry.clone();
113 let runtime = self.runtime.clone();
114 let engine = self.engine.clone();
115
116 let join_handle = self.runtime.spawn(async move {
117 coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
118 });
119
120 self.handle = Some(handle);
122 self.coordinator_tx = Some(coordinator_tx);
123 self.coordinator_handle = Some(join_handle);
124 self.running.store(true, Ordering::Release);
125
126 info!("Task subsystem started");
127
128 Ok(())
129 }
130
131 #[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
132 fn shutdown(&mut self) -> Result<()> {
133 if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
134 return Ok(());
136 }
137
138 info!("Shutting down task subsystem");
139
140 if let Some(coordinator_tx) = self.coordinator_tx.take() {
142 let _ = coordinator_tx.blocking_send(CoordinatorMessage::Shutdown);
143 }
144
145 if let Some(join_handle) = self.coordinator_handle.take() {
147 let _ = self.runtime.block_on(join_handle);
148 }
149
150 self.handle = None;
151
152 info!("Task subsystem shut down");
153
154 Ok(())
155 }
156
157 #[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
158 fn is_running(&self) -> bool {
159 self.running.load(Ordering::Acquire)
160 }
161
162 #[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
163 fn health_status(&self) -> HealthStatus {
164 if self.is_running() {
165 HealthStatus::Healthy
166 } else {
167 HealthStatus::Unknown
168 }
169 }
170
171 fn as_any(&self) -> &dyn Any {
172 self
173 }
174
175 fn as_any_mut(&mut self) -> &mut dyn Any {
176 self
177 }
178}
179
180impl HasVersion for TaskSubsystem {
181 fn version(&self) -> SystemVersion {
182 SystemVersion {
183 name: env!("CARGO_PKG_NAME")
184 .strip_prefix("reifydb-")
185 .unwrap_or(env!("CARGO_PKG_NAME"))
186 .to_string(),
187 version: env!("CARGO_PKG_VERSION").to_string(),
188 description: "Periodic task scheduler subsystem".to_string(),
189 r#type: ComponentType::Subsystem,
190 }
191 }
192}