Skip to main content

ftui_runtime/
subscription.rs

1#![forbid(unsafe_code)]
2
3//! Subscription system for continuous event sources.
4//!
5//! Subscriptions provide a declarative way to receive events from external
6//! sources like timers, file watchers, or network connections. The runtime
7//! manages subscription lifecycles automatically based on what the model
8//! declares as active.
9//!
10//! # How it works
11//!
12//! 1. `Model::subscriptions()` returns the set of active subscriptions
13//! 2. After each `update()`, the runtime compares active vs previous subscriptions
14//! 3. New subscriptions are started, removed ones are stopped
15//! 4. Subscription messages are routed through `Model::update()`
16
17use crate::cancellation::{CancellationSource, CancellationToken};
18use std::collections::HashSet;
19use std::sync::mpsc;
20use std::thread;
21use web_time::{Duration, Instant};
22
23/// A unique identifier for a subscription.
24///
25/// Used by the runtime to track which subscriptions are active and
26/// to deduplicate subscriptions across update cycles.
27pub type SubId = u64;
28
29/// A subscription produces messages from an external event source.
30///
31/// Subscriptions run on background threads and send messages through
32/// the provided channel. The runtime manages their lifecycle.
33pub trait Subscription<M: Send + 'static>: Send {
34    /// Unique identifier for deduplication.
35    ///
36    /// Subscriptions with the same ID are considered identical.
37    /// The runtime uses this to avoid restarting unchanged subscriptions.
38    fn id(&self) -> SubId;
39
40    /// Start the subscription, sending messages through the channel.
41    ///
42    /// This is called on a background thread. Implementations should
43    /// loop and send messages until the channel is disconnected (receiver dropped)
44    /// or the stop signal is received.
45    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
46}
47
48/// Signal for stopping a subscription.
49///
50/// When the runtime stops a subscription, it sets this signal. The subscription
51/// should check it periodically and exit its run loop when set.
52///
53/// Backed by [`CancellationToken`] for structured cancellation.
54#[derive(Clone)]
55pub struct StopSignal {
56    token: CancellationToken,
57}
58
59impl StopSignal {
60    /// Create a new stop signal pair (signal, trigger).
61    pub(crate) fn new() -> (Self, StopTrigger) {
62        let source = CancellationSource::new();
63        let signal = Self {
64            token: source.token(),
65        };
66        let trigger = StopTrigger { source };
67        (signal, trigger)
68    }
69
70    /// Check if the stop signal has been triggered.
71    pub fn is_stopped(&self) -> bool {
72        self.token.is_cancelled()
73    }
74
75    /// Wait for either the stop signal or a timeout.
76    ///
77    /// Returns `true` if stopped, `false` if timed out.
78    /// Blocks the thread efficiently using a condition variable.
79    /// Handles spurious wakeups by looping until condition met or timeout expired.
80    pub fn wait_timeout(&self, duration: Duration) -> bool {
81        self.token.wait_timeout(duration)
82    }
83
84    /// Access the underlying cancellation token.
85    ///
86    /// This enables integration with Asupersync-style structured cancellation
87    /// while preserving backwards compatibility with the `StopSignal` API.
88    pub fn cancellation_token(&self) -> &CancellationToken {
89        &self.token
90    }
91}
92
93/// Trigger to stop a subscription from the runtime side.
94///
95/// Backed by [`CancellationSource`] for structured cancellation.
96pub(crate) struct StopTrigger {
97    source: CancellationSource,
98}
99
100impl StopTrigger {
101    /// Signal the subscription to stop.
102    pub(crate) fn stop(&self) {
103        self.source.cancel();
104    }
105}
106
107/// A running subscription handle.
108pub(crate) struct RunningSubscription {
109    pub(crate) id: SubId,
110    trigger: StopTrigger,
111    thread: Option<thread::JoinHandle<()>>,
112    /// Tracks whether the subscription thread panicked (set by the catch_unwind wrapper).
113    panicked: std::sync::Arc<std::sync::atomic::AtomicBool>,
114}
115
116const SUBSCRIPTION_STOP_JOIN_TIMEOUT: Duration = Duration::from_millis(250);
117/// Poll interval for bounded subscription thread joins (bd-1f2aw).
118///
119/// Same rationale as the executor shutdown polls: `JoinHandle` has no
120/// `join_timeout` in stable Rust, so we poll `is_finished()` with a short
121/// sleep. 1ms minimizes stop latency while avoiding spin.
122const SUBSCRIPTION_STOP_JOIN_POLL: Duration = Duration::from_millis(1);
123
124impl RunningSubscription {
125    /// Returns true if the subscription thread panicked.
126    pub(crate) fn has_panicked(&self) -> bool {
127        self.panicked.load(std::sync::atomic::Ordering::Acquire)
128    }
129
130    /// Signal the subscription to stop (phase 1 of two-phase shutdown).
131    ///
132    /// Does NOT join the thread — call [`join_bounded`] after signalling all
133    /// subscriptions to allow parallel wind-down (bd-1f2aw).
134    pub(crate) fn signal_stop(&self) {
135        self.trigger.stop();
136    }
137
138    /// Join the subscription thread with a bounded timeout (phase 2).
139    ///
140    /// Returns the join handle if the thread did not finish within the timeout,
141    /// allowing callers to log and move on without blocking indefinitely.
142    pub(crate) fn join_bounded(mut self) -> Option<thread::JoinHandle<()>> {
143        let handle = self.thread.take()?;
144        let start = Instant::now();
145
146        // Fast path: subscription already finished (common for short-lived subs).
147        if handle.is_finished() {
148            let _ = handle.join();
149            tracing::trace!(
150                sub_id = self.id,
151                panicked = self.has_panicked(),
152                elapsed_us = start.elapsed().as_micros() as u64,
153                "subscription join (fast path)"
154            );
155            return None;
156        }
157
158        // Slow path: bounded poll loop (bd-1f2aw).
159        while !handle.is_finished() {
160            if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
161                tracing::warn!(
162                    sub_id = self.id,
163                    panicked = self.has_panicked(),
164                    timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
165                    "subscription join timed out, detaching thread"
166                );
167                return Some(handle);
168            }
169            thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
170        }
171
172        let _ = handle.join();
173        tracing::trace!(
174            sub_id = self.id,
175            panicked = self.has_panicked(),
176            elapsed_us = start.elapsed().as_micros() as u64,
177            "subscription join (slow path)"
178        );
179        None
180    }
181
182    /// Stop the subscription and join its thread if it exits promptly.
183    ///
184    /// Convenience method combining signal + join for single-subscription stops.
185    /// Used by tests and external callers that stop a single subscription.
186    #[cfg_attr(not(test), allow(dead_code))]
187    pub(crate) fn stop(mut self) {
188        self.trigger.stop();
189        if let Some(handle) = self.thread.take() {
190            let start = Instant::now();
191            // Fast path: subscription already finished (common for short-lived subs).
192            if handle.is_finished() {
193                let _ = handle.join();
194                tracing::trace!(
195                    sub_id = self.id,
196                    panicked = self.has_panicked(),
197                    elapsed_us = start.elapsed().as_micros() as u64,
198                    "subscription stop (fast path)"
199                );
200                return;
201            }
202            // Slow path: bounded poll loop (bd-1f2aw).
203            while !handle.is_finished() {
204                if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
205                    tracing::warn!(
206                        sub_id = self.id,
207                        panicked = self.has_panicked(),
208                        timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
209                        "subscription did not stop within timeout; detaching thread"
210                    );
211                    return;
212                }
213                thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
214            }
215            let _ = handle.join();
216            tracing::trace!(
217                sub_id = self.id,
218                panicked = self.has_panicked(),
219                elapsed_us = start.elapsed().as_micros() as u64,
220                "subscription stop (slow path)"
221            );
222        }
223    }
224}
225
226impl Drop for RunningSubscription {
227    fn drop(&mut self) {
228        self.trigger.stop();
229        // Don't join in drop to avoid blocking
230    }
231}
232
233/// Manages the lifecycle of subscriptions for a program.
234pub(crate) struct SubscriptionManager<M: Send + 'static> {
235    active: Vec<RunningSubscription>,
236    sender: mpsc::Sender<M>,
237    receiver: mpsc::Receiver<M>,
238}
239
240impl<M: Send + 'static> SubscriptionManager<M> {
241    pub(crate) fn new() -> Self {
242        let (sender, receiver) = mpsc::channel();
243        Self {
244            active: Vec::new(),
245            sender,
246            receiver,
247        }
248    }
249
250    /// Update the set of active subscriptions.
251    ///
252    /// Compares the new set against currently running subscriptions:
253    /// - Starts subscriptions that are new (ID not in active set)
254    /// - Stops subscriptions that are no longer declared (ID not in new set)
255    /// - Leaves unchanged subscriptions running
256    pub(crate) fn reconcile(&mut self, subscriptions: Vec<Box<dyn Subscription<M>>>) {
257        let reconcile_start = Instant::now();
258        let new_ids: HashSet<SubId> = subscriptions.iter().map(|s| s.id()).collect();
259        let active_count_before = self.active.len();
260
261        crate::debug_trace!(
262            "reconcile: new_ids={:?}, active_before={}",
263            new_ids,
264            active_count_before
265        );
266        tracing::trace!(
267            new_id_count = new_ids.len(),
268            active_before = active_count_before,
269            new_ids = ?new_ids,
270            "subscription reconcile starting"
271        );
272
273        // Stop subscriptions that are no longer active (two-phase: bd-1f2aw).
274        let mut remaining = Vec::new();
275        let mut to_stop = Vec::new();
276        for running in self.active.drain(..) {
277            if new_ids.contains(&running.id) {
278                remaining.push(running);
279            } else {
280                crate::debug_trace!("stopping subscription: id={}", running.id);
281                tracing::debug!(sub_id = running.id, "Stopping subscription");
282                crate::effect_system::record_subscription_stop("subscription", running.id, 0);
283                crate::effect_system::record_dynamics_sub_stop();
284                to_stop.push(running);
285            }
286        }
287        // Phase 1: Signal all removals.
288        for running in &to_stop {
289            running.signal_stop();
290        }
291        let stopped_count = to_stop.len();
292        // Phase 2: Join with bounded timeout.
293        for running in to_stop {
294            let _ = running.join_bounded();
295        }
296        self.active = remaining;
297
298        // Start new subscriptions
299        let mut started_count = 0usize;
300        let mut active_ids: HashSet<SubId> = self.active.iter().map(|r| r.id).collect();
301        for sub in subscriptions {
302            let id = sub.id();
303            if !active_ids.insert(id) {
304                continue;
305            }
306            started_count += 1;
307
308            crate::debug_trace!("starting subscription: id={}", id);
309            tracing::debug!(sub_id = id, "Starting subscription");
310            crate::effect_system::record_subscription_start("subscription", id);
311            crate::effect_system::record_dynamics_sub_start();
312            let (signal, trigger) = StopSignal::new();
313            let sender = self.sender.clone();
314            let panicked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
315            let panicked_flag = panicked.clone();
316            let sub_id_for_thread = id;
317
318            let thread = thread::spawn(move || {
319                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
320                    sub.run(sender, signal);
321                }));
322                if let Err(payload) = result {
323                    panicked_flag.store(true, std::sync::atomic::Ordering::Release);
324                    crate::effect_system::record_dynamics_sub_panic();
325                    let panic_msg = match payload.downcast_ref::<&str>() {
326                        Some(s) => (*s).to_string(),
327                        None => match payload.downcast_ref::<String>() {
328                            Some(s) => s.clone(),
329                            None => "unknown panic payload".to_string(),
330                        },
331                    };
332                    crate::effect_system::error_effect_panic(
333                        "subscription",
334                        &format!("sub_id={sub_id_for_thread}: {panic_msg}"),
335                    );
336                }
337            });
338
339            self.active.push(RunningSubscription {
340                id,
341                trigger,
342                thread: Some(thread),
343                panicked,
344            });
345        }
346
347        let active_count_after = self.active.len();
348        let reconcile_elapsed_us = reconcile_start.elapsed().as_micros() as u64;
349        crate::effect_system::record_dynamics_reconcile(reconcile_elapsed_us);
350        crate::debug_trace!("reconcile complete: active_after={}", active_count_after);
351        tracing::trace!(
352            active_before = active_count_before,
353            active_after = active_count_after,
354            started = started_count,
355            stopped = stopped_count,
356            reconcile_us = reconcile_elapsed_us,
357            "subscription reconcile complete"
358        );
359    }
360
361    /// Drain pending messages from subscriptions.
362    pub(crate) fn drain_messages(&self) -> Vec<M> {
363        let mut messages = Vec::new();
364        while let Ok(msg) = self.receiver.try_recv() {
365            messages.push(msg);
366        }
367        messages
368    }
369
370    /// Return the number of active subscriptions.
371    #[inline]
372    pub(crate) fn active_count(&self) -> usize {
373        self.active.len()
374    }
375
376    /// Stop all running subscriptions using two-phase parallel shutdown (bd-1f2aw).
377    ///
378    /// Phase 1: Signal all subscriptions to stop (non-blocking).
379    /// Phase 2: Join all threads with bounded timeout.
380    ///
381    /// This is significantly faster than sequential stop when multiple
382    /// subscriptions are active, because all threads begin winding down
383    /// simultaneously rather than waiting for each to finish in turn.
384    pub(crate) fn stop_all(&mut self) {
385        let count = self.active.len();
386        if count == 0 {
387            return;
388        }
389        let start = Instant::now();
390
391        // Phase 1: Signal all subscriptions to stop (parallel).
392        for running in &self.active {
393            running.signal_stop();
394        }
395
396        let signal_elapsed_us = start.elapsed().as_micros() as u64;
397        tracing::trace!(
398            target: "ftui.runtime",
399            count,
400            signal_elapsed_us,
401            "subscription stop_all phase 1 (signal) complete"
402        );
403
404        // Phase 2: Join all threads with bounded timeout.
405        let mut panicked_count = 0_usize;
406        let mut timed_out_count = 0_usize;
407        for running in self.active.drain(..) {
408            if running.has_panicked() {
409                panicked_count += 1;
410            }
411            if running.join_bounded().is_some() {
412                timed_out_count += 1;
413            }
414        }
415
416        let shutdown_elapsed_us = start.elapsed().as_micros() as u64;
417        crate::effect_system::record_dynamics_shutdown(shutdown_elapsed_us, timed_out_count as u64);
418        tracing::debug!(
419            target: "ftui.runtime",
420            count,
421            panicked_count,
422            timed_out_count,
423            elapsed_us = shutdown_elapsed_us,
424            "subscription stop_all complete"
425        );
426    }
427}
428
429impl<M: Send + 'static> Drop for SubscriptionManager<M> {
430    fn drop(&mut self) {
431        self.stop_all();
432    }
433}
434
435// --- Built-in subscriptions ---
436
437/// A subscription that fires at a fixed interval.
438///
439/// # Example
440///
441/// ```ignore
442/// fn subscriptions(&self) -> Vec<Box<dyn Subscription<MyMsg>>> {
443///     vec![Box::new(Every::new(Duration::from_secs(1), || MyMsg::Tick))]
444/// }
445/// ```
446pub struct Every<M: Send + 'static> {
447    id: SubId,
448    interval: Duration,
449    make_msg: Box<dyn Fn() -> M + Send + Sync>,
450}
451
452impl<M: Send + 'static> Every<M> {
453    /// Create a tick subscription with the given interval and message factory.
454    pub fn new(interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self {
455        // Generate a stable ID from the interval to allow deduplication
456        let id = interval.as_nanos() as u64 ^ 0x5449_434B; // "TICK" magic
457        Self {
458            id,
459            interval,
460            make_msg: Box::new(make_msg),
461        }
462    }
463
464    /// Create a tick subscription with an explicit ID.
465    pub fn with_id(
466        id: SubId,
467        interval: Duration,
468        make_msg: impl Fn() -> M + Send + Sync + 'static,
469    ) -> Self {
470        Self {
471            id,
472            interval,
473            make_msg: Box::new(make_msg),
474        }
475    }
476}
477
478impl<M: Send + 'static> Subscription<M> for Every<M> {
479    fn id(&self) -> SubId {
480        self.id
481    }
482
483    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
484        let mut tick_count: u64 = 0;
485        crate::debug_trace!(
486            "Every subscription started: id={}, interval={:?}",
487            self.id,
488            self.interval
489        );
490        loop {
491            if stop.wait_timeout(self.interval) {
492                crate::debug_trace!(
493                    "Every subscription stopped: id={}, sent {} ticks",
494                    self.id,
495                    tick_count
496                );
497                break;
498            }
499            tick_count += 1;
500            let msg = (self.make_msg)();
501            if sender.send(msg).is_err() {
502                crate::debug_trace!(
503                    "Every subscription channel closed: id={}, sent {} ticks",
504                    self.id,
505                    tick_count
506                );
507                break;
508            }
509        }
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516
517    #[derive(Debug, Clone, PartialEq)]
518    enum TestMsg {
519        Tick,
520        Value(i32),
521    }
522
523    struct ChannelSubscription<M: Send + 'static> {
524        id: SubId,
525        receiver: mpsc::Receiver<M>,
526        poll: Duration,
527    }
528
529    impl<M: Send + 'static> ChannelSubscription<M> {
530        fn new(id: SubId, receiver: mpsc::Receiver<M>) -> Self {
531            Self {
532                id,
533                receiver,
534                poll: Duration::from_millis(5),
535            }
536        }
537    }
538
539    impl<M: Send + 'static> Subscription<M> for ChannelSubscription<M> {
540        fn id(&self) -> SubId {
541            self.id
542        }
543
544        fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
545            loop {
546                if stop.is_stopped() {
547                    break;
548                }
549                match self.receiver.recv_timeout(self.poll) {
550                    Ok(msg) => {
551                        if sender.send(msg).is_err() {
552                            break;
553                        }
554                    }
555                    Err(mpsc::RecvTimeoutError::Timeout) => {}
556                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
557                }
558            }
559        }
560    }
561
562    fn channel_subscription(id: SubId) -> (ChannelSubscription<TestMsg>, mpsc::Sender<TestMsg>) {
563        let (tx, rx) = mpsc::channel();
564        (ChannelSubscription::new(id, rx), tx)
565    }
566
567    #[test]
568    fn stop_signal_starts_false() {
569        let (signal, _trigger) = StopSignal::new();
570        assert!(!signal.is_stopped());
571    }
572
573    #[test]
574    fn stop_signal_becomes_true_after_trigger() {
575        let (signal, trigger) = StopSignal::new();
576        trigger.stop();
577        assert!(signal.is_stopped());
578    }
579
580    #[test]
581    fn stop_signal_wait_returns_true_when_stopped() {
582        let (signal, trigger) = StopSignal::new();
583        trigger.stop();
584        assert!(signal.wait_timeout(Duration::from_millis(100)));
585    }
586
587    #[test]
588    fn stop_signal_wait_returns_false_on_timeout() {
589        let (signal, _trigger) = StopSignal::new();
590        assert!(!signal.wait_timeout(Duration::from_millis(10)));
591    }
592
593    #[test]
594    fn channel_subscription_forwards_messages() {
595        let (sub, event_tx) = channel_subscription(1);
596        let (tx, rx) = mpsc::channel();
597        let (signal, trigger) = StopSignal::new();
598
599        let handle = thread::spawn(move || {
600            sub.run(tx, signal);
601        });
602
603        event_tx.send(TestMsg::Value(1)).unwrap();
604        event_tx.send(TestMsg::Value(2)).unwrap();
605        thread::sleep(Duration::from_millis(10));
606        trigger.stop();
607        handle.join().unwrap();
608
609        let msgs: Vec<_> = rx.try_iter().collect();
610        assert_eq!(msgs, vec![TestMsg::Value(1), TestMsg::Value(2)]);
611    }
612
613    #[test]
614    fn every_subscription_fires() {
615        let sub = Every::new(Duration::from_millis(10), || TestMsg::Tick);
616        let (tx, rx) = mpsc::channel();
617        let (signal, trigger) = StopSignal::new();
618
619        let handle = thread::spawn(move || {
620            sub.run(tx, signal);
621        });
622
623        // Wait for a few ticks
624        thread::sleep(Duration::from_millis(50));
625        trigger.stop();
626        handle.join().unwrap();
627
628        let msgs: Vec<_> = rx.try_iter().collect();
629        assert!(!msgs.is_empty(), "Should have received at least one tick");
630        assert!(msgs.iter().all(|m| *m == TestMsg::Tick));
631    }
632
633    #[test]
634    fn every_subscription_uses_stable_id() {
635        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
636        let sub2 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
637        assert_eq!(sub1.id(), sub2.id());
638    }
639
640    #[test]
641    fn every_subscription_different_intervals_different_ids() {
642        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
643        let sub2 = Every::<TestMsg>::new(Duration::from_secs(2), || TestMsg::Tick);
644        assert_ne!(sub1.id(), sub2.id());
645    }
646
647    #[test]
648    fn subscription_manager_starts_subscriptions() {
649        let mut mgr = SubscriptionManager::<TestMsg>::new();
650        let (sub, event_tx) = channel_subscription(1);
651        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];
652
653        mgr.reconcile(subs);
654        event_tx.send(TestMsg::Value(42)).unwrap();
655
656        // Give the thread a moment to send
657        thread::sleep(Duration::from_millis(20));
658
659        let msgs = mgr.drain_messages();
660        assert_eq!(msgs, vec![TestMsg::Value(42)]);
661    }
662
663    #[test]
664    fn subscription_manager_dedupes_duplicate_ids() {
665        let mut mgr = SubscriptionManager::<TestMsg>::new();
666        let (sub_a, tx_a) = channel_subscription(7);
667        let (sub_b, tx_b) = channel_subscription(7);
668        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub_a), Box::new(sub_b)];
669
670        mgr.reconcile(subs);
671
672        tx_a.send(TestMsg::Value(1)).unwrap();
673        assert!(
674            tx_b.send(TestMsg::Value(2)).is_err(),
675            "Duplicate subscription should be dropped"
676        );
677
678        thread::sleep(Duration::from_millis(20));
679        let msgs = mgr.drain_messages();
680        assert_eq!(msgs, vec![TestMsg::Value(1)]);
681    }
682
683    #[test]
684    fn subscription_manager_stops_removed() {
685        let mut mgr = SubscriptionManager::<TestMsg>::new();
686
687        // Start with one subscription
688        mgr.reconcile(vec![Box::new(Every::with_id(
689            99,
690            Duration::from_millis(5),
691            || TestMsg::Tick,
692        ))]);
693
694        thread::sleep(Duration::from_millis(20));
695        let msgs_before = mgr.drain_messages();
696        assert!(!msgs_before.is_empty());
697
698        // Remove it
699        mgr.reconcile(vec![]);
700
701        // Drain any remaining buffered messages
702        thread::sleep(Duration::from_millis(20));
703        let _ = mgr.drain_messages();
704
705        // After stopping, no more messages should arrive
706        thread::sleep(Duration::from_millis(30));
707        let msgs_after = mgr.drain_messages();
708        assert!(
709            msgs_after.is_empty(),
710            "Should stop receiving after reconcile with empty set"
711        );
712    }
713
714    #[test]
715    fn subscription_manager_keeps_unchanged() {
716        let mut mgr = SubscriptionManager::<TestMsg>::new();
717
718        // Start subscription
719        mgr.reconcile(vec![Box::new(Every::with_id(
720            50,
721            Duration::from_millis(10),
722            || TestMsg::Tick,
723        ))]);
724
725        thread::sleep(Duration::from_millis(30));
726        let _ = mgr.drain_messages();
727
728        // Reconcile with same ID - should keep running
729        mgr.reconcile(vec![Box::new(Every::with_id(
730            50,
731            Duration::from_millis(10),
732            || TestMsg::Tick,
733        ))]);
734
735        thread::sleep(Duration::from_millis(30));
736        let msgs = mgr.drain_messages();
737        assert!(!msgs.is_empty(), "Subscription should still be running");
738    }
739
740    #[test]
741    fn subscription_manager_stop_all() {
742        let mut mgr = SubscriptionManager::<TestMsg>::new();
743
744        mgr.reconcile(vec![
745            Box::new(Every::with_id(1, Duration::from_millis(5), || {
746                TestMsg::Value(1)
747            })),
748            Box::new(Every::with_id(2, Duration::from_millis(5), || {
749                TestMsg::Value(2)
750            })),
751        ]);
752
753        thread::sleep(Duration::from_millis(20));
754        mgr.stop_all();
755
756        thread::sleep(Duration::from_millis(20));
757        let _ = mgr.drain_messages();
758        thread::sleep(Duration::from_millis(30));
759        let msgs = mgr.drain_messages();
760        assert!(msgs.is_empty());
761    }
762
763    // =========================================================================
764    // ADDITIONAL TESTS - Cmd sequencing + Subscriptions (bd-2nu8.10.2)
765    // =========================================================================
766
767    #[test]
768    fn stop_signal_is_cloneable() {
769        let (signal, trigger) = StopSignal::new();
770        let signal_clone = signal.clone();
771
772        assert!(!signal.is_stopped());
773        assert!(!signal_clone.is_stopped());
774
775        trigger.stop();
776
777        assert!(signal.is_stopped());
778        assert!(signal_clone.is_stopped());
779    }
780
781    #[test]
782    fn stop_signal_wait_wakes_immediately_when_already_stopped() {
783        let (signal, trigger) = StopSignal::new();
784        trigger.stop();
785
786        // Should return immediately, not wait for timeout
787        let start = Instant::now();
788        let stopped = signal.wait_timeout(Duration::from_secs(10));
789        let elapsed = start.elapsed();
790
791        assert!(stopped);
792        assert!(elapsed < Duration::from_millis(100));
793    }
794
795    #[test]
796    fn stop_signal_wait_is_interrupted_by_trigger() {
797        let (signal, trigger) = StopSignal::new();
798
799        let signal_clone = signal.clone();
800        let handle = thread::spawn(move || signal_clone.wait_timeout(Duration::from_secs(10)));
801
802        // Give thread time to start waiting
803        thread::sleep(Duration::from_millis(20));
804        trigger.stop();
805
806        let stopped = handle.join().unwrap();
807        assert!(stopped);
808    }
809
810    #[test]
811    fn channel_subscription_no_messages_without_events() {
812        let (sub, _event_tx) = channel_subscription(1);
813        let (tx, rx) = mpsc::channel();
814        let (signal, trigger) = StopSignal::new();
815
816        let handle = thread::spawn(move || {
817            sub.run(tx, signal);
818        });
819
820        thread::sleep(Duration::from_millis(10));
821        trigger.stop();
822        handle.join().unwrap();
823
824        let msgs: Vec<_> = rx.try_iter().collect();
825        assert!(msgs.is_empty());
826    }
827
828    #[test]
829    fn channel_subscription_id_is_preserved() {
830        let (sub, _tx) = channel_subscription(42);
831        assert_eq!(sub.id(), 42);
832    }
833
834    #[test]
835    fn channel_subscription_stops_on_disconnected_receiver() {
836        let (sub, event_tx) = channel_subscription(1);
837        let (tx, _rx) = mpsc::channel();
838        let (signal, _trigger) = StopSignal::new();
839
840        drop(event_tx);
841
842        let handle = thread::spawn(move || {
843            sub.run(tx, signal);
844        });
845
846        let result = handle.join();
847        assert!(result.is_ok());
848    }
849
850    #[test]
851    fn every_with_id_preserves_custom_id() {
852        let sub = Every::<TestMsg>::with_id(12345, Duration::from_secs(1), || TestMsg::Tick);
853        assert_eq!(sub.id(), 12345);
854    }
855
856    #[test]
857    fn every_stops_on_disconnected_receiver() {
858        let sub = Every::new(Duration::from_millis(5), || TestMsg::Tick);
859        let (tx, rx) = mpsc::channel();
860        let (signal, _trigger) = StopSignal::new();
861
862        // Drop receiver before running
863        drop(rx);
864
865        // Should exit the loop when send fails
866        let handle = thread::spawn(move || {
867            sub.run(tx, signal);
868        });
869
870        // Should complete quickly, not hang
871        let result = handle.join();
872        assert!(result.is_ok());
873    }
874
875    #[test]
876    fn every_respects_interval() {
877        let sub = Every::with_id(1, Duration::from_millis(50), || TestMsg::Tick);
878        let (tx, rx) = mpsc::channel();
879        let (signal, trigger) = StopSignal::new();
880
881        let start = Instant::now();
882        let handle = thread::spawn(move || {
883            sub.run(tx, signal);
884        });
885
886        // Wait for 3 ticks worth of time
887        thread::sleep(Duration::from_millis(160));
888        trigger.stop();
889        handle.join().unwrap();
890
891        let msgs: Vec<_> = rx.try_iter().collect();
892        let elapsed = start.elapsed();
893
894        // Should have approximately 3 ticks (at 50ms intervals over 160ms)
895        assert!(
896            msgs.len() >= 2,
897            "Expected at least 2 ticks, got {}",
898            msgs.len()
899        );
900        assert!(
901            msgs.len() <= 4,
902            "Expected at most 4 ticks, got {}",
903            msgs.len()
904        );
905        assert!(elapsed >= Duration::from_millis(150));
906    }
907
908    #[test]
909    fn subscription_manager_empty_reconcile() {
910        let mut mgr = SubscriptionManager::<TestMsg>::new();
911
912        // Reconcile with empty list should not panic
913        mgr.reconcile(vec![]);
914        let msgs = mgr.drain_messages();
915        assert!(msgs.is_empty());
916    }
917
918    #[test]
919    fn subscription_manager_drain_messages_returns_all() {
920        let mut mgr = SubscriptionManager::<TestMsg>::new();
921        let (sub, event_tx) = channel_subscription(1);
922        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];
923
924        mgr.reconcile(subs);
925        event_tx.send(TestMsg::Value(1)).unwrap();
926        event_tx.send(TestMsg::Value(2)).unwrap();
927        thread::sleep(Duration::from_millis(20));
928
929        let msgs = mgr.drain_messages();
930        assert_eq!(msgs.len(), 2);
931        assert_eq!(msgs[0], TestMsg::Value(1));
932        assert_eq!(msgs[1], TestMsg::Value(2));
933
934        // Second drain should be empty
935        let msgs2 = mgr.drain_messages();
936        assert!(msgs2.is_empty());
937    }
938
939    #[test]
940    fn subscription_manager_replaces_subscription_with_different_id() {
941        let mut mgr = SubscriptionManager::<TestMsg>::new();
942        let (sub1, tx1) = channel_subscription(1);
943
944        // Start with ID 1
945        mgr.reconcile(vec![Box::new(sub1)]);
946        tx1.send(TestMsg::Value(1)).unwrap();
947        thread::sleep(Duration::from_millis(20));
948        let msgs1 = mgr.drain_messages();
949        assert_eq!(msgs1, vec![TestMsg::Value(1)]);
950
951        // Replace with ID 2
952        let (sub2, tx2) = channel_subscription(2);
953        mgr.reconcile(vec![Box::new(sub2)]);
954        tx2.send(TestMsg::Value(2)).unwrap();
955        thread::sleep(Duration::from_millis(20));
956        let msgs2 = mgr.drain_messages();
957        assert_eq!(msgs2, vec![TestMsg::Value(2)]);
958    }
959
960    #[test]
961    fn subscription_manager_multiple_subscriptions() {
962        let mut mgr = SubscriptionManager::<TestMsg>::new();
963        let (sub1, tx1) = channel_subscription(1);
964        let (sub2, tx2) = channel_subscription(2);
965        let (sub3, tx3) = channel_subscription(3);
966        let subs: Vec<Box<dyn Subscription<TestMsg>>> =
967            vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];
968
969        mgr.reconcile(subs);
970        tx1.send(TestMsg::Value(10)).unwrap();
971        tx2.send(TestMsg::Value(20)).unwrap();
972        tx3.send(TestMsg::Value(30)).unwrap();
973        thread::sleep(Duration::from_millis(30));
974
975        let mut msgs = mgr.drain_messages();
976        msgs.sort_by_key(|m| match m {
977            TestMsg::Value(v) => *v,
978            _ => 0,
979        });
980
981        assert_eq!(msgs.len(), 3);
982        assert_eq!(msgs[0], TestMsg::Value(10));
983        assert_eq!(msgs[1], TestMsg::Value(20));
984        assert_eq!(msgs[2], TestMsg::Value(30));
985    }
986
987    #[test]
988    fn subscription_manager_partial_update() {
989        let mut mgr = SubscriptionManager::<TestMsg>::new();
990
991        // Start with 3 subscriptions
992        mgr.reconcile(vec![
993            Box::new(Every::with_id(1, Duration::from_millis(10), || {
994                TestMsg::Value(1)
995            })),
996            Box::new(Every::with_id(2, Duration::from_millis(10), || {
997                TestMsg::Value(2)
998            })),
999            Box::new(Every::with_id(3, Duration::from_millis(10), || {
1000                TestMsg::Value(3)
1001            })),
1002        ]);
1003
1004        thread::sleep(Duration::from_millis(30));
1005        let _ = mgr.drain_messages();
1006
1007        // Remove subscription 2, keep 1 and 3
1008        mgr.reconcile(vec![
1009            Box::new(Every::with_id(1, Duration::from_millis(10), || {
1010                TestMsg::Value(1)
1011            })),
1012            Box::new(Every::with_id(3, Duration::from_millis(10), || {
1013                TestMsg::Value(3)
1014            })),
1015        ]);
1016
1017        // Drain any in-flight messages that were sent before the stop signal was processed.
1018        // This clears the race window between stop signal and message send.
1019        let _ = mgr.drain_messages();
1020
1021        // Now wait for new messages from the remaining subscriptions
1022        thread::sleep(Duration::from_millis(30));
1023        let msgs = mgr.drain_messages();
1024
1025        // Should only have values 1 and 3, not 2
1026        let values: Vec<i32> = msgs
1027            .iter()
1028            .filter_map(|m| match m {
1029                TestMsg::Value(v) => Some(*v),
1030                _ => None,
1031            })
1032            .collect();
1033
1034        assert!(
1035            values.contains(&1),
1036            "Should still receive from subscription 1"
1037        );
1038        assert!(
1039            values.contains(&3),
1040            "Should still receive from subscription 3"
1041        );
1042        assert!(
1043            !values.contains(&2),
1044            "Should not receive from stopped subscription 2"
1045        );
1046    }
1047
1048    #[test]
1049    fn subscription_manager_drop_stops_all() {
1050        let (_signal, _) = StopSignal::new();
1051        let flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1052        let flag_clone = flag.clone();
1053
1054        struct FlagSubscription {
1055            id: SubId,
1056            flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
1057        }
1058
1059        impl Subscription<TestMsg> for FlagSubscription {
1060            fn id(&self) -> SubId {
1061                self.id
1062            }
1063
1064            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1065                while !stop.is_stopped() {
1066                    thread::sleep(Duration::from_millis(5));
1067                }
1068                self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
1069            }
1070        }
1071
1072        {
1073            let mut mgr = SubscriptionManager::<TestMsg>::new();
1074            mgr.reconcile(vec![Box::new(FlagSubscription {
1075                id: 1,
1076                flag: flag_clone,
1077            })]);
1078
1079            thread::sleep(Duration::from_millis(20));
1080            // mgr drops here, should stop all subscriptions
1081        }
1082
1083        thread::sleep(Duration::from_millis(50));
1084        assert!(
1085            flag.load(std::sync::atomic::Ordering::SeqCst),
1086            "Subscription should have stopped on drop"
1087        );
1088    }
1089
1090    #[test]
1091    fn running_subscription_stop_joins_thread() {
1092        use std::sync::atomic::{AtomicBool, Ordering};
1093
1094        let completed = std::sync::Arc::new(AtomicBool::new(false));
1095        let completed_clone = completed.clone();
1096
1097        let (signal, trigger) = StopSignal::new();
1098        let (_tx, _rx) = mpsc::channel::<TestMsg>();
1099
1100        let thread = thread::spawn(move || {
1101            while !signal.is_stopped() {
1102                thread::sleep(Duration::from_millis(5));
1103            }
1104            completed_clone.store(true, Ordering::SeqCst);
1105        });
1106
1107        let running = RunningSubscription {
1108            id: 1,
1109            trigger,
1110            thread: Some(thread),
1111            panicked: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1112        };
1113
1114        running.stop();
1115        assert!(completed.load(Ordering::SeqCst));
1116    }
1117
1118    #[test]
1119    fn running_subscription_stop_times_out_for_uncooperative_thread() {
1120        use std::sync::atomic::{AtomicBool, Ordering};
1121
1122        let completed = std::sync::Arc::new(AtomicBool::new(false));
1123        let completed_clone = completed.clone();
1124
1125        let (_signal, trigger) = StopSignal::new();
1126        let thread = thread::spawn(move || {
1127            thread::sleep(Duration::from_millis(500));
1128            completed_clone.store(true, Ordering::SeqCst);
1129        });
1130
1131        let running = RunningSubscription {
1132            id: 7,
1133            trigger,
1134            thread: Some(thread),
1135            panicked: std::sync::Arc::new(AtomicBool::new(false)),
1136        };
1137
1138        let start = Instant::now();
1139        running.stop();
1140        assert!(
1141            start.elapsed() < Duration::from_millis(400),
1142            "stop() should not block behind an uncooperative subscription thread"
1143        );
1144
1145        thread::sleep(Duration::from_millis(550));
1146        assert!(completed.load(Ordering::SeqCst));
1147    }
1148
1149    #[test]
1150    fn every_id_stable_across_instances() {
1151        // Same interval should produce same ID
1152        let sub1 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
1153        let sub2 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
1154        let sub3 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Value(1));
1155
1156        assert_eq!(sub1.id(), sub2.id());
1157        assert_eq!(sub2.id(), sub3.id()); // ID is based on interval, not message factory
1158    }
1159
1160    #[test]
1161    fn drain_messages_preserves_order() {
1162        let mut mgr = SubscriptionManager::<TestMsg>::new();
1163
1164        // Use a custom subscription that sends messages in order
1165        struct OrderedSubscription {
1166            values: Vec<i32>,
1167        }
1168
1169        impl Subscription<TestMsg> for OrderedSubscription {
1170            fn id(&self) -> SubId {
1171                999
1172            }
1173
1174            fn run(&self, sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1175                for v in &self.values {
1176                    let _ = sender.send(TestMsg::Value(*v));
1177                    thread::sleep(Duration::from_millis(1));
1178                }
1179            }
1180        }
1181
1182        mgr.reconcile(vec![Box::new(OrderedSubscription {
1183            values: vec![1, 2, 3, 4, 5],
1184        })]);
1185
1186        thread::sleep(Duration::from_millis(30));
1187        let msgs = mgr.drain_messages();
1188
1189        let values: Vec<i32> = msgs
1190            .iter()
1191            .filter_map(|m| match m {
1192                TestMsg::Value(v) => Some(*v),
1193                _ => None,
1194            })
1195            .collect();
1196
1197        assert_eq!(values, vec![1, 2, 3, 4, 5]);
1198    }
1199
1200    #[test]
1201    fn subscription_manager_new_is_empty() {
1202        let mgr = SubscriptionManager::<TestMsg>::new();
1203        let msgs = mgr.drain_messages();
1204        assert!(msgs.is_empty());
1205    }
1206
1207    // =========================================================================
1208    // LIFECYCLE CONTRACT TESTS (bd-1dg21)
1209    //
1210    // These tests capture the observable behavioral contract of the subscription
1211    // system that MUST be preserved during the Asupersync migration. Each test
1212    // documents a specific guarantee that downstream code relies on.
1213    // =========================================================================
1214
1215    /// CONTRACT: StopSignal backed by CancellationToken must remain functional
1216    /// even after concurrent thread panics. The AtomicBool-based implementation
1217    /// is inherently poison-resistant.
1218    #[test]
1219    fn contract_stop_signal_resilient_to_thread_panics() {
1220        let (signal, trigger) = StopSignal::new();
1221        let signal_clone = signal.clone();
1222
1223        // Panic in a thread that holds a clone of the signal
1224        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1225            assert!(!signal_clone.is_stopped());
1226            panic!("intentional panic while holding signal clone");
1227        }));
1228        assert!(result.is_err());
1229
1230        // Signal must still be checkable and triggerable after thread panic
1231        assert!(
1232            !signal.is_stopped(),
1233            "signal should still report not-stopped"
1234        );
1235        trigger.stop();
1236        assert!(
1237            signal.is_stopped(),
1238            "signal should report stopped after trigger"
1239        );
1240        assert!(
1241            signal.wait_timeout(Duration::from_millis(10)),
1242            "wait_timeout should return true when stopped"
1243        );
1244    }
1245
1246    /// CONTRACT: StopSignal exposes its underlying CancellationToken for
1247    /// Asupersync integration.
1248    #[test]
1249    fn contract_stop_signal_exposes_cancellation_token() {
1250        let (signal, trigger) = StopSignal::new();
1251        let token = signal.cancellation_token();
1252        assert!(!token.is_cancelled(), "token should start uncancelled");
1253        trigger.stop();
1254        assert!(token.is_cancelled(), "token should be cancelled after stop");
1255    }
1256
1257    /// CONTRACT: stop_all() must complete within a bounded time even if subscription
1258    /// threads are uncooperative. The 250ms join timeout per subscription is the
1259    /// upper bound.
1260    #[test]
1261    fn contract_stop_all_bounded_time_with_uncooperative_subscriptions() {
1262        let mut mgr = SubscriptionManager::<TestMsg>::new();
1263
1264        // Create subscriptions that ignore the stop signal
1265        struct UncooperativeSub {
1266            id: SubId,
1267        }
1268
1269        impl Subscription<TestMsg> for UncooperativeSub {
1270            fn id(&self) -> SubId {
1271                self.id
1272            }
1273
1274            fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1275                // Ignore stop signal entirely, sleep for a long time
1276                thread::sleep(Duration::from_secs(5));
1277            }
1278        }
1279
1280        mgr.reconcile(vec![
1281            Box::new(UncooperativeSub { id: 100 }),
1282            Box::new(UncooperativeSub { id: 200 }),
1283        ]);
1284
1285        thread::sleep(Duration::from_millis(20)); // let threads start
1286
1287        let start = Instant::now();
1288        mgr.stop_all();
1289        let elapsed = start.elapsed();
1290
1291        // 2 subscriptions * 250ms timeout each = 500ms max, plus some margin
1292        assert!(
1293            elapsed < Duration::from_millis(800),
1294            "stop_all took {elapsed:?}, expected < 800ms for 2 uncooperative subscriptions"
1295        );
1296    }
1297
1298    /// CONTRACT: reconcile() must not start a new subscription for an ID that is
1299    /// already active, even if the subscription object is different.
1300    #[test]
1301    fn contract_reconcile_deduplicates_by_id_not_identity() {
1302        let mut mgr = SubscriptionManager::<TestMsg>::new();
1303        let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1304
1305        struct CountingSub {
1306            id: SubId,
1307            counter: std::sync::Arc<std::sync::atomic::AtomicUsize>,
1308        }
1309
1310        impl Subscription<TestMsg> for CountingSub {
1311            fn id(&self) -> SubId {
1312                self.id
1313            }
1314
1315            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1316                self.counter
1317                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1318                while !stop.is_stopped() {
1319                    thread::sleep(Duration::from_millis(5));
1320                }
1321            }
1322        }
1323
1324        // First reconcile starts one thread
1325        mgr.reconcile(vec![Box::new(CountingSub {
1326            id: 42,
1327            counter: counter.clone(),
1328        })]);
1329        thread::sleep(Duration::from_millis(20));
1330        assert_eq!(
1331            counter.load(std::sync::atomic::Ordering::SeqCst),
1332            1,
1333            "first reconcile should start exactly 1 thread"
1334        );
1335
1336        // Second reconcile with same ID must NOT start another thread
1337        mgr.reconcile(vec![Box::new(CountingSub {
1338            id: 42,
1339            counter: counter.clone(),
1340        })]);
1341        thread::sleep(Duration::from_millis(20));
1342        assert_eq!(
1343            counter.load(std::sync::atomic::Ordering::SeqCst),
1344            1,
1345            "second reconcile with same ID must not start another thread"
1346        );
1347
1348        mgr.stop_all();
1349    }
1350
1351    /// CONTRACT: When a subscription is removed via reconcile(), messages it sent
1352    /// before being stopped may still be in the channel. drain_messages() must
1353    /// return these buffered messages.
1354    #[test]
1355    fn contract_buffered_messages_available_after_subscription_stopped() {
1356        let mut mgr = SubscriptionManager::<TestMsg>::new();
1357
1358        struct BurstSub;
1359
1360        impl Subscription<TestMsg> for BurstSub {
1361            fn id(&self) -> SubId {
1362                77
1363            }
1364
1365            fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1366                // Send a burst of messages immediately
1367                for i in 0..10 {
1368                    let _ = sender.send(TestMsg::Value(i));
1369                }
1370                // Then wait for stop
1371                while !stop.is_stopped() {
1372                    thread::sleep(Duration::from_millis(5));
1373                }
1374            }
1375        }
1376
1377        mgr.reconcile(vec![Box::new(BurstSub)]);
1378        thread::sleep(Duration::from_millis(30));
1379
1380        // Remove the subscription
1381        mgr.reconcile(vec![]);
1382
1383        // Messages sent before stop should still be drainable
1384        let msgs = mgr.drain_messages();
1385        let values: Vec<i32> = msgs
1386            .iter()
1387            .filter_map(|m| match m {
1388                TestMsg::Value(v) => Some(*v),
1389                _ => None,
1390            })
1391            .collect();
1392
1393        assert!(
1394            values.len() >= 5,
1395            "Expected at least 5 buffered messages after stop, got {}",
1396            values.len()
1397        );
1398    }
1399
1400    /// CONTRACT: active_count() must accurately reflect the number of running
1401    /// subscriptions at all times.
1402    #[test]
1403    fn contract_active_count_tracks_running_subscriptions() {
1404        let mut mgr = SubscriptionManager::<TestMsg>::new();
1405
1406        assert_eq!(mgr.active_count(), 0, "empty manager");
1407
1408        mgr.reconcile(vec![
1409            Box::new(Every::with_id(1, Duration::from_millis(50), || {
1410                TestMsg::Tick
1411            })),
1412            Box::new(Every::with_id(2, Duration::from_millis(50), || {
1413                TestMsg::Tick
1414            })),
1415        ]);
1416        assert_eq!(mgr.active_count(), 2, "after starting 2");
1417
1418        mgr.reconcile(vec![Box::new(Every::with_id(
1419            1,
1420            Duration::from_millis(50),
1421            || TestMsg::Tick,
1422        ))]);
1423        assert_eq!(mgr.active_count(), 1, "after removing 1");
1424
1425        mgr.stop_all();
1426        assert_eq!(mgr.active_count(), 0, "after stop_all");
1427    }
1428
1429    /// CONTRACT: The Every subscription ID must be derived from interval only,
1430    /// not from the message factory closure. Two Every subscriptions with the
1431    /// same interval MUST have the same ID regardless of message content.
1432    #[test]
1433    fn contract_every_id_derived_from_interval_only() {
1434        let sub_a = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
1435        let sub_b = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Value(999));
1436        assert_eq!(
1437            sub_a.id(),
1438            sub_b.id(),
1439            "Every ID must depend only on interval, not message factory"
1440        );
1441
1442        let sub_c = Every::<TestMsg>::new(Duration::from_millis(200), || TestMsg::Tick);
1443        assert_ne!(
1444            sub_a.id(),
1445            sub_c.id(),
1446            "Different intervals must produce different IDs"
1447        );
1448    }
1449
1450    /// CONTRACT: The Every subscription ID formula must remain stable across
1451    /// versions. This captures the exact formula: interval_nanos XOR 0x5449_434B.
1452    #[test]
1453    fn contract_every_id_formula_is_stable() {
1454        let interval = Duration::from_millis(100);
1455        let expected_id = interval.as_nanos() as u64 ^ 0x5449_434B;
1456        let sub = Every::<TestMsg>::new(interval, || TestMsg::Tick);
1457        assert_eq!(
1458            sub.id(),
1459            expected_id,
1460            "Every ID formula must be: interval.as_nanos() as u64 ^ 0x5449_434B"
1461        );
1462    }
1463
1464    /// CONTRACT: Drop on SubscriptionManager must stop all subscriptions.
1465    /// This is the safety net for cleanup even if stop_all() is not called.
1466    #[test]
1467    fn contract_drop_triggers_stop_all() {
1468        use std::sync::atomic::{AtomicUsize, Ordering};
1469
1470        let stop_count = std::sync::Arc::new(AtomicUsize::new(0));
1471
1472        struct StopCountingSub {
1473            id: SubId,
1474            counter: std::sync::Arc<AtomicUsize>,
1475        }
1476
1477        impl Subscription<TestMsg> for StopCountingSub {
1478            fn id(&self) -> SubId {
1479                self.id
1480            }
1481
1482            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1483                while !stop.is_stopped() {
1484                    thread::sleep(Duration::from_millis(5));
1485                }
1486                self.counter.fetch_add(1, Ordering::SeqCst);
1487            }
1488        }
1489
1490        {
1491            let mut mgr = SubscriptionManager::<TestMsg>::new();
1492            mgr.reconcile(vec![
1493                Box::new(StopCountingSub {
1494                    id: 1,
1495                    counter: stop_count.clone(),
1496                }),
1497                Box::new(StopCountingSub {
1498                    id: 2,
1499                    counter: stop_count.clone(),
1500                }),
1501                Box::new(StopCountingSub {
1502                    id: 3,
1503                    counter: stop_count.clone(),
1504                }),
1505            ]);
1506            thread::sleep(Duration::from_millis(20));
1507            // mgr dropped here
1508        }
1509
1510        // Give threads time to notice stop signal and exit
1511        thread::sleep(Duration::from_millis(400));
1512        assert_eq!(
1513            stop_count.load(std::sync::atomic::Ordering::SeqCst),
1514            3,
1515            "all 3 subscription threads must have observed stop signal on drop"
1516        );
1517    }
1518
1519    /// CONTRACT: SUBSCRIPTION_STOP_JOIN_TIMEOUT must be exactly 250ms.
1520    /// The Asupersync migration must preserve this timeout bound.
1521    #[test]
1522    fn contract_stop_join_timeout_is_250ms() {
1523        assert_eq!(
1524            SUBSCRIPTION_STOP_JOIN_TIMEOUT,
1525            Duration::from_millis(250),
1526            "join timeout must be 250ms"
1527        );
1528        assert_eq!(
1529            SUBSCRIPTION_STOP_JOIN_POLL,
1530            Duration::from_millis(1),
1531            "join poll interval must be 1ms (bd-1f2aw)"
1532        );
1533    }
1534
1535    // =========================================================================
1536    // STRUCTURED LIFECYCLE TESTS (bd-1f2aw)
1537    //
1538    // These tests validate the structured cancellation, panic resilience,
1539    // and parallel shutdown improvements.
1540    // =========================================================================
1541
1542    /// bd-1f2aw: A panicking subscription must not crash the runtime.
1543    /// The panic is caught, the panicked flag is set, and telemetry is emitted.
1544    #[test]
1545    fn lifecycle_panic_in_subscription_is_caught() {
1546        use std::sync::atomic::Ordering;
1547
1548        let mut mgr = SubscriptionManager::<TestMsg>::new();
1549
1550        struct PanickingSub;
1551
1552        impl Subscription<TestMsg> for PanickingSub {
1553            fn id(&self) -> SubId {
1554                0xDEAD
1555            }
1556
1557            fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1558                panic!("intentional test panic in subscription");
1559            }
1560        }
1561
1562        mgr.reconcile(vec![Box::new(PanickingSub)]);
1563
1564        // Give the thread time to panic and be caught.
1565        thread::sleep(Duration::from_millis(50));
1566
1567        // The manager should still be functional.
1568        assert_eq!(
1569            mgr.active_count(),
1570            1,
1571            "panicked sub still tracked as active"
1572        );
1573
1574        // The panicked flag should be set.
1575        assert!(
1576            mgr.active[0].panicked.load(Ordering::Acquire),
1577            "panicked flag should be set after subscription panic"
1578        );
1579
1580        // stop_all should not panic even with a panicked subscription.
1581        mgr.stop_all();
1582        assert_eq!(mgr.active_count(), 0);
1583    }
1584
1585    /// bd-1f2aw: A panicking subscription must not prevent other subscriptions
1586    /// from continuing to deliver messages.
1587    #[test]
1588    fn lifecycle_panic_does_not_affect_sibling_subscriptions() {
1589        let mut mgr = SubscriptionManager::<TestMsg>::new();
1590
1591        struct PanickingSub;
1592        impl Subscription<TestMsg> for PanickingSub {
1593            fn id(&self) -> SubId {
1594                0xBAD
1595            }
1596            fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1597                panic!("boom");
1598            }
1599        }
1600
1601        mgr.reconcile(vec![
1602            Box::new(PanickingSub),
1603            Box::new(Every::with_id(42, Duration::from_millis(10), || {
1604                TestMsg::Tick
1605            })),
1606        ]);
1607
1608        // Wait for panic to happen and ticks to arrive.
1609        // The tick subscription fires every 10ms; wait long enough for several.
1610        thread::sleep(Duration::from_millis(100));
1611
1612        let msgs = mgr.drain_messages();
1613        assert!(
1614            !msgs.is_empty(),
1615            "sibling subscription should still deliver messages after a panic in another sub"
1616        );
1617
1618        mgr.stop_all();
1619    }
1620
1621    /// bd-1f2aw: Parallel phased shutdown (stop_all) must be faster than
1622    /// sequential shutdown when multiple subscriptions need to wind down.
1623    #[test]
1624    fn lifecycle_stop_all_parallel_shutdown() {
1625        use std::sync::atomic::{AtomicUsize, Ordering};
1626
1627        let stop_count = std::sync::Arc::new(AtomicUsize::new(0));
1628        let sub_count = 4;
1629
1630        struct SlowStopSub {
1631            id: SubId,
1632            counter: std::sync::Arc<AtomicUsize>,
1633        }
1634
1635        impl Subscription<TestMsg> for SlowStopSub {
1636            fn id(&self) -> SubId {
1637                self.id
1638            }
1639
1640            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1641                // Wait for stop, then simulate slow cleanup (50ms).
1642                while !stop.is_stopped() {
1643                    thread::sleep(Duration::from_millis(5));
1644                }
1645                thread::sleep(Duration::from_millis(50));
1646                self.counter.fetch_add(1, Ordering::SeqCst);
1647            }
1648        }
1649
1650        let mut mgr = SubscriptionManager::<TestMsg>::new();
1651        let subs: Vec<Box<dyn Subscription<TestMsg>>> = (0..sub_count)
1652            .map(|i| -> Box<dyn Subscription<TestMsg>> {
1653                Box::new(SlowStopSub {
1654                    id: 1000 + i,
1655                    counter: stop_count.clone(),
1656                })
1657            })
1658            .collect();
1659
1660        mgr.reconcile(subs);
1661        thread::sleep(Duration::from_millis(20));
1662
1663        let start = Instant::now();
1664        mgr.stop_all();
1665        let elapsed = start.elapsed();
1666
1667        // With parallel signal, all 4 subs start their 50ms cleanup
1668        // simultaneously. Sequential would take ~200ms (4 * 50ms).
1669        // Parallel should complete in ~50ms + join overhead.
1670        // Use 150ms as a generous bound (well under 200ms sequential).
1671        assert!(
1672            elapsed < Duration::from_millis(150),
1673            "parallel stop_all took {elapsed:?}, expected < 150ms \
1674             (sequential would be ~{expected_sequential}ms)",
1675            expected_sequential = sub_count * 50
1676        );
1677
1678        // All subscriptions should have completed cleanup.
1679        thread::sleep(Duration::from_millis(20));
1680        assert_eq!(
1681            stop_count.load(Ordering::SeqCst),
1682            sub_count as usize,
1683            "all subscriptions should have completed their cleanup"
1684        );
1685    }
1686
1687    /// bd-1f2aw: Two-phase signal+join in reconcile should allow parallel
1688    /// wind-down when removing multiple subscriptions at once.
1689    #[test]
1690    fn lifecycle_reconcile_removal_uses_parallel_stop() {
1691        use std::sync::atomic::{AtomicUsize, Ordering};
1692
1693        let stop_count = std::sync::Arc::new(AtomicUsize::new(0));
1694
1695        struct SlowStopSub {
1696            id: SubId,
1697            counter: std::sync::Arc<AtomicUsize>,
1698        }
1699
1700        impl Subscription<TestMsg> for SlowStopSub {
1701            fn id(&self) -> SubId {
1702                self.id
1703            }
1704
1705            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1706                while !stop.is_stopped() {
1707                    thread::sleep(Duration::from_millis(5));
1708                }
1709                thread::sleep(Duration::from_millis(40));
1710                self.counter.fetch_add(1, Ordering::SeqCst);
1711            }
1712        }
1713
1714        let mut mgr = SubscriptionManager::<TestMsg>::new();
1715        mgr.reconcile(vec![
1716            Box::new(SlowStopSub {
1717                id: 2000,
1718                counter: stop_count.clone(),
1719            }),
1720            Box::new(SlowStopSub {
1721                id: 2001,
1722                counter: stop_count.clone(),
1723            }),
1724            Box::new(SlowStopSub {
1725                id: 2002,
1726                counter: stop_count.clone(),
1727            }),
1728        ]);
1729        thread::sleep(Duration::from_millis(20));
1730
1731        // Remove all subscriptions via reconcile.
1732        let start = Instant::now();
1733        mgr.reconcile(vec![]);
1734        let elapsed = start.elapsed();
1735
1736        // Parallel: ~40ms + overhead. Sequential would be ~120ms.
1737        assert!(
1738            elapsed < Duration::from_millis(100),
1739            "reconcile removal took {elapsed:?}, expected < 100ms with parallel stop"
1740        );
1741
1742        thread::sleep(Duration::from_millis(20));
1743        assert_eq!(stop_count.load(Ordering::SeqCst), 3);
1744    }
1745
1746    /// bd-1f2aw: has_panicked() must reflect the actual panic state of the thread.
1747    #[test]
1748    fn lifecycle_has_panicked_tracks_state() {
1749        let panicked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1750        let panicked_flag = panicked.clone();
1751
1752        let (signal, trigger) = StopSignal::new();
1753        let thread = thread::spawn(move || {
1754            signal.wait_timeout(Duration::from_secs(10));
1755        });
1756
1757        let running = RunningSubscription {
1758            id: 999,
1759            trigger,
1760            thread: Some(thread),
1761            panicked,
1762        };
1763
1764        assert!(!running.has_panicked(), "should not be panicked initially");
1765
1766        // Simulate a panic flag (normally set by the catch_unwind wrapper).
1767        panicked_flag.store(true, std::sync::atomic::Ordering::Release);
1768        assert!(running.has_panicked(), "should reflect panicked state");
1769
1770        running.stop();
1771    }
1772
1773    /// bd-1f2aw: signal_stop + join_bounded should work correctly as a two-phase
1774    /// shutdown for individual subscriptions.
1775    #[test]
1776    fn lifecycle_signal_then_join_works() {
1777        use std::sync::atomic::{AtomicBool, Ordering};
1778
1779        let completed = std::sync::Arc::new(AtomicBool::new(false));
1780        let completed_clone = completed.clone();
1781
1782        let (signal, trigger) = StopSignal::new();
1783        let thread = thread::spawn(move || {
1784            while !signal.is_stopped() {
1785                thread::sleep(Duration::from_millis(5));
1786            }
1787            completed_clone.store(true, Ordering::SeqCst);
1788        });
1789
1790        let running = RunningSubscription {
1791            id: 888,
1792            trigger,
1793            thread: Some(thread),
1794            panicked: std::sync::Arc::new(AtomicBool::new(false)),
1795        };
1796
1797        running.signal_stop();
1798        let leftover = running.join_bounded();
1799        assert!(
1800            leftover.is_none(),
1801            "cooperative thread should join within timeout"
1802        );
1803        assert!(
1804            completed.load(Ordering::SeqCst),
1805            "thread should have completed"
1806        );
1807    }
1808
1809    /// bd-1f2aw: join_bounded must return the handle for uncooperative threads.
1810    #[test]
1811    fn lifecycle_join_bounded_returns_handle_for_uncooperative() {
1812        use std::sync::atomic::AtomicBool;
1813
1814        let (_signal, trigger) = StopSignal::new();
1815        let thread = thread::spawn(move || {
1816            thread::sleep(Duration::from_millis(500));
1817        });
1818
1819        let running = RunningSubscription {
1820            id: 777,
1821            trigger,
1822            thread: Some(thread),
1823            panicked: std::sync::Arc::new(AtomicBool::new(false)),
1824        };
1825
1826        running.signal_stop();
1827        let start = Instant::now();
1828        let leftover = running.join_bounded();
1829        let elapsed = start.elapsed();
1830
1831        assert!(
1832            leftover.is_some(),
1833            "uncooperative thread should not join within timeout"
1834        );
1835        assert!(
1836            elapsed < Duration::from_millis(400),
1837            "join_bounded should respect the 250ms timeout, took {elapsed:?}"
1838        );
1839    }
1840
1841    /// bd-1f2aw: Restart semantics — a subscription that was stopped via
1842    /// reconcile can be re-started by including it in a subsequent reconcile.
1843    #[test]
1844    fn lifecycle_restart_after_stop() {
1845        let mut mgr = SubscriptionManager::<TestMsg>::new();
1846
1847        // Start subscription.
1848        mgr.reconcile(vec![Box::new(Every::with_id(
1849            300,
1850            Duration::from_millis(10),
1851            || TestMsg::Tick,
1852        ))]);
1853        thread::sleep(Duration::from_millis(30));
1854        let msgs = mgr.drain_messages();
1855        assert!(!msgs.is_empty(), "should receive ticks");
1856
1857        // Remove it.
1858        mgr.reconcile(vec![]);
1859        thread::sleep(Duration::from_millis(20));
1860        let _ = mgr.drain_messages();
1861        thread::sleep(Duration::from_millis(30));
1862        let msgs = mgr.drain_messages();
1863        assert!(msgs.is_empty(), "should stop receiving after removal");
1864
1865        // Restart with same ID.
1866        mgr.reconcile(vec![Box::new(Every::with_id(
1867            300,
1868            Duration::from_millis(10),
1869            || TestMsg::Value(99),
1870        ))]);
1871        thread::sleep(Duration::from_millis(30));
1872        let msgs = mgr.drain_messages();
1873        assert!(
1874            !msgs.is_empty(),
1875            "should receive messages again after restart"
1876        );
1877        assert!(
1878            msgs.iter().any(|m| matches!(m, TestMsg::Value(99))),
1879            "restarted sub should use the new message factory"
1880        );
1881
1882        mgr.stop_all();
1883    }
1884
1885    /// bd-1f2aw: Non-interference contract — subscriptions communicate
1886    /// exclusively through the mpsc channel. They have no access to terminal
1887    /// state, frame buffers, or render surfaces.
1888    ///
1889    /// This test verifies the architectural invariant by demonstrating that
1890    /// subscription threads only interact with the manager through messages,
1891    /// and that the manager's state is consistent after concurrent operations.
1892    #[test]
1893    fn lifecycle_non_interference_with_manager_state() {
1894        use std::sync::atomic::{AtomicUsize, Ordering};
1895
1896        let msg_count = std::sync::Arc::new(AtomicUsize::new(0));
1897
1898        struct CountingSub {
1899            id: SubId,
1900            counter: std::sync::Arc<AtomicUsize>,
1901        }
1902
1903        impl Subscription<TestMsg> for CountingSub {
1904            fn id(&self) -> SubId {
1905                self.id
1906            }
1907
1908            fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1909                while !stop.is_stopped() {
1910                    if sender.send(TestMsg::Tick).is_err() {
1911                        break;
1912                    }
1913                    self.counter.fetch_add(1, Ordering::SeqCst);
1914                    thread::sleep(Duration::from_millis(5));
1915                }
1916            }
1917        }
1918
1919        let mut mgr = SubscriptionManager::<TestMsg>::new();
1920
1921        // Start multiple subscriptions.
1922        mgr.reconcile(vec![
1923            Box::new(CountingSub {
1924                id: 400,
1925                counter: msg_count.clone(),
1926            }),
1927            Box::new(CountingSub {
1928                id: 401,
1929                counter: msg_count.clone(),
1930            }),
1931        ]);
1932
1933        thread::sleep(Duration::from_millis(50));
1934
1935        // Manager state is consistent while subscriptions are running.
1936        assert_eq!(mgr.active_count(), 2);
1937        let drained = mgr.drain_messages();
1938        let sent_count = msg_count.load(Ordering::SeqCst);
1939        assert!(sent_count > 0, "subscriptions should have sent messages");
1940        assert!(
1941            drained.len() <= sent_count,
1942            "drained {} but only {} sent",
1943            drained.len(),
1944            sent_count
1945        );
1946
1947        // Stop all — manager state is consistent after shutdown.
1948        mgr.stop_all();
1949        assert_eq!(mgr.active_count(), 0);
1950
1951        // Drain remaining buffered messages.
1952        let remaining = mgr.drain_messages();
1953        let total_drained = drained.len() + remaining.len();
1954        let final_sent = msg_count.load(Ordering::SeqCst);
1955        assert!(
1956            total_drained <= final_sent,
1957            "total drained ({total_drained}) must not exceed total sent ({final_sent})"
1958        );
1959    }
1960
1961    /// bd-1f2aw: Shutdown ordering contract — stop_all() must signal all
1962    /// subscriptions before joining any. Verify by checking that all subs
1963    /// observe the stop signal approximately simultaneously.
1964    #[test]
1965    fn lifecycle_shutdown_signal_ordering() {
1966        use std::sync::atomic::{AtomicU64, Ordering};
1967
1968        let signal_times =
1969            std::sync::Arc::new([AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0)]);
1970        let epoch = Instant::now();
1971
1972        struct TimingStopSub {
1973            id: SubId,
1974            index: usize,
1975            signal_times: std::sync::Arc<[AtomicU64; 3]>,
1976            epoch: Instant,
1977        }
1978
1979        impl Subscription<TestMsg> for TimingStopSub {
1980            fn id(&self) -> SubId {
1981                self.id
1982            }
1983
1984            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1985                while !stop.is_stopped() {
1986                    thread::sleep(Duration::from_millis(1));
1987                }
1988                let elapsed_us = self.epoch.elapsed().as_micros() as u64;
1989                self.signal_times[self.index].store(elapsed_us, Ordering::SeqCst);
1990            }
1991        }
1992
1993        let mut mgr = SubscriptionManager::<TestMsg>::new();
1994        mgr.reconcile(vec![
1995            Box::new(TimingStopSub {
1996                id: 500,
1997                index: 0,
1998                signal_times: signal_times.clone(),
1999                epoch,
2000            }),
2001            Box::new(TimingStopSub {
2002                id: 501,
2003                index: 1,
2004                signal_times: signal_times.clone(),
2005                epoch,
2006            }),
2007            Box::new(TimingStopSub {
2008                id: 502,
2009                index: 2,
2010                signal_times: signal_times.clone(),
2011                epoch,
2012            }),
2013        ]);
2014        thread::sleep(Duration::from_millis(20));
2015
2016        mgr.stop_all();
2017
2018        // All three should have observed the stop signal at approximately
2019        // the same time (within 10ms of each other), because phase 1 signals
2020        // all before phase 2 joins any.
2021        let t0 = signal_times[0].load(Ordering::SeqCst);
2022        let t1 = signal_times[1].load(Ordering::SeqCst);
2023        let t2 = signal_times[2].load(Ordering::SeqCst);
2024
2025        assert!(
2026            t0 > 0 && t1 > 0 && t2 > 0,
2027            "all subs should have recorded stop time"
2028        );
2029
2030        let max_t = t0.max(t1).max(t2);
2031        let min_t = t0.min(t1).min(t2);
2032        let spread_us = max_t - min_t;
2033
2034        assert!(
2035            spread_us < 10_000, // 10ms
2036            "stop signal spread should be < 10ms for parallel signaling, got {spread_us}us \
2037             (t0={t0}, t1={t1}, t2={t2})"
2038        );
2039    }
2040}