1use crate::ENGINE_NAME;
4use colored::Colorize;
5use crate::common::{ListenerId, PhaseId, TaskId};
6use crate::components::task::{LifecycleLoop, LifecycleStep, RepetitionPolicy};
7use crate::components::watcher::{ConditionalWatcher, GongWatcher, IntervalWatcher};
8use crate::config::HyperclockConfig;
9use crate::events::{
10 AutomationEvent, ConditionalEvent, GongEvent, PhaseEvent, SystemEvent, TaskEvent, UserEvent,
11};
12use crate::time::{SystemClock, TickEvent};
13use slotmap::SlotMap;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{broadcast, RwLock};
18use tracing::{error, info, trace};
19
20#[derive(Clone)]
27pub struct HyperclockEngine {
28 config: Arc<HyperclockConfig>,
29 tick_sender: broadcast::Sender<Arc<TickEvent>>,
30 phase_sender: broadcast::Sender<PhaseEvent>,
31 system_event_sender: broadcast::Sender<SystemEvent>,
32 gong_event_sender: broadcast::Sender<GongEvent>,
33 task_event_sender: broadcast::Sender<TaskEvent>,
34 automation_event_sender: broadcast::Sender<AutomationEvent>,
35 conditional_event_sender: broadcast::Sender<ConditionalEvent>,
36 user_event_sender: broadcast::Sender<UserEvent>,
37 interval_watchers: Arc<RwLock<SlotMap<ListenerId, IntervalWatcher>>>,
38 gong_watchers: Arc<RwLock<SlotMap<ListenerId, GongWatcher>>>,
39 conditional_watchers: Arc<RwLock<SlotMap<ListenerId, ConditionalWatcher>>>,
40 lifecycle_loops: Arc<RwLock<SlotMap<TaskId, LifecycleLoop>>>,
41 lifecycle_triggers: Arc<RwLock<HashMap<ListenerId, TaskId>>>,
42}
43
44impl HyperclockEngine {
46 pub fn new(config: HyperclockConfig) -> Self {
48 const CHANNEL_CAPACITY: usize = 256;
49 let (tick_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
50 let (phase_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
51 let (system_event_sender, _) = broadcast::channel(64);
52 let (gong_event_sender, _) = broadcast::channel(64);
53 let (task_event_sender, _) = broadcast::channel(64);
54 let (automation_event_sender, _) = broadcast::channel(64);
55 let (conditional_event_sender, _) = broadcast::channel(64);
56 let (user_event_sender, _) = broadcast::channel(64);
57
58 let config_arc = Arc::new(config);
59 let gong_config_arc = Arc::new(config_arc.gong_config.clone());
60 let mut gong_watchers = SlotMap::with_key();
61 gong_watchers.insert(GongWatcher::new(gong_config_arc));
62
63 Self {
64 config: config_arc,
65 tick_sender,
66 phase_sender,
67 system_event_sender,
68 gong_event_sender,
69 task_event_sender,
70 automation_event_sender,
71 conditional_event_sender,
72 user_event_sender,
73 interval_watchers: Arc::new(RwLock::new(SlotMap::with_key())),
74 gong_watchers: Arc::new(RwLock::new(gong_watchers)),
75 conditional_watchers: Arc::new(RwLock::new(SlotMap::with_key())),
76 lifecycle_loops: Arc::new(RwLock::new(SlotMap::with_key())),
77 lifecycle_triggers: Arc::new(RwLock::new(HashMap::new())),
78 }
79 }
80
81 pub async fn run(&self) -> anyhow::Result<()> {
88 info!("{} starting up...", ENGINE_NAME.cyan());
89 let (shutdown_tx, _) = broadcast::channel(1);
90
91 let clock = SystemClock::new(self.config.resolution.clone(), self.tick_sender.clone());
92 let clock_shutdown_rx = shutdown_tx.subscribe();
93 tokio::spawn(async move { clock.run(clock_shutdown_rx).await });
94
95 let dispatcher = self.clone();
96 let dispatcher_shutdown_rx = shutdown_tx.subscribe();
97 tokio::spawn(async move { dispatcher.dispatcher_loop(dispatcher_shutdown_rx).await });
98
99 info!(
100 "{} running at {:?}. Press Ctrl+C to shut down.",
101 ENGINE_NAME.cyan(), self.config.resolution
102 );
103 tokio::signal::ctrl_c().await?;
104
105 info!("Shutdown signal received. Broadcasting to all tasks...");
106 if shutdown_tx.send(()).is_err() {
107 error!("Failed to send shutdown signal. Some tasks may not terminate gracefully.");
108 }
109 tokio::time::sleep(Duration::from_millis(50)).await;
110 self.system_event_sender
111 .send(SystemEvent::EngineShutdown)
112 .ok();
113 info!("{} has shut down.", ENGINE_NAME.cyan());
114 Ok(())
115 }
116
117 #[doc(hidden)]
118 async fn dispatcher_loop(self, mut shutdown_rx: broadcast::Receiver<()>) {
119 let mut tick_rx = self.tick_sender.subscribe();
120 let mut task_rx = self.task_event_sender.subscribe();
121 self.system_event_sender
122 .send(SystemEvent::EngineStarted {
123 timestamp: tokio::time::Instant::now(),
124 })
125 .ok();
126 loop {
127 tokio::select! {
128 biased;
129 _ = shutdown_rx.recv() => break,
130 Ok(tick) = tick_rx.recv() => {
131 trace!("Tick #{} received.", tick.tick_count);
132 self.process_tick_watchers(&tick).await;
133 for phase_config in self.config.phases.iter() {
134 let phase_event = PhaseEvent { phase: phase_config.id, tick: tick.clone() };
135 self.process_phase_watchers(&phase_event).await;
136 self.phase_sender.send(phase_event).ok();
137 }
138 }
139 Ok(task_event) = task_rx.recv() => {
140 if let TaskEvent::TaskFired { listener_id, .. } = task_event {
141 self.process_lifecycle_trigger(listener_id).await;
142 }
143 }
144 }
145 }
146 }
147
148 #[doc(hidden)]
149 async fn process_phase_watchers(&self, phase_event: &PhaseEvent) {
150 let mut interval_watchers = self.interval_watchers.write().await;
151 for (id, watcher) in interval_watchers.iter_mut() {
152 if watcher.process_phase(phase_event.phase) {
153 self.task_event_sender
154 .send(TaskEvent::TaskFired {
155 listener_id: id,
156 tick: phase_event.tick.clone(),
157 })
158 .ok();
159 }
160 }
161 }
162
163 #[doc(hidden)]
164 async fn process_tick_watchers(&self, tick: &Arc<TickEvent>) {
165 let mut conditional_watchers = self.conditional_watchers.write().await;
166 let mut fired_one_shots = Vec::new();
167 for (id, watcher) in conditional_watchers.iter_mut() {
168 if watcher.check_and_fire() {
169 self.conditional_event_sender
170 .send(ConditionalEvent {
171 condition_id: id,
172 timestamp: tick.timestamp,
173 })
174 .ok();
175 if watcher.is_one_shot {
176 fired_one_shots.push(id);
177 }
178 }
179 }
180 for id in fired_one_shots {
181 if conditional_watchers.remove(id).is_some() {
182 self.system_event_sender
183 .send(SystemEvent::ListenerRemoved { id })
184 .ok();
185 }
186 }
187 let mut gong_watchers = self.gong_watchers.write().await;
188 for (_id, watcher) in gong_watchers.iter_mut() {
189 watcher.process_tick(tick, &self.gong_event_sender);
190 }
191 }
192
193 #[doc(hidden)]
194 async fn process_lifecycle_trigger(&self, interval_listener_id: ListenerId) {
195 let lifecycle_id = self
196 .lifecycle_triggers
197 .read()
198 .await
199 .get(&interval_listener_id)
200 .copied();
201 if let Some(id) = lifecycle_id {
202 let mut loops = self.lifecycle_loops.write().await;
203 let mut should_remove = false;
204 if let Some(lifecycle) = loops.get_mut(id) {
205 if lifecycle.advance(&self.automation_event_sender) {
206 should_remove = true;
207 }
208 }
209 if should_remove {
210 if let Some(removed_loop) = loops.remove(id) {
211 self.remove_interval_listener(removed_loop.listener_id).await;
212 self.lifecycle_triggers
213 .write()
214 .await
215 .remove(&removed_loop.listener_id);
216 }
217 }
218 }
219 }
220}
221
222impl HyperclockEngine {
224 pub async fn on_interval(
237 &self,
238 phase_to_watch: PhaseId,
239 interval: Duration,
240 task_logic: impl FnMut() + Send + Sync + 'static,
241 ) -> ListenerId {
242 let watcher = IntervalWatcher::new(phase_to_watch, interval, Box::new(task_logic));
243 let mut watchers = self.interval_watchers.write().await;
244 let id = watchers.insert(watcher);
245 self.system_event_sender
246 .send(SystemEvent::ListenerAdded { id })
247 .ok();
248 id
249 }
250
251 pub async fn on_conditional(
264 &self,
265 condition: impl Fn() -> bool + Send + Sync + 'static,
266 task_logic: impl FnMut() + Send + Sync + 'static,
267 is_one_shot: bool,
268 ) -> ListenerId {
269 let watcher =
270 ConditionalWatcher::new(Box::new(condition), Box::new(task_logic), is_one_shot);
271 let mut watchers = self.conditional_watchers.write().await;
272 let id = watchers.insert(watcher);
273 self.system_event_sender
274 .send(SystemEvent::ListenerAdded { id })
275 .ok();
276 id
277 }
278
279 pub async fn add_lifecycle_loop(
287 &self,
288 phase_to_watch: PhaseId,
289 interval: Duration,
290 steps: Vec<LifecycleStep>,
291 repetition_policy: RepetitionPolicy,
292 ) -> TaskId {
293 let interval_watcher_id = self
295 .on_interval(phase_to_watch, interval, || {})
296 .await;
297 let mut loops = self.lifecycle_loops.write().await;
298 let lifecycle_id = loops.insert_with_key(|key| {
299 self.automation_event_sender
300 .send(AutomationEvent::LifecycleStarted { id: key })
301 .ok();
302 LifecycleLoop::new(key, interval_watcher_id, steps, repetition_policy)
303 });
304 self.lifecycle_triggers
305 .write()
306 .await
307 .insert(interval_watcher_id, lifecycle_id);
308 lifecycle_id
309 }
310
311 pub async fn remove_interval_listener(&self, id: ListenerId) -> bool {
315 let was_removed = self.interval_watchers.write().await.remove(id).is_some();
316 if was_removed {
317 self.system_event_sender
318 .send(SystemEvent::ListenerRemoved { id })
319 .ok();
320 }
321 was_removed
322 }
323
324 pub async fn remove_conditional_listener(&self, id: ListenerId) -> bool {
328 let was_removed = self.conditional_watchers.write().await.remove(id).is_some();
329 if was_removed {
330 self.system_event_sender
331 .send(SystemEvent::ListenerRemoved { id })
332 .ok();
333 }
334 was_removed
335 }
336
337 pub async fn remove_lifecycle_loop(&self, id: TaskId) -> bool {
342 if let Some(removed_loop) = self.lifecycle_loops.write().await.remove(id) {
343 self.remove_interval_listener(removed_loop.listener_id)
344 .await;
345 self.lifecycle_triggers
346 .write()
347 .await
348 .remove(&removed_loop.listener_id);
349 true
350 } else {
351 false
352 }
353 }
354
355 pub fn subscribe_tick_events(&self) -> broadcast::Receiver<Arc<TickEvent>> {
364 self.tick_sender.subscribe()
365 }
366
367 pub fn subscribe_system_events(&self) -> broadcast::Receiver<SystemEvent> {
369 self.system_event_sender.subscribe()
370 }
371
372 pub fn subscribe_phase_events(&self) -> broadcast::Receiver<PhaseEvent> {
374 self.phase_sender.subscribe()
375 }
376
377 pub fn subscribe_gong_events(&self) -> broadcast::Receiver<GongEvent> {
379 self.gong_event_sender.subscribe()
380 }
381
382 pub fn subscribe_task_events(&self) -> broadcast::Receiver<TaskEvent> {
384 self.task_event_sender.subscribe()
385 }
386
387 pub fn subscribe_automation_events(&self) -> broadcast::Receiver<AutomationEvent> {
389 self.automation_event_sender.subscribe()
390 }
391
392 pub fn subscribe_conditional_events(&self) -> broadcast::Receiver<ConditionalEvent> {
394 self.conditional_event_sender.subscribe()
395 }
396
397 pub fn subscribe_user_events(&self) -> broadcast::Receiver<UserEvent> {
399 self.user_event_sender.subscribe()
400 }
401
402 pub fn broadcast_user_event(&self, event: UserEvent) {
404 self.user_event_sender.send(event).ok();
405 }
406}