Skip to main content

ftui_runtime/
subscription.rs

1#![forbid(unsafe_code)]
2
3//! Subscription system for continuous event sources.
4//!
5//! Subscriptions provide a declarative way to receive events from external
6//! sources like timers, file watchers, or network connections. The runtime
7//! manages subscription lifecycles automatically based on what the model
8//! declares as active.
9//!
10//! # How it works
11//!
12//! 1. `Model::subscriptions()` returns the set of active subscriptions
13//! 2. After each `update()`, the runtime compares active vs previous subscriptions
14//! 3. New subscriptions are started, removed ones are stopped
15//! 4. Subscription messages are routed through `Model::update()`
16
17use std::collections::HashSet;
18use std::sync::mpsc;
19use std::thread;
20use web_time::{Duration, Instant};
21
/// A unique identifier for a subscription.
///
/// Used by the runtime to track which subscriptions are active and
/// to deduplicate subscriptions across update cycles.
///
/// This is a plain `u64` alias: two subscriptions that report the same value
/// are treated as the same subscription.
pub type SubId = u64;
27
/// A subscription produces messages from an external event source.
///
/// Subscriptions run on background threads and send messages through
/// the provided channel. The runtime manages their lifecycle.
///
/// The `Send` supertrait is needed because the subscription manager moves each
/// boxed subscription into the thread it spawns before calling
/// [`Subscription::run`].
pub trait Subscription<M: Send + 'static>: Send {
    /// Unique identifier for deduplication.
    ///
    /// Subscriptions with the same ID are considered identical.
    /// The runtime uses this to avoid restarting unchanged subscriptions.
    fn id(&self) -> SubId;

    /// Start the subscription, sending messages through the channel.
    ///
    /// This is called on a background thread. Implementations should
    /// loop and send messages until the channel is disconnected (receiver dropped)
    /// or the stop signal is received.
    ///
    /// Returning from this method ends the subscription's thread.
    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
}
46
/// Cooperative stop flag handed to a running subscription.
///
/// The runtime keeps the matching `StopTrigger`; once the trigger fires, every
/// clone of the signal observes the stop. A subscription should either poll
/// [`StopSignal::is_stopped`] or block in [`StopSignal::wait_timeout`], and
/// leave its run loop as soon as the signal is set.
#[derive(Clone)]
pub struct StopSignal {
    inner: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
}

impl StopSignal {
    /// Build a connected `(signal, trigger)` pair sharing one flag and condvar.
    pub(crate) fn new() -> (Self, StopTrigger) {
        let shared = std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
        let trigger = StopTrigger {
            inner: std::sync::Arc::clone(&shared),
        };
        (Self { inner: shared }, trigger)
    }

    /// Report whether the trigger has fired.
    pub fn is_stopped(&self) -> bool {
        let flag = &self.inner.0;
        // A poisoned mutex still holds a valid bool, so read through the poison.
        let guard = flag.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        *guard
    }

    /// Block until the signal is set or `duration` has elapsed.
    ///
    /// Returns `true` if stopped, `false` if timed out. The wait parks on a
    /// condition variable; wakeups that arrive without the flag being set are
    /// absorbed by waiting again for whatever is left of `duration`.
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (flag, condvar) = &*self.inner;
        // Poison is ignored throughout: the flag is meaningful even after a panic.
        let mut guard = flag.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        if *guard {
            return true;
        }

        let began = Instant::now();
        let mut budget = duration;

        while !*guard {
            let (reacquired, outcome) = condvar
                .wait_timeout(guard, budget)
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            guard = reacquired;
            if *guard {
                break;
            }
            if outcome.timed_out() {
                return false;
            }
            // Woken without a stop: keep waiting only if some budget remains.
            match duration.checked_sub(began.elapsed()) {
                Some(left) if !left.is_zero() => budget = left,
                _ => return false,
            }
        }
        true
    }
}

/// Runtime-side half of a stop pair; firing it stops the paired `StopSignal`.
pub(crate) struct StopTrigger {
    inner: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
}

