reifydb_sub_task/
subsystem.rs1use std::{
5 any::Any,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 thread,
11};
12
13use dashmap::DashMap;
14use mpsc::Sender;
15use reifydb_core::{
16 interface::version::{ComponentType, HasVersion, SystemVersion},
17 util::ioc::IocContainer,
18};
19use reifydb_engine::engine::StandardEngine;
20use reifydb_runtime::SharedRuntime;
21use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
22use reifydb_value::Result;
23use tokio::{sync::mpsc, task::JoinHandle};
24use tracing::{info, instrument};
25
26use crate::{
27 coordinator,
28 coordinator::TaskCoordinatorMessage,
29 handle::TaskHandle,
30 registry::{TaskEntry, TaskRegistry},
31 task::ScheduledTask,
32};
33
34pub struct TaskSubsystem {
35 running: AtomicBool,
36
37 handle: Option<TaskHandle>,
38
39 coordinator_tx: Option<Sender<TaskCoordinatorMessage>>,
40
41 coordinator_handle: Option<JoinHandle<()>>,
42
43 runtime: SharedRuntime,
44
45 engine: StandardEngine,
46
47 registry: TaskRegistry,
48
49 initial_tasks: Vec<ScheduledTask>,
50}
51
52impl TaskSubsystem {
53 #[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
54 pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
55 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
56 let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
57 let registry = Arc::new(DashMap::new());
58
59 Self {
60 running: AtomicBool::new(false),
61 handle: None,
62 coordinator_tx: None,
63 coordinator_handle: None,
64 runtime,
65 engine,
66 registry,
67 initial_tasks,
68 }
69 }
70
71 pub fn handle(&self) -> Option<TaskHandle> {
72 self.handle.clone()
73 }
74}
75
76impl Subsystem for TaskSubsystem {
77 fn name(&self) -> &'static str {
78 "sub-task"
79 }
80
81 #[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
82 fn start(&mut self) -> Result<()> {
83 if self.running.load(Ordering::Acquire) {
84 return Ok(());
85 }
86
87 info!("Starting task subsystem");
88
89 let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
90
91 for task in self.initial_tasks.drain(..) {
92 let next_execution = self.runtime.clock().instant() + task.schedule.initial_delay();
93 self.registry.insert(
94 task.id,
95 TaskEntry {
96 task: Arc::new(task),
97 next_execution,
98 },
99 );
100 }
101
102 let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
103
104 let registry = self.registry.clone();
105 let runtime = self.runtime.clone();
106 let engine = self.engine.clone();
107
108 let join_handle = self.runtime.spawn(async move {
109 coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
110 });
111
112 self.handle = Some(handle);
113 self.coordinator_tx = Some(coordinator_tx);
114 self.coordinator_handle = Some(join_handle);
115 self.running.store(true, Ordering::Release);
116
117 info!("Task subsystem started");
118
119 Ok(())
120 }
121
122 #[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
123 fn shutdown(&mut self) -> Result<()> {
124 if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
125 return Ok(());
126 }
127
128 info!("Shutting down task subsystem");
129
130 let coordinator_tx = self.coordinator_tx.take();
131 let coordinator_handle = self.coordinator_handle.take();
132 let runtime = self.runtime.clone();
133 let worker = thread::spawn(move || {
134 if let Some(coordinator_tx) = coordinator_tx {
135 let _ = coordinator_tx.blocking_send(TaskCoordinatorMessage::Shutdown);
136 }
137 if let Some(join_handle) = coordinator_handle {
138 let _ = runtime.block_on(join_handle);
139 }
140 });
141 let _ = worker.join();
142
143 self.handle = None;
144
145 info!("Task subsystem shut down");
146
147 Ok(())
148 }
149
150 #[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
151 fn is_running(&self) -> bool {
152 self.running.load(Ordering::Acquire)
153 }
154
155 #[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
156 fn health_status(&self) -> HealthStatus {
157 if self.is_running() {
158 HealthStatus::Healthy
159 } else {
160 HealthStatus::Unknown
161 }
162 }
163
164 fn as_any(&self) -> &dyn Any {
165 self
166 }
167
168 fn as_any_mut(&mut self) -> &mut dyn Any {
169 self
170 }
171}
172
173impl HasVersion for TaskSubsystem {
174 fn version(&self) -> SystemVersion {
175 SystemVersion {
176 name: env!("CARGO_PKG_NAME")
177 .strip_prefix("reifydb-")
178 .unwrap_or(env!("CARGO_PKG_NAME"))
179 .to_string(),
180 version: env!("CARGO_PKG_VERSION").to_string(),
181 description: "Periodic task scheduler subsystem".to_string(),
182 r#type: ComponentType::Subsystem,
183 }
184 }
185}