reifydb_sub_task/
subsystem.rs1use std::{
5 any::Any,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10};
11
12use dashmap::DashMap;
13use mpsc::Sender;
14use reifydb_core::{
15 interface::version::{ComponentType, HasVersion, SystemVersion},
16 util::ioc::IocContainer,
17};
18use reifydb_engine::engine::StandardEngine;
19use reifydb_runtime::SharedRuntime;
20use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
21use reifydb_type::Result;
22use tokio::{sync::mpsc, task::JoinHandle};
23use tracing::{info, instrument};
24
25use crate::{
26 coordinator,
27 coordinator::TaskCoordinatorMessage,
28 handle::TaskHandle,
29 registry::{TaskEntry, TaskRegistry},
30 task::ScheduledTask,
31};
32
33pub struct TaskSubsystem {
34 running: AtomicBool,
35
36 handle: Option<TaskHandle>,
37
38 coordinator_tx: Option<Sender<TaskCoordinatorMessage>>,
39
40 coordinator_handle: Option<JoinHandle<()>>,
41
42 runtime: SharedRuntime,
43
44 engine: StandardEngine,
45
46 registry: TaskRegistry,
47
48 initial_tasks: Vec<ScheduledTask>,
49}
50
51impl TaskSubsystem {
52 #[instrument(name = "task::subsystem::new", level = "debug", skip(ioc, initial_tasks))]
53 pub fn new(ioc: &IocContainer, initial_tasks: Vec<ScheduledTask>) -> Self {
54 let runtime = ioc.resolve::<SharedRuntime>().expect("SharedRuntime not registered in IoC");
55 let engine = ioc.resolve::<StandardEngine>().expect("StandardEngine not registered in IoC");
56 let registry = Arc::new(DashMap::new());
57
58 Self {
59 running: AtomicBool::new(false),
60 handle: None,
61 coordinator_tx: None,
62 coordinator_handle: None,
63 runtime,
64 engine,
65 registry,
66 initial_tasks,
67 }
68 }
69
70 pub fn handle(&self) -> Option<TaskHandle> {
71 self.handle.clone()
72 }
73}
74
75impl Subsystem for TaskSubsystem {
76 fn name(&self) -> &'static str {
77 "sub-task"
78 }
79
80 #[instrument(name = "task::subsystem::start", level = "debug", skip(self))]
81 fn start(&mut self) -> Result<()> {
82 if self.running.load(Ordering::Acquire) {
83 return Ok(());
84 }
85
86 info!("Starting task subsystem");
87
88 let (coordinator_tx, coordinator_rx) = mpsc::channel(100);
89
90 for task in self.initial_tasks.drain(..) {
91 let next_execution = self.runtime.clock().instant() + task.schedule.initial_delay();
92 self.registry.insert(
93 task.id,
94 TaskEntry {
95 task: Arc::new(task),
96 next_execution,
97 },
98 );
99 }
100
101 let handle = TaskHandle::new(self.registry.clone(), coordinator_tx.clone());
102
103 let registry = self.registry.clone();
104 let runtime = self.runtime.clone();
105 let engine = self.engine.clone();
106
107 let join_handle = self.runtime.spawn(async move {
108 coordinator::run_coordinator(registry, coordinator_rx, runtime, engine).await;
109 });
110
111 self.handle = Some(handle);
112 self.coordinator_tx = Some(coordinator_tx);
113 self.coordinator_handle = Some(join_handle);
114 self.running.store(true, Ordering::Release);
115
116 info!("Task subsystem started");
117
118 Ok(())
119 }
120
121 #[instrument(name = "task::subsystem::shutdown", level = "debug", skip(self))]
122 fn shutdown(&mut self) -> Result<()> {
123 if self.running.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_err() {
124 return Ok(());
125 }
126
127 info!("Shutting down task subsystem");
128
129 if let Some(coordinator_tx) = self.coordinator_tx.take() {
130 let _ = coordinator_tx.blocking_send(TaskCoordinatorMessage::Shutdown);
131 }
132
133 if let Some(join_handle) = self.coordinator_handle.take() {
134 let _ = self.runtime.block_on(join_handle);
135 }
136
137 self.handle = None;
138
139 info!("Task subsystem shut down");
140
141 Ok(())
142 }
143
144 #[instrument(name = "task::subsystem::is_running", level = "trace", skip(self))]
145 fn is_running(&self) -> bool {
146 self.running.load(Ordering::Acquire)
147 }
148
149 #[instrument(name = "task::subsystem::health_status", level = "debug", skip(self))]
150 fn health_status(&self) -> HealthStatus {
151 if self.is_running() {
152 HealthStatus::Healthy
153 } else {
154 HealthStatus::Unknown
155 }
156 }
157
158 fn as_any(&self) -> &dyn Any {
159 self
160 }
161
162 fn as_any_mut(&mut self) -> &mut dyn Any {
163 self
164 }
165}
166
167impl HasVersion for TaskSubsystem {
168 fn version(&self) -> SystemVersion {
169 SystemVersion {
170 name: env!("CARGO_PKG_NAME")
171 .strip_prefix("reifydb-")
172 .unwrap_or(env!("CARGO_PKG_NAME"))
173 .to_string(),
174 version: env!("CARGO_PKG_VERSION").to_string(),
175 description: "Periodic task scheduler subsystem".to_string(),
176 r#type: ComponentType::Subsystem,
177 }
178 }
179}