impl StopTrigger {
    /// Set the stop flag and wake every thread blocked in `wait_timeout`.
    pub(crate) fn stop(&self) {
        let (flag, condvar) = &*self.inner;
        // The flag must be set even if another holder panicked, so ignore poison.
        let mut guard = flag.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        *guard = true;
        condvar.notify_all();
    }
}
127
/// A running subscription handle.
///
/// Pairs the subscription's ID with the trigger that stops it and the join
/// handle of the thread executing it.
pub(crate) struct RunningSubscription {
    /// ID the subscription reported when it was started.
    pub(crate) id: SubId,
    /// Runtime-side half of the stop signal given to the subscription.
    trigger: StopTrigger,
    /// Handle of the worker thread; `None` once `stop` has taken it.
    thread: Option<thread::JoinHandle<()>>,
}
134
135impl RunningSubscription {
136    /// Stop the subscription and join its thread.
137    pub(crate) fn stop(mut self) {
138        self.trigger.stop();
139        if let Some(handle) = self.thread.take() {
140            // Give the thread a moment to finish, but don't block forever
141            let _ = handle.join();
142        }
143    }
144}
145
/// Dropping a handle signals the subscription to stop but never waits for it.
impl Drop for RunningSubscription {
    fn drop(&mut self) {
        // This also runs at the end of `stop(self)`; setting the flag twice is harmless.
        self.trigger.stop();
        // Don't join in drop to avoid blocking
    }
}
152
/// Manages the lifecycle of subscriptions for a program.
///
/// Owns both ends of a single channel: every started subscription gets a clone
/// of `sender`, and the manager reads their messages from `receiver`.
pub(crate) struct SubscriptionManager<M: Send + 'static> {
    /// Handles of the subscriptions that are currently running.
    active: Vec<RunningSubscription>,
    /// Cloned into each subscription thread when it is started.
    sender: mpsc::Sender<M>,
    /// Receiving end drained by `drain_messages`.
    receiver: mpsc::Receiver<M>,
}
159
160impl<M: Send + 'static> SubscriptionManager<M> {
161    pub(crate) fn new() -> Self {
162        let (sender, receiver) = mpsc::channel();
163        Self {
164            active: Vec::new(),
165            sender,
166            receiver,
167        }
168    }
169
170    /// Update the set of active subscriptions.
171    ///
172    /// Compares the new set against currently running subscriptions:
173    /// - Starts subscriptions that are new (ID not in active set)
174    /// - Stops subscriptions that are no longer declared (ID not in new set)
175    /// - Leaves unchanged subscriptions running
176    pub(crate) fn reconcile(&mut self, subscriptions: Vec<Box<dyn Subscription<M>>>) {
177        let new_ids: HashSet<SubId> = subscriptions.iter().map(|s| s.id()).collect();
178        let active_count_before = self.active.len();
179
180        crate::debug_trace!(
181            "reconcile: new_ids={:?}, active_before={}",
182            new_ids,
183            active_count_before
184        );
185        tracing::trace!(
186            new_id_count = new_ids.len(),
187            active_before = active_count_before,
188            new_ids = ?new_ids,
189            "subscription reconcile starting"
190        );
191
192        // Stop subscriptions that are no longer active
193        let mut remaining = Vec::new();
194        for running in self.active.drain(..) {
195            if new_ids.contains(&running.id) {
196                remaining.push(running);
197            } else {
198                crate::debug_trace!("stopping subscription: id={}", running.id);
199                tracing::debug!(sub_id = running.id, "Stopping subscription");
200                running.stop();
201            }
202        }
203        self.active = remaining;
204
205        // Start new subscriptions
206        let mut active_ids: HashSet<SubId> = self.active.iter().map(|r| r.id).collect();
207        for sub in subscriptions {
208            let id = sub.id();
209            if !active_ids.insert(id) {
210                continue;
211            }
212
213            crate::debug_trace!("starting subscription: id={}", id);
214            tracing::debug!(sub_id = id, "Starting subscription");
215            let (signal, trigger) = StopSignal::new();
216            let sender = self.sender.clone();
217
218            let thread = thread::spawn(move || {
219                sub.run(sender, signal);
220            });
221
222            self.active.push(RunningSubscription {
223                id,
224                trigger,
225                thread: Some(thread),
226            });
227        }
228
229        let active_count_after = self.active.len();
230        crate::debug_trace!("reconcile complete: active_after={}", active_count_after);
231        tracing::trace!(
232            active_before = active_count_before,
233            active_after = active_count_after,
234            started = active_count_after.saturating_sub(active_count_before),
235            stopped = active_count_before.saturating_sub(active_count_after),
236            "subscription reconcile complete"
237        );
238    }
239
240    /// Drain pending messages from subscriptions.
241    pub(crate) fn drain_messages(&self) -> Vec<M> {
242        let mut messages = Vec::new();
243        while let Ok(msg) = self.receiver.try_recv() {
244            messages.push(msg);
245        }
246        messages
247    }
248
249    /// Return the number of active subscriptions.
250    #[inline]
251    pub(crate) fn active_count(&self) -> usize {
252        self.active.len()
253    }
254
255    /// Stop all running subscriptions.
256    pub(crate) fn stop_all(&mut self) {
257        for running in self.active.drain(..) {
258            running.stop();
259        }
260    }
261}
262
/// Dropping the manager stops every subscription it still tracks.
impl<M: Send + 'static> Drop for SubscriptionManager<M> {
    fn drop(&mut self) {
        // Delegates to `stop_all`, which calls `stop` on each running handle.
        self.stop_all();
    }
}
268
269// --- Built-in subscriptions ---
270
/// A subscription that fires at a fixed interval.
///
/// Each tick calls the stored factory and sends the resulting message.
///
/// # Example
///
/// ```ignore
/// fn subscriptions(&self) -> Vec<Box<dyn Subscription<MyMsg>>> {
///     vec![Box::new(Every::new(Duration::from_secs(1), || MyMsg::Tick))]
/// }
/// ```
pub struct Every<M: Send + 'static> {
    /// ID reported to the runtime for deduplication.
    id: SubId,
    /// Time waited before each tick.
    interval: Duration,
    /// Factory invoked once per tick to build the message to send.
    make_msg: Box<dyn Fn() -> M + Send + Sync>,
}
285
286impl<M: Send + 'static> Every<M> {
287    /// Create a tick subscription with the given interval and message factory.
288    pub fn new(interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self {
289        // Generate a stable ID from the interval to allow deduplication
290        let id = interval.as_nanos() as u64 ^ 0x5449_434B; // "TICK" magic
291        Self {
292            id,
293            interval,
294            make_msg: Box::new(make_msg),
295        }
296    }
297
298    /// Create a tick subscription with an explicit ID.
299    pub fn with_id(
300        id: SubId,
301        interval: Duration,
302        make_msg: impl Fn() -> M + Send + Sync + 'static,
303    ) -> Self {
304        Self {
305            id,
306            interval,
307            make_msg: Box::new(make_msg),
308        }
309    }
310}
311
312impl<M: Send + 'static> Subscription<M> for Every<M> {
313    fn id(&self) -> SubId {
314        self.id
315    }
316
317    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
318        let mut tick_count: u64 = 0;
319        crate::debug_trace!(
320            "Every subscription started: id={}, interval={:?}",
321            self.id,
322            self.interval
323        );
324        loop {
325            if stop.wait_timeout(self.interval) {
326                crate::debug_trace!(
327                    "Every subscription stopped: id={}, sent {} ticks",
328                    self.id,
329                    tick_count
330                );
331                break;
332            }
333            tick_count += 1;
334            let msg = (self.make_msg)();
335            if sender.send(msg).is_err() {
336                crate::debug_trace!(
337                    "Every subscription channel closed: id={}, sent {} ticks",
338                    self.id,
339                    tick_count
340                );
341                break;
342            }
343        }
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350
    /// Message type used by the tests in this module.
    #[derive(Debug, Clone, PartialEq)]
    enum TestMsg {
        /// Payload-free message, produced by the `Every` tickers in these tests.
        Tick,
        /// Message carrying an integer so tests can tell sources and ordering apart.
        Value(i32),
    }
