hyperclock/
engine.rs

1//! The core engine that orchestrates the entire Hyperclock system.
2
3use crate::ENGINE_NAME; 
4use colored::Colorize;
5use crate::common::{ListenerId, PhaseId, TaskId};
6use crate::components::task::{LifecycleLoop, LifecycleStep, RepetitionPolicy};
7use crate::components::watcher::{ConditionalWatcher, GongWatcher, IntervalWatcher};
8use crate::config::HyperclockConfig;
9use crate::events::{
10    AutomationEvent, ConditionalEvent, GongEvent, PhaseEvent, SystemEvent, TaskEvent, UserEvent,
11};
12use crate::time::{SystemClock, TickEvent};
13use slotmap::SlotMap;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{broadcast, RwLock};
18use tracing::{error, info, trace};
19
20/// The main Hyperclock engine.
21///
22/// This struct is the central point of control. It holds the system's configuration,
23/// manages all active listeners and tasks, and drives the event loop. The `Engine`
24/// is designed to be cloned and shared across tasks, providing a handle to the
25/// running instance.
26#[derive(Clone)]
27pub struct HyperclockEngine {
28    config: Arc<HyperclockConfig>,
29    tick_sender: broadcast::Sender<Arc<TickEvent>>,
30    phase_sender: broadcast::Sender<PhaseEvent>,
31    system_event_sender: broadcast::Sender<SystemEvent>,
32    gong_event_sender: broadcast::Sender<GongEvent>,
33    task_event_sender: broadcast::Sender<TaskEvent>,
34    automation_event_sender: broadcast::Sender<AutomationEvent>,
35    conditional_event_sender: broadcast::Sender<ConditionalEvent>,
36    user_event_sender: broadcast::Sender<UserEvent>,
37    interval_watchers: Arc<RwLock<SlotMap<ListenerId, IntervalWatcher>>>,
38    gong_watchers: Arc<RwLock<SlotMap<ListenerId, GongWatcher>>>,
39    conditional_watchers: Arc<RwLock<SlotMap<ListenerId, ConditionalWatcher>>>,
40    lifecycle_loops: Arc<RwLock<SlotMap<TaskId, LifecycleLoop>>>,
41    lifecycle_triggers: Arc<RwLock<HashMap<ListenerId, TaskId>>>,
42}
43
44// Core implementation block for internal logic.
45impl HyperclockEngine {
46    /// Creates a new `HyperclockEngine` with the given configuration.
47    pub fn new(config: HyperclockConfig) -> Self {
48        const CHANNEL_CAPACITY: usize = 256;
49        let (tick_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
50        let (phase_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
51        let (system_event_sender, _) = broadcast::channel(64);
52        let (gong_event_sender, _) = broadcast::channel(64);
53        let (task_event_sender, _) = broadcast::channel(64);
54        let (automation_event_sender, _) = broadcast::channel(64);
55        let (conditional_event_sender, _) = broadcast::channel(64);
56        let (user_event_sender, _) = broadcast::channel(64);
57
58        let config_arc = Arc::new(config);
59        let gong_config_arc = Arc::new(config_arc.gong_config.clone());
60        let mut gong_watchers = SlotMap::with_key();
61        gong_watchers.insert(GongWatcher::new(gong_config_arc));
62
63        Self {
64            config: config_arc,
65            tick_sender,
66            phase_sender,
67            system_event_sender,
68            gong_event_sender,
69            task_event_sender,
70            automation_event_sender,
71            conditional_event_sender,
72            user_event_sender,
73            interval_watchers: Arc::new(RwLock::new(SlotMap::with_key())),
74            gong_watchers: Arc::new(RwLock::new(gong_watchers)),
75            conditional_watchers: Arc::new(RwLock::new(SlotMap::with_key())),
76            lifecycle_loops: Arc::new(RwLock::new(SlotMap::with_key())),
77            lifecycle_triggers: Arc::new(RwLock::new(HashMap::new())),
78        }
79    }
80
81    /// Runs the engine's main loop until a shutdown signal is received.
82    ///
83    /// This method will:
84    /// 1. Spawn the `SystemClock` task.
85    /// 2. Spawn the main dispatcher task that listens for ticks and fires events.
86    /// 3. Wait for a Ctrl+C signal to initiate a graceful shutdown.
87    pub async fn run(&self) -> anyhow::Result<()> {
88        info!("{} starting up...", ENGINE_NAME.cyan());
89        let (shutdown_tx, _) = broadcast::channel(1);
90
91        let clock = SystemClock::new(self.config.resolution.clone(), self.tick_sender.clone());
92        let clock_shutdown_rx = shutdown_tx.subscribe();
93        tokio::spawn(async move { clock.run(clock_shutdown_rx).await });
94
95        let dispatcher = self.clone();
96        let dispatcher_shutdown_rx = shutdown_tx.subscribe();
97        tokio::spawn(async move { dispatcher.dispatcher_loop(dispatcher_shutdown_rx).await });
98
99        info!(
100            "{} running at {:?}. Press Ctrl+C to shut down.",
101            ENGINE_NAME.cyan(), self.config.resolution
102        );
103        tokio::signal::ctrl_c().await?;
104
105        info!("Shutdown signal received. Broadcasting to all tasks...");
106        if shutdown_tx.send(()).is_err() {
107            error!("Failed to send shutdown signal. Some tasks may not terminate gracefully.");
108        }
109        tokio::time::sleep(Duration::from_millis(50)).await;
110        self.system_event_sender
111            .send(SystemEvent::EngineShutdown)
112            .ok();
113        info!("{} has shut down.", ENGINE_NAME.cyan());
114        Ok(())
115    }
116
117    #[doc(hidden)]
118    async fn dispatcher_loop(self, mut shutdown_rx: broadcast::Receiver<()>) {
119        let mut tick_rx = self.tick_sender.subscribe();
120        let mut task_rx = self.task_event_sender.subscribe();
121        self.system_event_sender
122            .send(SystemEvent::EngineStarted {
123                timestamp: tokio::time::Instant::now(),
124            })
125            .ok();
126        loop {
127            tokio::select! {
128                biased;
129                _ = shutdown_rx.recv() => break,
130                Ok(tick) = tick_rx.recv() => {
131                    trace!("Tick #{} received.", tick.tick_count);
132                    self.process_tick_watchers(&tick).await;
133                    for phase_config in self.config.phases.iter() {
134                        let phase_event = PhaseEvent { phase: phase_config.id, tick: tick.clone() };
135                        self.process_phase_watchers(&phase_event).await;
136                        self.phase_sender.send(phase_event).ok();
137                    }
138                }
139                Ok(task_event) = task_rx.recv() => {
140                    if let TaskEvent::TaskFired { listener_id, .. } = task_event {
141                         self.process_lifecycle_trigger(listener_id).await;
142                    }
143                }
144            }
145        }
146    }
147
148    #[doc(hidden)]
149    async fn process_phase_watchers(&self, phase_event: &PhaseEvent) {
150        let mut interval_watchers = self.interval_watchers.write().await;
151        for (id, watcher) in interval_watchers.iter_mut() {
152            if watcher.process_phase(phase_event.phase) {
153                self.task_event_sender
154                    .send(TaskEvent::TaskFired {
155                        listener_id: id,
156                        tick: phase_event.tick.clone(),
157                    })
158                    .ok();
159            }
160        }
161    }
162
163    #[doc(hidden)]
164    async fn process_tick_watchers(&self, tick: &Arc<TickEvent>) {
165        let mut conditional_watchers = self.conditional_watchers.write().await;
166        let mut fired_one_shots = Vec::new();
167        for (id, watcher) in conditional_watchers.iter_mut() {
168            if watcher.check_and_fire() {
169                self.conditional_event_sender
170                    .send(ConditionalEvent {
171                        condition_id: id,
172                        timestamp: tick.timestamp,
173                    })
174                    .ok();
175                if watcher.is_one_shot {
176                    fired_one_shots.push(id);
177                }
178            }
179        }
180        for id in fired_one_shots {
181            if conditional_watchers.remove(id).is_some() {
182                self.system_event_sender
183                    .send(SystemEvent::ListenerRemoved { id })
184                    .ok();
185            }
186        }
187        let mut gong_watchers = self.gong_watchers.write().await;
188        for (_id, watcher) in gong_watchers.iter_mut() {
189            watcher.process_tick(tick, &self.gong_event_sender);
190        }
191    }
192
193    #[doc(hidden)]
194    async fn process_lifecycle_trigger(&self, interval_listener_id: ListenerId) {
195        let lifecycle_id = self
196            .lifecycle_triggers
197            .read()
198            .await
199            .get(&interval_listener_id)
200            .copied();
201        if let Some(id) = lifecycle_id {
202            let mut loops = self.lifecycle_loops.write().await;
203            let mut should_remove = false;
204            if let Some(lifecycle) = loops.get_mut(id) {
205                if lifecycle.advance(&self.automation_event_sender) {
206                    should_remove = true;
207                }
208            }
209            if should_remove {
210                if let Some(removed_loop) = loops.remove(id) {
211                    self.remove_interval_listener(removed_loop.listener_id).await;
212                    self.lifecycle_triggers
213                        .write()
214                        .await
215                        .remove(&removed_loop.listener_id);
216                }
217            }
218        }
219    }
220}
221
222// Public API implementation block.
223impl HyperclockEngine {
224    /// Registers a task to be executed at a regular interval.
225    ///
226    /// The task's interval timer will only advance during the specified `phase`.
227    /// The provided `task_logic` closure will be executed each time the interval elapses.
228    ///
229    /// # Arguments
230    /// * `phase_to_watch` - The `PhaseId` during which this interval is active.
231    /// * `interval` - The `Duration` between task executions.
232    /// * `task_logic` - A closure to execute when the interval fires.
233    ///
234    /// # Returns
235    /// A `ListenerId` which can be used to later remove this watcher.
236    pub async fn on_interval(
237        &self,
238        phase_to_watch: PhaseId,
239        interval: Duration,
240        task_logic: impl FnMut() + Send + Sync + 'static,
241    ) -> ListenerId {
242        let watcher = IntervalWatcher::new(phase_to_watch, interval, Box::new(task_logic));
243        let mut watchers = self.interval_watchers.write().await;
244        let id = watchers.insert(watcher);
245        self.system_event_sender
246            .send(SystemEvent::ListenerAdded { id })
247            .ok();
248        id
249    }
250
251    /// Registers a task to be executed whenever a given condition is met.
252    ///
253    /// The `condition` closure is checked on every tick of the engine. If it returns `true`,
254    /// the `task_logic` closure is executed.
255    ///
256    /// # Arguments
257    /// * `condition` - A closure that returns `true` when the task should fire.
258    /// * `task_logic` - A closure to execute when the condition is met.
259    /// * `is_one_shot` - If true, the watcher will be automatically removed after firing once.
260    ///
261    /// # Returns
262    /// A `ListenerId` which can be used to later remove this watcher.
263    pub async fn on_conditional(
264        &self,
265        condition: impl Fn() -> bool + Send + Sync + 'static,
266        task_logic: impl FnMut() + Send + Sync + 'static,
267        is_one_shot: bool,
268    ) -> ListenerId {
269        let watcher =
270            ConditionalWatcher::new(Box::new(condition), Box::new(task_logic), is_one_shot);
271        let mut watchers = self.conditional_watchers.write().await;
272        let id = watchers.insert(watcher);
273        self.system_event_sender
274            .send(SystemEvent::ListenerAdded { id })
275            .ok();
276        id
277    }
278
279    /// Adds a new `LifecycleLoop` to the engine.
280    ///
281    /// This creates a complex automation that executes a sequence of steps,
282    /// with each step advancing after the specified `interval`.
283    ///
284    /// # Returns
285    /// A `TaskId` for the created lifecycle loop.
286    pub async fn add_lifecycle_loop(
287        &self,
288        phase_to_watch: PhaseId,
289        interval: Duration,
290        steps: Vec<LifecycleStep>,
291        repetition_policy: RepetitionPolicy,
292    ) -> TaskId {
293        // This interval watcher exists solely to drive the lifecycle loop.
294        let interval_watcher_id = self
295            .on_interval(phase_to_watch, interval, || {})
296            .await;
297        let mut loops = self.lifecycle_loops.write().await;
298        let lifecycle_id = loops.insert_with_key(|key| {
299            self.automation_event_sender
300                .send(AutomationEvent::LifecycleStarted { id: key })
301                .ok();
302            LifecycleLoop::new(key, interval_watcher_id, steps, repetition_policy)
303        });
304        self.lifecycle_triggers
305            .write()
306            .await
307            .insert(interval_watcher_id, lifecycle_id);
308        lifecycle_id
309    }
310
311    /// Removes an interval listener from the engine.
312    ///
313    /// Returns `true` if the listener was found and removed.
314    pub async fn remove_interval_listener(&self, id: ListenerId) -> bool {
315        let was_removed = self.interval_watchers.write().await.remove(id).is_some();
316        if was_removed {
317            self.system_event_sender
318                .send(SystemEvent::ListenerRemoved { id })
319                .ok();
320        }
321        was_removed
322    }
323
324    /// Removes a conditional listener from the engine.
325    ///
326    /// Returns `true` if the listener was found and removed.
327    pub async fn remove_conditional_listener(&self, id: ListenerId) -> bool {
328        let was_removed = self.conditional_watchers.write().await.remove(id).is_some();
329        if was_removed {
330            self.system_event_sender
331                .send(SystemEvent::ListenerRemoved { id })
332                .ok();
333        }
334        was_removed
335    }
336
337    /// Removes a lifecycle loop from the engine.
338    ///
339    /// This also removes the internal interval watcher that drives the loop.
340    /// Returns `true` if the loop was found and removed.
341    pub async fn remove_lifecycle_loop(&self, id: TaskId) -> bool {
342        if let Some(removed_loop) = self.lifecycle_loops.write().await.remove(id) {
343            self.remove_interval_listener(removed_loop.listener_id)
344                .await;
345            self.lifecycle_triggers
346                .write()
347                .await
348                .remove(&removed_loop.listener_id);
349            true
350        } else {
351            false
352        }
353    }
354
355    /// Subscribes to the raw `TickEvent` stream.
356    ///
357    /// This provides access to the highest-frequency event in the engine, firing
358    /// directly from the `SystemClock` before phase processing. This is a
359    /// power-user feature for tasks that need to react to every single tick
360    /// without regard for the phase cycle.
361    ///
362    /// Most users should prefer `subscribe_phase_events`.
363    pub fn subscribe_tick_events(&self) -> broadcast::Receiver<Arc<TickEvent>> {
364        self.tick_sender.subscribe()
365    }
366
367    /// Subscribes to the `SystemEvent` stream.
368    pub fn subscribe_system_events(&self) -> broadcast::Receiver<SystemEvent> {
369        self.system_event_sender.subscribe()
370    }
371
372    /// Subscribes to the `PhaseEvent` stream.
373    pub fn subscribe_phase_events(&self) -> broadcast::Receiver<PhaseEvent> {
374        self.phase_sender.subscribe()
375    }
376
377    /// Subscribes to the `GongEvent` stream.
378    pub fn subscribe_gong_events(&self) -> broadcast::Receiver<GongEvent> {
379        self.gong_event_sender.subscribe()
380    }
381
382    /// Subscribes to the `TaskEvent` stream.
383    pub fn subscribe_task_events(&self) -> broadcast::Receiver<TaskEvent> {
384        self.task_event_sender.subscribe()
385    }
386
387    /// Subscribes to the `AutomationEvent` stream.
388    pub fn subscribe_automation_events(&self) -> broadcast::Receiver<AutomationEvent> {
389        self.automation_event_sender.subscribe()
390    }
391
392    /// Subscribes to the `ConditionalEvent` stream.
393    pub fn subscribe_conditional_events(&self) -> broadcast::Receiver<ConditionalEvent> {
394        self.conditional_event_sender.subscribe()
395    }
396
397    /// Subscribes to the `UserEvent` stream.
398    pub fn subscribe_user_events(&self) -> broadcast::Receiver<UserEvent> {
399        self.user_event_sender.subscribe()
400    }
401
402    /// Broadcasts a custom `UserEvent` to all subscribers.
403    pub fn broadcast_user_event(&self, event: UserEvent) {
404        self.user_event_sender.send(event).ok();
405    }
406}