356
    /// Test subscription that forwards whatever arrives on an mpsc receiver.
    struct ChannelSubscription<M: Send + 'static> {
        /// ID reported to the manager.
        id: SubId,
        /// Source of the messages to forward.
        receiver: mpsc::Receiver<M>,
        /// How long each receive attempt waits before re-checking the stop signal.
        poll: Duration,
    }
362
363    impl<M: Send + 'static> ChannelSubscription<M> {
364        fn new(id: SubId, receiver: mpsc::Receiver<M>) -> Self {
365            Self {
366                id,
367                receiver,
368                poll: Duration::from_millis(5),
369            }
370        }
371    }
372
373    impl<M: Send + 'static> Subscription<M> for ChannelSubscription<M> {
374        fn id(&self) -> SubId {
375            self.id
376        }
377
378        fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
379            loop {
380                if stop.is_stopped() {
381                    break;
382                }
383                match self.receiver.recv_timeout(self.poll) {
384                    Ok(msg) => {
385                        if sender.send(msg).is_err() {
386                            break;
387                        }
388                    }
389                    Err(mpsc::RecvTimeoutError::Timeout) => {}
390                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
391                }
392            }
393        }
394    }
395
396    fn channel_subscription(id: SubId) -> (ChannelSubscription<TestMsg>, mpsc::Sender<TestMsg>) {
397        let (tx, rx) = mpsc::channel();
398        (ChannelSubscription::new(id, rx), tx)
399    }
400
401    #[test]
402    fn stop_signal_starts_false() {
403        let (signal, _trigger) = StopSignal::new();
404        assert!(!signal.is_stopped());
405    }
406
407    #[test]
408    fn stop_signal_becomes_true_after_trigger() {
409        let (signal, trigger) = StopSignal::new();
410        trigger.stop();
411        assert!(signal.is_stopped());
412    }
413
414    #[test]
415    fn stop_signal_wait_returns_true_when_stopped() {
416        let (signal, trigger) = StopSignal::new();
417        trigger.stop();
418        assert!(signal.wait_timeout(Duration::from_millis(100)));
419    }
420
421    #[test]
422    fn stop_signal_wait_returns_false_on_timeout() {
423        let (signal, _trigger) = StopSignal::new();
424        assert!(!signal.wait_timeout(Duration::from_millis(10)));
425    }
426
427    #[test]
428    fn channel_subscription_forwards_messages() {
429        let (sub, event_tx) = channel_subscription(1);
430        let (tx, rx) = mpsc::channel();
431        let (signal, trigger) = StopSignal::new();
432
433        let handle = thread::spawn(move || {
434            sub.run(tx, signal);
435        });
436
437        event_tx.send(TestMsg::Value(1)).unwrap();
438        event_tx.send(TestMsg::Value(2)).unwrap();
439        thread::sleep(Duration::from_millis(10));
440        trigger.stop();
441        handle.join().unwrap();
442
443        let msgs: Vec<_> = rx.try_iter().collect();
444        assert_eq!(msgs, vec![TestMsg::Value(1), TestMsg::Value(2)]);
445    }
446
447    #[test]
448    fn every_subscription_fires() {
449        let sub = Every::new(Duration::from_millis(10), || TestMsg::Tick);
450        let (tx, rx) = mpsc::channel();
451        let (signal, trigger) = StopSignal::new();
452
453        let handle = thread::spawn(move || {
454            sub.run(tx, signal);
455        });
456
457        // Wait for a few ticks
458        thread::sleep(Duration::from_millis(50));
459        trigger.stop();
460        handle.join().unwrap();
461
462        let msgs: Vec<_> = rx.try_iter().collect();
463        assert!(!msgs.is_empty(), "Should have received at least one tick");
464        assert!(msgs.iter().all(|m| *m == TestMsg::Tick));
465    }
466
467    #[test]
468    fn every_subscription_uses_stable_id() {
469        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
470        let sub2 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
471        assert_eq!(sub1.id(), sub2.id());
472    }
473
474    #[test]
475    fn every_subscription_different_intervals_different_ids() {
476        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
477        let sub2 = Every::<TestMsg>::new(Duration::from_secs(2), || TestMsg::Tick);
478        assert_ne!(sub1.id(), sub2.id());
479    }
480
481    #[test]
482    fn subscription_manager_starts_subscriptions() {
483        let mut mgr = SubscriptionManager::<TestMsg>::new();
484        let (sub, event_tx) = channel_subscription(1);
485        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];
486
487        mgr.reconcile(subs);
488        event_tx.send(TestMsg::Value(42)).unwrap();
489
490        // Give the thread a moment to send
491        thread::sleep(Duration::from_millis(20));
492
493        let msgs = mgr.drain_messages();
494        assert_eq!(msgs, vec![TestMsg::Value(42)]);
495    }
496
497    #[test]
498    fn subscription_manager_dedupes_duplicate_ids() {
499        let mut mgr = SubscriptionManager::<TestMsg>::new();
500        let (sub_a, tx_a) = channel_subscription(7);
501        let (sub_b, tx_b) = channel_subscription(7);
502        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub_a), Box::new(sub_b)];
503
504        mgr.reconcile(subs);
505
506        tx_a.send(TestMsg::Value(1)).unwrap();
507        assert!(
508            tx_b.send(TestMsg::Value(2)).is_err(),
509            "Duplicate subscription should be dropped"
510        );
511
512        thread::sleep(Duration::from_millis(20));
513        let msgs = mgr.drain_messages();
514        assert_eq!(msgs, vec![TestMsg::Value(1)]);
515    }
516
517    #[test]
518    fn subscription_manager_stops_removed() {
519        let mut mgr = SubscriptionManager::<TestMsg>::new();
520
521        // Start with one subscription
522        mgr.reconcile(vec![Box::new(Every::with_id(
523            99,
524            Duration::from_millis(5),
525            || TestMsg::Tick,
526        ))]);
527
528        thread::sleep(Duration::from_millis(20));
529        let msgs_before = mgr.drain_messages();
530        assert!(!msgs_before.is_empty());
531
532        // Remove it
533        mgr.reconcile(vec![]);
534
535        // Drain any remaining buffered messages
536        thread::sleep(Duration::from_millis(20));
537        let _ = mgr.drain_messages();
538
539        // After stopping, no more messages should arrive
540        thread::sleep(Duration::from_millis(30));
541        let msgs_after = mgr.drain_messages();
542        assert!(
543            msgs_after.is_empty(),
544            "Should stop receiving after reconcile with empty set"
545        );
546    }
547
548    #[test]
549    fn subscription_manager_keeps_unchanged() {
550        let mut mgr = SubscriptionManager::<TestMsg>::new();
551
552        // Start subscription
553        mgr.reconcile(vec![Box::new(Every::with_id(
554            50,
555            Duration::from_millis(10),
556            || TestMsg::Tick,
557        ))]);
558
559        thread::sleep(Duration::from_millis(30));
560        let _ = mgr.drain_messages();
561
562        // Reconcile with same ID - should keep running
563        mgr.reconcile(vec![Box::new(Every::with_id(
564            50,
565            Duration::from_millis(10),
566            || TestMsg::Tick,
567        ))]);
568
569        thread::sleep(Duration::from_millis(30));
570        let msgs = mgr.drain_messages();
571        assert!(!msgs.is_empty(), "Subscription should still be running");
572    }
573
574    #[test]
575    fn subscription_manager_stop_all() {
576        let mut mgr = SubscriptionManager::<TestMsg>::new();
577
578        mgr.reconcile(vec![
579            Box::new(Every::with_id(1, Duration::from_millis(5), || {
580                TestMsg::Value(1)
581            })),
582            Box::new(Every::with_id(2, Duration::from_millis(5), || {
583                TestMsg::Value(2)
584            })),
585        ]);
586
587        thread::sleep(Duration::from_millis(20));
588        mgr.stop_all();
589
590        thread::sleep(Duration::from_millis(20));
591        let _ = mgr.drain_messages();
592        thread::sleep(Duration::from_millis(30));
593        let msgs = mgr.drain_messages();
594        assert!(msgs.is_empty());
595    }
596
597    // =========================================================================
598    // ADDITIONAL TESTS - Cmd sequencing + Subscriptions (bd-2nu8.10.2)
599    // =========================================================================
600
601    #[test]
602    fn stop_signal_is_cloneable() {
603        let (signal, trigger) = StopSignal::new();
604        let signal_clone = signal.clone();
605
606        assert!(!signal.is_stopped());
607        assert!(!signal_clone.is_stopped());
608
609        trigger.stop();
610
611        assert!(signal.is_stopped());
612        assert!(signal_clone.is_stopped());
613    }
614
615    #[test]
616    fn stop_signal_wait_wakes_immediately_when_already_stopped() {
617        let (signal, trigger) = StopSignal::new();
618        trigger.stop();
619
620        // Should return immediately, not wait for timeout
621        let start = Instant::now();
622        let stopped = signal.wait_timeout(Duration::from_secs(10));
623        let elapsed = start.elapsed();
624
625        assert!(stopped);
626        assert!(elapsed < Duration::from_millis(100));
627    }
628
629    #[test]
630    fn stop_signal_wait_is_interrupted_by_trigger() {
631        let (signal, trigger) = StopSignal::new();
632
633        let signal_clone = signal.clone();
634        let handle = thread::spawn(move || signal_clone.wait_timeout(Duration::from_secs(10)));
635
636        // Give thread time to start waiting
637        thread::sleep(Duration::from_millis(20));
638        trigger.stop();
639
640        let stopped = handle.join().unwrap();
641        assert!(stopped);
642    }
643
644    #[test]
645    fn channel_subscription_no_messages_without_events() {
646        let (sub, _event_tx) = channel_subscription(1);
647        let (tx, rx) = mpsc::channel();
648        let (signal, trigger) = StopSignal::new();
649
650        let handle = thread::spawn(move || {
651            sub.run(tx, signal);
652        });
653
654        thread::sleep(Duration::from_millis(10));
655        trigger.stop();
656        handle.join().unwrap();
657
658        let msgs: Vec<_> = rx.try_iter().collect();
659        assert!(msgs.is_empty());
660    }
661
662    #[test]
663    fn channel_subscription_id_is_preserved() {
664        let (sub, _tx) = channel_subscription(42);
665        assert_eq!(sub.id(), 42);
666    }
667
668    #[test]
669    fn channel_subscription_stops_on_disconnected_receiver() {
670        let (sub, event_tx) = channel_subscription(1);
671        let (tx, _rx) = mpsc::channel();
672        let (signal, _trigger) = StopSignal::new();
673
674        drop(event_tx);
675
676        let handle = thread::spawn(move || {
677            sub.run(tx, signal);
678        });
679
680        let result = handle.join();
681        assert!(result.is_ok());
682    }
683
684    #[test]
685    fn every_with_id_preserves_custom_id() {
686        let sub = Every::<TestMsg>::with_id(12345, Duration::from_secs(1), || TestMsg::Tick);
687        assert_eq!(sub.id(), 12345);
688    }
689
690    #[test]
691    fn every_stops_on_disconnected_receiver() {
692        let sub = Every::new(Duration::from_millis(5), || TestMsg::Tick);
693        let (tx, rx) = mpsc::channel();
694        let (signal, _trigger) = StopSignal::new();
695
696        // Drop receiver before running
697        drop(rx);
698
699        // Should exit the loop when send fails
700        let handle = thread::spawn(move || {
701            sub.run(tx, signal);
702        });
703
704        // Should complete quickly, not hang
705        let result = handle.join();
706        assert!(result.is_ok());
707    }
708
709    #[test]
710    fn every_respects_interval() {
711        let sub = Every::with_id(1, Duration::from_millis(50), || TestMsg::Tick);
712        let (tx, rx) = mpsc::channel();
713        let (signal, trigger) = StopSignal::new();
714
715        let start = Instant::now();
716        let handle = thread::spawn(move || {
717            sub.run(tx, signal);
718        });
719
720        // Wait for 3 ticks worth of time
721        thread::sleep(Duration::from_millis(160));
722        trigger.stop();
723        handle.join().unwrap();
724
725        let msgs: Vec<_> = rx.try_iter().collect();
726        let elapsed = start.elapsed();
727
728        // Should have approximately 3 ticks (at 50ms intervals over 160ms)
729        assert!(
730            msgs.len() >= 2,
731            "Expected at least 2 ticks, got {}",
732            msgs.len()
733        );
734        assert!(
735            msgs.len() <= 4,
736            "Expected at most 4 ticks, got {}",
737            msgs.len()
738        );
739        assert!(elapsed >= Duration::from_millis(150));
740    }
741
742    #[test]
743    fn subscription_manager_empty_reconcile() {
744        let mut mgr = SubscriptionManager::<TestMsg>::new();
745
746        // Reconcile with empty list should not panic
747        mgr.reconcile(vec![]);
748        let msgs = mgr.drain_messages();
749        assert!(msgs.is_empty());
750    }
751
752    #[test]
753    fn subscription_manager_drain_messages_returns_all() {
754        let mut mgr = SubscriptionManager::<TestMsg>::new();
755        let (sub, event_tx) = channel_subscription(1);
756        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];
757
758        mgr.reconcile(subs);
759        event_tx.send(TestMsg::Value(1)).unwrap();
760        event_tx.send(TestMsg::Value(2)).unwrap();
761        thread::sleep(Duration::from_millis(20));
762
763        let msgs = mgr.drain_messages();
764        assert_eq!(msgs.len(), 2);
765        assert_eq!(msgs[0], TestMsg::Value(1));
766        assert_eq!(msgs[1], TestMsg::Value(2));
767
768        // Second drain should be empty
769        let msgs2 = mgr.drain_messages();
770        assert!(msgs2.is_empty());
771    }
772
773    #[test]
774    fn subscription_manager_replaces_subscription_with_different_id() {
775        let mut mgr = SubscriptionManager::<TestMsg>::new();
776        let (sub1, tx1) = channel_subscription(1);
777
778        // Start with ID 1
779        mgr.reconcile(vec![Box::new(sub1)]);
780        tx1.send(TestMsg::Value(1)).unwrap();
781        thread::sleep(Duration::from_millis(20));
782        let msgs1 = mgr.drain_messages();
783        assert_eq!(msgs1, vec![TestMsg::Value(1)]);
784
785        // Replace with ID 2
786        let (sub2, tx2) = channel_subscription(2);
787        mgr.reconcile(vec![Box::new(sub2)]);
788        tx2.send(TestMsg::Value(2)).unwrap();
789        thread::sleep(Duration::from_millis(20));
790        let msgs2 = mgr.drain_messages();
791        assert_eq!(msgs2, vec![TestMsg::Value(2)]);
792    }
793
794    #[test]
795    fn subscription_manager_multiple_subscriptions() {
796        let mut mgr = SubscriptionManager::<TestMsg>::new();
797        let (sub1, tx1) = channel_subscription(1);
798        let (sub2, tx2) = channel_subscription(2);
799        let (sub3, tx3) = channel_subscription(3);
800        let subs: Vec<Box<dyn Subscription<TestMsg>>> =
801            vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];
802
803        mgr.reconcile(subs);
804        tx1.send(TestMsg::Value(10)).unwrap();
805        tx2.send(TestMsg::Value(20)).unwrap();
806        tx3.send(TestMsg::Value(30)).unwrap();
807        thread::sleep(Duration::from_millis(30));
808
809        let mut msgs = mgr.drain_messages();
810        msgs.sort_by_key(|m| match m {
811            TestMsg::Value(v) => *v,
812            _ => 0,
813        });
814
815        assert_eq!(msgs.len(), 3);
816        assert_eq!(msgs[0], TestMsg::Value(10));
817        assert_eq!(msgs[1], TestMsg::Value(20));
818        assert_eq!(msgs[2], TestMsg::Value(30));
819    }
820
821    #[test]
822    fn subscription_manager_partial_update() {
823        let mut mgr = SubscriptionManager::<TestMsg>::new();
824
825        // Start with 3 subscriptions
826        mgr.reconcile(vec![
827            Box::new(Every::with_id(1, Duration::from_millis(10), || {
828                TestMsg::Value(1)
829            })),
830            Box::new(Every::with_id(2, Duration::from_millis(10), || {
831                TestMsg::Value(2)
832            })),
833            Box::new(Every::with_id(3, Duration::from_millis(10), || {
834                TestMsg::Value(3)
835            })),
836        ]);
837
838        thread::sleep(Duration::from_millis(30));
839        let _ = mgr.drain_messages();
840
841        // Remove subscription 2, keep 1 and 3
842        mgr.reconcile(vec![
843            Box::new(Every::with_id(1, Duration::from_millis(10), || {
844                TestMsg::Value(1)
845            })),
846            Box::new(Every::with_id(3, Duration::from_millis(10), || {
847                TestMsg::Value(3)
848            })),
849        ]);
850
851        // Drain any in-flight messages that were sent before the stop signal was processed.
852        // This clears the race window between stop signal and message send.
853        let _ = mgr.drain_messages();
854
855        // Now wait for new messages from the remaining subscriptions
856        thread::sleep(Duration::from_millis(30));
857        let msgs = mgr.drain_messages();
858
859        // Should only have values 1 and 3, not 2
860        let values: Vec<i32> = msgs
861            .iter()
862            .filter_map(|m| match m {
863                TestMsg::Value(v) => Some(*v),
864                _ => None,
865            })
866            .collect();
867
868        assert!(
869            values.contains(&1),
870            "Should still receive from subscription 1"
871        );
872        assert!(
873            values.contains(&3),
874            "Should still receive from subscription 3"
875        );
876        assert!(
877            !values.contains(&2),
878            "Should not receive from stopped subscription 2"
879        );
880    }
881
882    #[test]
883    fn subscription_manager_drop_stops_all() {
884        let (_signal, _) = StopSignal::new();
885        let flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
886        let flag_clone = flag.clone();
887
888        struct FlagSubscription {
889            id: SubId,
890            flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
891        }
892
893        impl Subscription<TestMsg> for FlagSubscription {
894            fn id(&self) -> SubId {
895                self.id
896            }
897
898            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
899                while !stop.is_stopped() {
900                    thread::sleep(Duration::from_millis(5));
901                }
902                self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
903            }
904        }
905
906        {
907            let mut mgr = SubscriptionManager::<TestMsg>::new();
908            mgr.reconcile(vec![Box::new(FlagSubscription {
909                id: 1,
910                flag: flag_clone,
911            })]);
912
913            thread::sleep(Duration::from_millis(20));
914            // mgr drops here, should stop all subscriptions
915        }
916
917        thread::sleep(Duration::from_millis(50));
918        assert!(
919            flag.load(std::sync::atomic::Ordering::SeqCst),
920            "Subscription should have stopped on drop"
921        );
922    }
923
924    #[test]
925    fn running_subscription_stop_joins_thread() {
926        use std::sync::atomic::{AtomicBool, Ordering};
927
928        let completed = std::sync::Arc::new(AtomicBool::new(false));
929        let completed_clone = completed.clone();
930
931        let (signal, trigger) = StopSignal::new();
932        let (_tx, _rx) = mpsc::channel::<TestMsg>();
933
934        let thread = thread::spawn(move || {
935            while !signal.is_stopped() {
936                thread::sleep(Duration::from_millis(5));
937            }
938            completed_clone.store(true, Ordering::SeqCst);
939        });
940
941        let running = RunningSubscription {
942            id: 1,
943            trigger,
944            thread: Some(thread),
945        };
946
947        running.stop();
948        assert!(completed.load(Ordering::SeqCst));
949    }
950
951    #[test]
952    fn every_id_stable_across_instances() {
953        // Same interval should produce same ID
954        let sub1 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
955        let sub2 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
956        let sub3 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Value(1));
957
958        assert_eq!(sub1.id(), sub2.id());
959        assert_eq!(sub2.id(), sub3.id()); // ID is based on interval, not message factory
960    }
961
962    #[test]
963    fn drain_messages_preserves_order() {
964        let mut mgr = SubscriptionManager::<TestMsg>::new();
965
966        // Use a custom subscription that sends messages in order
967        struct OrderedSubscription {
968            values: Vec<i32>,
969        }
970
971        impl Subscription<TestMsg> for OrderedSubscription {
972            fn id(&self) -> SubId {
973                999
974            }
975
976            fn run(&self, sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
977                for v in &self.values {
978                    let _ = sender.send(TestMsg::Value(*v));
979                    thread::sleep(Duration::from_millis(1));
980                }
981            }
982        }
983
984        mgr.reconcile(vec![Box::new(OrderedSubscription {
985            values: vec![1, 2, 3, 4, 5],
986        })]);
987
988        thread::sleep(Duration::from_millis(30));
989        let msgs = mgr.drain_messages();
990
991        let values: Vec<i32> = msgs
992            .iter()
993            .filter_map(|m| match m {
994                TestMsg::Value(v) => Some(*v),
995                _ => None,
996            })
997            .collect();
998
999        assert_eq!(values, vec![1, 2, 3, 4, 5]);
1000    }
1001
1002    #[test]
1003    fn subscription_manager_new_is_empty() {
1004        let mgr = SubscriptionManager::<TestMsg>::new();
1005        let msgs = mgr.drain_messages();
1006        assert!(msgs.is_empty());
1007    }
1008}