Skip to main content

ftui_runtime/
subscription.rs

1#![forbid(unsafe_code)]
2
3//! Subscription system for continuous event sources.
4//!
5//! Subscriptions provide a declarative way to receive events from external
6//! sources like timers, file watchers, or network connections. The runtime
7//! manages subscription lifecycles automatically based on what the model
8//! declares as active.
9//!
10//! # How it works
11//!
12//! 1. `Model::subscriptions()` returns the set of active subscriptions
13//! 2. After each `update()`, the runtime compares active vs previous subscriptions
14//! 3. New subscriptions are started, removed ones are stopped
15//! 4. Subscription messages are routed through `Model::update()`
16
17use crate::cancellation::{CancellationSource, CancellationToken};
18use std::collections::HashSet;
19use std::sync::mpsc;
20use std::thread;
21use web_time::{Duration, Instant};
22
/// A unique identifier for a subscription.
///
/// Used by the runtime to track which subscriptions are active and
/// to deduplicate subscriptions across update cycles. Two subscriptions
/// reporting the same `SubId` are treated as the same subscription.
pub type SubId = u64;
28
/// A subscription produces messages from an external event source.
///
/// Subscriptions run on background threads and send messages through
/// the provided channel. The runtime manages their lifecycle.
///
/// `M` is the message type produced by the subscription. It must be
/// `Send + 'static` because messages are handed across a thread boundary.
pub trait Subscription<M: Send + 'static>: Send {
    /// Unique identifier for deduplication.
    ///
    /// Subscriptions with the same ID are considered identical.
    /// The runtime uses this to avoid restarting unchanged subscriptions.
    fn id(&self) -> SubId;

    /// Start the subscription, sending messages through the channel.
    ///
    /// This is called on a background thread. Implementations should
    /// loop and send messages until the channel is disconnected (receiver dropped)
    /// or the stop signal is received.
    ///
    /// * `sender` - channel end used to deliver messages to the runtime.
    /// * `stop` - signal to poll (or wait on) to learn that the runtime wants
    ///   this subscription to exit.
    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
}
47
/// Signal for stopping a subscription.
///
/// When the runtime stops a subscription, it sets this signal. The subscription
/// should check it periodically and exit its run loop when set.
///
/// Backed by [`CancellationToken`] for structured cancellation. The signal is
/// `Clone`, so it can be handed to helper threads.
#[derive(Clone)]
pub struct StopSignal {
    // Token obtained from the `CancellationSource` owned by the paired `StopTrigger`.
    token: CancellationToken,
}
58
59impl StopSignal {
60    /// Create a new stop signal pair (signal, trigger).
61    pub(crate) fn new() -> (Self, StopTrigger) {
62        let source = CancellationSource::new();
63        let signal = Self {
64            token: source.token(),
65        };
66        let trigger = StopTrigger { source };
67        (signal, trigger)
68    }
69
70    /// Check if the stop signal has been triggered.
71    pub fn is_stopped(&self) -> bool {
72        self.token.is_cancelled()
73    }
74
75    /// Wait for either the stop signal or a timeout.
76    ///
77    /// Returns `true` if stopped, `false` if timed out.
78    /// Blocks the thread efficiently using a condition variable.
79    /// Handles spurious wakeups by looping until condition met or timeout expired.
80    pub fn wait_timeout(&self, duration: Duration) -> bool {
81        self.token.wait_timeout(duration)
82    }
83
84    /// Access the underlying cancellation token.
85    ///
86    /// This enables integration with Asupersync-style structured cancellation
87    /// while preserving backwards compatibility with the `StopSignal` API.
88    pub fn cancellation_token(&self) -> &CancellationToken {
89        &self.token
90    }
91}
92
/// Trigger to stop a subscription from the runtime side.
///
/// Backed by [`CancellationSource`] for structured cancellation. Created
/// together with its paired [`StopSignal`] by `StopSignal::new`.
pub(crate) struct StopTrigger {
    // Source whose token is held by the paired `StopSignal`.
    source: CancellationSource,
}
99
impl StopTrigger {
    /// Signal the subscription to stop.
    ///
    /// Cancels the underlying source; this call does not wait for the
    /// subscription thread to exit.
    pub(crate) fn stop(&self) {
        self.source.cancel();
    }
}
106
/// A running subscription handle.
///
/// Bundles everything the manager needs to stop one spawned subscription:
/// its ID, the stop trigger, the thread handle, and a panic flag.
pub(crate) struct RunningSubscription {
    /// ID reported by the subscription when it was started.
    pub(crate) id: SubId,
    // Fires the stop signal that was handed to the subscription's `run`.
    trigger: StopTrigger,
    // `Some` until the handle is taken by `join_bounded` or `stop`.
    thread: Option<thread::JoinHandle<()>>,
    /// Tracks whether the subscription thread panicked (set by the catch_unwind wrapper).
    panicked: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
115
/// Upper bound on how long a stop/join waits for a subscription thread to
/// finish before the thread is detached.
const SUBSCRIPTION_STOP_JOIN_TIMEOUT: Duration = Duration::from_millis(250);
/// Poll interval for bounded subscription thread joins (bd-1f2aw).
///
/// Same rationale as the executor shutdown polls: `JoinHandle` has no
/// `join_timeout` in stable Rust, so we poll `is_finished()` with a short
/// sleep. 1ms minimizes stop latency while avoiding spin.
const SUBSCRIPTION_STOP_JOIN_POLL: Duration = Duration::from_millis(1);
123
124impl RunningSubscription {
125    /// Returns true if the subscription thread panicked.
126    pub(crate) fn has_panicked(&self) -> bool {
127        self.panicked.load(std::sync::atomic::Ordering::Acquire)
128    }
129
    /// Signal the subscription to stop (phase 1 of two-phase shutdown).
    ///
    /// Does NOT join the thread — call [`Self::join_bounded`] after signalling
    /// all subscriptions to allow parallel wind-down (bd-1f2aw).
    pub(crate) fn signal_stop(&self) {
        self.trigger.stop();
    }
137
138    /// Join the subscription thread with a bounded timeout (phase 2).
139    ///
140    /// Returns the join handle if the thread did not finish within the timeout,
141    /// allowing callers to log and move on without blocking indefinitely.
142    pub(crate) fn join_bounded(mut self) -> Option<thread::JoinHandle<()>> {
143        let handle = self.thread.take()?;
144        let start = Instant::now();
145
146        // Fast path: subscription already finished (common for short-lived subs).
147        if handle.is_finished() {
148            let _ = handle.join();
149            tracing::trace!(
150                sub_id = self.id,
151                panicked = self.has_panicked(),
152                elapsed_us = start.elapsed().as_micros() as u64,
153                "subscription join (fast path)"
154            );
155            return None;
156        }
157
158        // Slow path: bounded poll loop (bd-1f2aw).
159        while !handle.is_finished() {
160            if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
161                tracing::warn!(
162                    sub_id = self.id,
163                    panicked = self.has_panicked(),
164                    timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
165                    "subscription join timed out, detaching thread"
166                );
167                return Some(handle);
168            }
169            thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
170        }
171
172        let _ = handle.join();
173        tracing::trace!(
174            sub_id = self.id,
175            panicked = self.has_panicked(),
176            elapsed_us = start.elapsed().as_micros() as u64,
177            "subscription join (slow path)"
178        );
179        None
180    }
181
182    /// Stop the subscription and join its thread if it exits promptly.
183    ///
184    /// Convenience method combining signal + join for single-subscription stops.
185    /// Used by tests and external callers that stop a single subscription.
186    #[cfg_attr(not(test), allow(dead_code))]
187    pub(crate) fn stop(mut self) {
188        self.trigger.stop();
189        if let Some(handle) = self.thread.take() {
190            let start = Instant::now();
191            // Fast path: subscription already finished (common for short-lived subs).
192            if handle.is_finished() {
193                let _ = handle.join();
194                tracing::trace!(
195                    sub_id = self.id,
196                    panicked = self.has_panicked(),
197                    elapsed_us = start.elapsed().as_micros() as u64,
198                    "subscription stop (fast path)"
199                );
200                return;
201            }
202            // Slow path: bounded poll loop (bd-1f2aw).
203            while !handle.is_finished() {
204                if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
205                    tracing::warn!(
206                        sub_id = self.id,
207                        panicked = self.has_panicked(),
208                        timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
209                        "subscription did not stop within timeout; detaching thread"
210                    );
211                    return;
212                }
213                thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
214            }
215            let _ = handle.join();
216            tracing::trace!(
217                sub_id = self.id,
218                panicked = self.has_panicked(),
219                elapsed_us = start.elapsed().as_micros() as u64,
220                "subscription stop (slow path)"
221            );
222        }
223    }
224}
225
226impl Drop for RunningSubscription {
227    fn drop(&mut self) {
228        self.trigger.stop();
229        // Don't join in drop to avoid blocking
230    }
231}
232
/// Manages the lifecycle of subscriptions for a program.
///
/// Owns the handles of all running subscriptions plus the channel through
/// which their messages arrive.
pub(crate) struct SubscriptionManager<M: Send + 'static> {
    // Handles for every subscription currently running.
    active: Vec<RunningSubscription>,
    // Cloned into each subscription thread when it is started.
    sender: mpsc::Sender<M>,
    // Read by `drain_messages`.
    receiver: mpsc::Receiver<M>,
}
239
240impl<M: Send + 'static> SubscriptionManager<M> {
241    pub(crate) fn new() -> Self {
242        let (sender, receiver) = mpsc::channel();
243        Self {
244            active: Vec::new(),
245            sender,
246            receiver,
247        }
248    }
249
250    /// Update the set of active subscriptions.
251    ///
252    /// Compares the new set against currently running subscriptions:
253    /// - Starts subscriptions that are new (ID not in active set)
254    /// - Stops subscriptions that are no longer declared (ID not in new set)
255    /// - Leaves unchanged subscriptions running
256    pub(crate) fn reconcile(&mut self, subscriptions: Vec<Box<dyn Subscription<M>>>) {
257        let reconcile_start = Instant::now();
258        let new_ids: HashSet<SubId> = subscriptions.iter().map(|s| s.id()).collect();
259        let active_count_before = self.active.len();
260
261        crate::debug_trace!(
262            "reconcile: new_ids={:?}, active_before={}",
263            new_ids,
264            active_count_before
265        );
266        tracing::trace!(
267            new_id_count = new_ids.len(),
268            active_before = active_count_before,
269            new_ids = ?new_ids,
270            "subscription reconcile starting"
271        );
272
273        // Stop subscriptions that are no longer active (two-phase: bd-1f2aw).
274        let mut remaining = Vec::new();
275        let mut to_stop = Vec::new();
276        for running in self.active.drain(..) {
277            if new_ids.contains(&running.id) {
278                remaining.push(running);
279            } else {
280                crate::debug_trace!("stopping subscription: id={}", running.id);
281                tracing::debug!(sub_id = running.id, "Stopping subscription");
282                crate::effect_system::record_subscription_stop("subscription", running.id, 0);
283                crate::effect_system::record_dynamics_sub_stop();
284                to_stop.push(running);
285            }
286        }
287        // Phase 1: Signal all removals.
288        for running in &to_stop {
289            running.signal_stop();
290        }
291        // Phase 2: Join with bounded timeout.
292        for running in to_stop {
293            let _ = running.join_bounded();
294        }
295        self.active = remaining;
296
297        // Start new subscriptions
298        let mut active_ids: HashSet<SubId> = self.active.iter().map(|r| r.id).collect();
299        for sub in subscriptions {
300            let id = sub.id();
301            if !active_ids.insert(id) {
302                continue;
303            }
304
305            crate::debug_trace!("starting subscription: id={}", id);
306            tracing::debug!(sub_id = id, "Starting subscription");
307            crate::effect_system::record_subscription_start("subscription", id);
308            crate::effect_system::record_dynamics_sub_start();
309            let (signal, trigger) = StopSignal::new();
310            let sender = self.sender.clone();
311            let panicked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
312            let panicked_flag = panicked.clone();
313            let sub_id_for_thread = id;
314
315            let thread = thread::spawn(move || {
316                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
317                    sub.run(sender, signal);
318                }));
319                if let Err(payload) = result {
320                    panicked_flag.store(true, std::sync::atomic::Ordering::Release);
321                    crate::effect_system::record_dynamics_sub_panic();
322                    let panic_msg = match payload.downcast_ref::<&str>() {
323                        Some(s) => (*s).to_string(),
324                        None => match payload.downcast_ref::<String>() {
325                            Some(s) => s.clone(),
326                            None => "unknown panic payload".to_string(),
327                        },
328                    };
329                    crate::effect_system::error_effect_panic(
330                        "subscription",
331                        &format!("sub_id={sub_id_for_thread}: {panic_msg}"),
332                    );
333                }
334            });
335
336            self.active.push(RunningSubscription {
337                id,
338                trigger,
339                thread: Some(thread),
340                panicked,
341            });
342        }
343
344        let active_count_after = self.active.len();
345        let reconcile_elapsed_us = reconcile_start.elapsed().as_micros() as u64;
346        crate::effect_system::record_dynamics_reconcile(reconcile_elapsed_us);
347        crate::debug_trace!("reconcile complete: active_after={}", active_count_after);
348        tracing::trace!(
349            active_before = active_count_before,
350            active_after = active_count_after,
351            started = active_count_after.saturating_sub(active_count_before),
352            stopped = active_count_before.saturating_sub(active_count_after),
353            reconcile_us = reconcile_elapsed_us,
354            "subscription reconcile complete"
355        );
356    }
357
358    /// Drain pending messages from subscriptions.
359    pub(crate) fn drain_messages(&self) -> Vec<M> {
360        let mut messages = Vec::new();
361        while let Ok(msg) = self.receiver.try_recv() {
362            messages.push(msg);
363        }
364        messages
365    }
366
    /// Return the number of active subscriptions.
    ///
    /// Counts the handles currently tracked by the manager.
    #[inline]
    pub(crate) fn active_count(&self) -> usize {
        self.active.len()
    }
372
373    /// Stop all running subscriptions using two-phase parallel shutdown (bd-1f2aw).
374    ///
375    /// Phase 1: Signal all subscriptions to stop (non-blocking).
376    /// Phase 2: Join all threads with bounded timeout.
377    ///
378    /// This is significantly faster than sequential stop when multiple
379    /// subscriptions are active, because all threads begin winding down
380    /// simultaneously rather than waiting for each to finish in turn.
381    pub(crate) fn stop_all(&mut self) {
382        let count = self.active.len();
383        if count == 0 {
384            return;
385        }
386        let start = Instant::now();
387
388        // Phase 1: Signal all subscriptions to stop (parallel).
389        for running in &self.active {
390            running.signal_stop();
391        }
392
393        let signal_elapsed_us = start.elapsed().as_micros() as u64;
394        tracing::trace!(
395            target: "ftui.runtime",
396            count,
397            signal_elapsed_us,
398            "subscription stop_all phase 1 (signal) complete"
399        );
400
401        // Phase 2: Join all threads with bounded timeout.
402        let mut panicked_count = 0_usize;
403        let mut timed_out_count = 0_usize;
404        for running in self.active.drain(..) {
405            if running.has_panicked() {
406                panicked_count += 1;
407            }
408            if running.join_bounded().is_some() {
409                timed_out_count += 1;
410            }
411        }
412
413        let shutdown_elapsed_us = start.elapsed().as_micros() as u64;
414        crate::effect_system::record_dynamics_shutdown(shutdown_elapsed_us, timed_out_count as u64);
415        tracing::debug!(
416            target: "ftui.runtime",
417            count,
418            panicked_count,
419            timed_out_count,
420            elapsed_us = shutdown_elapsed_us,
421            "subscription stop_all complete"
422        );
423    }
424}
425
impl<M: Send + 'static> Drop for SubscriptionManager<M> {
    /// Stop every running subscription when the manager is dropped.
    fn drop(&mut self) {
        // Runs the same signal-then-bounded-join shutdown as an explicit call.
        self.stop_all();
    }
}
431
432// --- Built-in subscriptions ---
433
/// A subscription that fires at a fixed interval.
///
/// # Example
///
/// ```ignore
/// fn subscriptions(&self) -> Vec<Box<dyn Subscription<MyMsg>>> {
///     vec![Box::new(Every::new(Duration::from_secs(1), || MyMsg::Tick))]
/// }
/// ```
pub struct Every<M: Send + 'static> {
    // Identifier reported through `Subscription::id`.
    id: SubId,
    // Time waited between consecutive messages.
    interval: Duration,
    // Factory invoked once per tick to build the message to send.
    make_msg: Box<dyn Fn() -> M + Send + Sync>,
}
448
449impl<M: Send + 'static> Every<M> {
450    /// Create a tick subscription with the given interval and message factory.
451    pub fn new(interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self {
452        // Generate a stable ID from the interval to allow deduplication
453        let id = interval.as_nanos() as u64 ^ 0x5449_434B; // "TICK" magic
454        Self {
455            id,
456            interval,
457            make_msg: Box::new(make_msg),
458        }
459    }
460
461    /// Create a tick subscription with an explicit ID.
462    pub fn with_id(
463        id: SubId,
464        interval: Duration,
465        make_msg: impl Fn() -> M + Send + Sync + 'static,
466    ) -> Self {
467        Self {
468            id,
469            interval,
470            make_msg: Box::new(make_msg),
471        }
472    }
473}
474
475impl<M: Send + 'static> Subscription<M> for Every<M> {
476    fn id(&self) -> SubId {
477        self.id
478    }
479
480    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
481        let mut tick_count: u64 = 0;
482        crate::debug_trace!(
483            "Every subscription started: id={}, interval={:?}",
484            self.id,
485            self.interval
486        );
487        loop {
488            if stop.wait_timeout(self.interval) {
489                crate::debug_trace!(
490                    "Every subscription stopped: id={}, sent {} ticks",
491                    self.id,
492                    tick_count
493                );
494                break;
495            }
496            tick_count += 1;
497            let msg = (self.make_msg)();
498            if sender.send(msg).is_err() {
499                crate::debug_trace!(
500                    "Every subscription channel closed: id={}, sent {} ticks",
501                    self.id,
502                    tick_count
503                );
504                break;
505            }
506        }
507    }
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513
    /// Message type used by the tests in this module.
    #[derive(Debug, Clone, PartialEq)]
    enum TestMsg {
        /// Emitted by interval-based test subscriptions.
        Tick,
        /// Carries an arbitrary payload so ordering and identity can be asserted.
        Value(i32),
    }
519
    /// Test subscription that forwards messages from a source channel.
    struct ChannelSubscription<M: Send + 'static> {
        // ID reported through `Subscription::id`.
        id: SubId,
        // Source of messages to forward.
        receiver: mpsc::Receiver<M>,
        // Receive timeout used between stop-signal checks.
        poll: Duration,
    }
525
526    impl<M: Send + 'static> ChannelSubscription<M> {
527        fn new(id: SubId, receiver: mpsc::Receiver<M>) -> Self {
528            Self {
529                id,
530                receiver,
531                poll: Duration::from_millis(5),
532            }
533        }
534    }
535
536    impl<M: Send + 'static> Subscription<M> for ChannelSubscription<M> {
537        fn id(&self) -> SubId {
538            self.id
539        }
540
541        fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
542            loop {
543                if stop.is_stopped() {
544                    break;
545                }
546                match self.receiver.recv_timeout(self.poll) {
547                    Ok(msg) => {
548                        if sender.send(msg).is_err() {
549                            break;
550                        }
551                    }
552                    Err(mpsc::RecvTimeoutError::Timeout) => {}
553                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
554                }
555            }
556        }
557    }
558
559    fn channel_subscription(id: SubId) -> (ChannelSubscription<TestMsg>, mpsc::Sender<TestMsg>) {
560        let (tx, rx) = mpsc::channel();
561        (ChannelSubscription::new(id, rx), tx)
562    }
563
564    #[test]
565    fn stop_signal_starts_false() {
566        let (signal, _trigger) = StopSignal::new();
567        assert!(!signal.is_stopped());
568    }
569
570    #[test]
571    fn stop_signal_becomes_true_after_trigger() {
572        let (signal, trigger) = StopSignal::new();
573        trigger.stop();
574        assert!(signal.is_stopped());
575    }
576
577    #[test]
578    fn stop_signal_wait_returns_true_when_stopped() {
579        let (signal, trigger) = StopSignal::new();
580        trigger.stop();
581        assert!(signal.wait_timeout(Duration::from_millis(100)));
582    }
583
584    #[test]
585    fn stop_signal_wait_returns_false_on_timeout() {
586        let (signal, _trigger) = StopSignal::new();
587        assert!(!signal.wait_timeout(Duration::from_millis(10)));
588    }
589
590    #[test]
591    fn channel_subscription_forwards_messages() {
592        let (sub, event_tx) = channel_subscription(1);
593        let (tx, rx) = mpsc::channel();
594        let (signal, trigger) = StopSignal::new();
595
596        let handle = thread::spawn(move || {
597            sub.run(tx, signal);
598        });
599
600        event_tx.send(TestMsg::Value(1)).unwrap();
601        event_tx.send(TestMsg::Value(2)).unwrap();
602        thread::sleep(Duration::from_millis(10));
603        trigger.stop();
604        handle.join().unwrap();
605
606        let msgs: Vec<_> = rx.try_iter().collect();
607        assert_eq!(msgs, vec![TestMsg::Value(1), TestMsg::Value(2)]);
608    }
609
610    #[test]
611    fn every_subscription_fires() {
612        let sub = Every::new(Duration::from_millis(10), || TestMsg::Tick);
613        let (tx, rx) = mpsc::channel();
614        let (signal, trigger) = StopSignal::new();
615
616        let handle = thread::spawn(move || {
617            sub.run(tx, signal);
618        });
619
620        // Wait for a few ticks
621        thread::sleep(Duration::from_millis(50));
622        trigger.stop();
623        handle.join().unwrap();
624
625        let msgs: Vec<_> = rx.try_iter().collect();
626        assert!(!msgs.is_empty(), "Should have received at least one tick");
627        assert!(msgs.iter().all(|m| *m == TestMsg::Tick));
628    }
629
630    #[test]
631    fn every_subscription_uses_stable_id() {
632        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
633        let sub2 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
634        assert_eq!(sub1.id(), sub2.id());
635    }
636
637    #[test]
638    fn every_subscription_different_intervals_different_ids() {
639        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
640        let sub2 = Every::<TestMsg>::new(Duration::from_secs(2), || TestMsg::Tick);
641        assert_ne!(sub1.id(), sub2.id());
642    }
643
644    #[test]
645    fn subscription_manager_starts_subscriptions() {
646        let mut mgr = SubscriptionManager::<TestMsg>::new();
647        let (sub, event_tx) = channel_subscription(1);
648        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];
649
650        mgr.reconcile(subs);
651        event_tx.send(TestMsg::Value(42)).unwrap();
652
653        // Give the thread a moment to send
654        thread::sleep(Duration::from_millis(20));
655
656        let msgs = mgr.drain_messages();
657        assert_eq!(msgs, vec![TestMsg::Value(42)]);
658    }
659
660    #[test]
661    fn subscription_manager_dedupes_duplicate_ids() {
662        let mut mgr = SubscriptionManager::<TestMsg>::new();
663        let (sub_a, tx_a) = channel_subscription(7);
664        let (sub_b, tx_b) = channel_subscription(7);
665        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub_a), Box::new(sub_b)];
666
667        mgr.reconcile(subs);
668
669        tx_a.send(TestMsg::Value(1)).unwrap();
670        assert!(
671            tx_b.send(TestMsg::Value(2)).is_err(),
672            "Duplicate subscription should be dropped"
673        );
674
675        thread::sleep(Duration::from_millis(20));
676        let msgs = mgr.drain_messages();
677        assert_eq!(msgs, vec![TestMsg::Value(1)]);
678    }
679
680    #[test]
681    fn subscription_manager_stops_removed() {
682        let mut mgr = SubscriptionManager::<TestMsg>::new();
683
684        // Start with one subscription
685        mgr.reconcile(vec![Box::new(Every::with_id(
686            99,
687            Duration::from_millis(5),
688            || TestMsg::Tick,
689        ))]);
690
691        thread::sleep(Duration::from_millis(20));
692        let msgs_before = mgr.drain_messages();
693        assert!(!msgs_before.is_empty());
694
695        // Remove it
696        mgr.reconcile(vec![]);
697
698        // Drain any remaining buffered messages
699        thread::sleep(Duration::from_millis(20));
700        let _ = mgr.drain_messages();
701
702        // After stopping, no more messages should arrive
703        thread::sleep(Duration::from_millis(30));
704        let msgs_after = mgr.drain_messages();
705        assert!(
706            msgs_after.is_empty(),
707            "Should stop receiving after reconcile with empty set"
708        );
709    }
710
711    #[test]
712    fn subscription_manager_keeps_unchanged() {
713        let mut mgr = SubscriptionManager::<TestMsg>::new();
714
715        // Start subscription
716        mgr.reconcile(vec![Box::new(Every::with_id(
717            50,
718            Duration::from_millis(10),
719            || TestMsg::Tick,
720        ))]);
721
722        thread::sleep(Duration::from_millis(30));
723        let _ = mgr.drain_messages();
724
725        // Reconcile with same ID - should keep running
726        mgr.reconcile(vec![Box::new(Every::with_id(
727            50,
728            Duration::from_millis(10),
729            || TestMsg::Tick,
730        ))]);
731
732        thread::sleep(Duration::from_millis(30));
733        let msgs = mgr.drain_messages();
734        assert!(!msgs.is_empty(), "Subscription should still be running");
735    }
736
737    #[test]
738    fn subscription_manager_stop_all() {
739        let mut mgr = SubscriptionManager::<TestMsg>::new();
740
741        mgr.reconcile(vec![
742            Box::new(Every::with_id(1, Duration::from_millis(5), || {
743                TestMsg::Value(1)
744            })),
745            Box::new(Every::with_id(2, Duration::from_millis(5), || {
746                TestMsg::Value(2)
747            })),
748        ]);
749
750        thread::sleep(Duration::from_millis(20));
751        mgr.stop_all();
752
753        thread::sleep(Duration::from_millis(20));
754        let _ = mgr.drain_messages();
755        thread::sleep(Duration::from_millis(30));
756        let msgs = mgr.drain_messages();
757        assert!(msgs.is_empty());
758    }
759
760    // =========================================================================
761    // ADDITIONAL TESTS - Cmd sequencing + Subscriptions (bd-2nu8.10.2)
762    // =========================================================================
763
764    #[test]
765    fn stop_signal_is_cloneable() {
766        let (signal, trigger) = StopSignal::new();
767        let signal_clone = signal.clone();
768
769        assert!(!signal.is_stopped());
770        assert!(!signal_clone.is_stopped());
771
772        trigger.stop();
773
774        assert!(signal.is_stopped());
775        assert!(signal_clone.is_stopped());
776    }
777
778    #[test]
779    fn stop_signal_wait_wakes_immediately_when_already_stopped() {
780        let (signal, trigger) = StopSignal::new();
781        trigger.stop();
782
783        // Should return immediately, not wait for timeout
784        let start = Instant::now();
785        let stopped = signal.wait_timeout(Duration::from_secs(10));
786        let elapsed = start.elapsed();
787
788        assert!(stopped);
789        assert!(elapsed < Duration::from_millis(100));
790    }
791
792    #[test]
793    fn stop_signal_wait_is_interrupted_by_trigger() {
794        let (signal, trigger) = StopSignal::new();
795
796        let signal_clone = signal.clone();
797        let handle = thread::spawn(move || signal_clone.wait_timeout(Duration::from_secs(10)));
798
799        // Give thread time to start waiting
800        thread::sleep(Duration::from_millis(20));
801        trigger.stop();
802
803        let stopped = handle.join().unwrap();
804        assert!(stopped);
805    }
806
807    #[test]
808    fn channel_subscription_no_messages_without_events() {
809        let (sub, _event_tx) = channel_subscription(1);
810        let (tx, rx) = mpsc::channel();
811        let (signal, trigger) = StopSignal::new();
812
813        let handle = thread::spawn(move || {
814            sub.run(tx, signal);
815        });
816
817        thread::sleep(Duration::from_millis(10));
818        trigger.stop();
819        handle.join().unwrap();
820
821        let msgs: Vec<_> = rx.try_iter().collect();
822        assert!(msgs.is_empty());
823    }
824
825    #[test]
826    fn channel_subscription_id_is_preserved() {
827        let (sub, _tx) = channel_subscription(42);
828        assert_eq!(sub.id(), 42);
829    }
830
831    #[test]
832    fn channel_subscription_stops_on_disconnected_receiver() {
833        let (sub, event_tx) = channel_subscription(1);
834        let (tx, _rx) = mpsc::channel();
835        let (signal, _trigger) = StopSignal::new();
836
837        drop(event_tx);
838
839        let handle = thread::spawn(move || {
840            sub.run(tx, signal);
841        });
842
843        let result = handle.join();
844        assert!(result.is_ok());
845    }
846
847    #[test]
848    fn every_with_id_preserves_custom_id() {
849        let sub = Every::<TestMsg>::with_id(12345, Duration::from_secs(1), || TestMsg::Tick);
850        assert_eq!(sub.id(), 12345);
851    }
852
853    #[test]
854    fn every_stops_on_disconnected_receiver() {
855        let sub = Every::new(Duration::from_millis(5), || TestMsg::Tick);
856        let (tx, rx) = mpsc::channel();
857        let (signal, _trigger) = StopSignal::new();
858
859        // Drop receiver before running
860        drop(rx);
861
862        // Should exit the loop when send fails
863        let handle = thread::spawn(move || {
864            sub.run(tx, signal);
865        });
866
867        // Should complete quickly, not hang
868        let result = handle.join();
869        assert!(result.is_ok());
870    }
871
872    #[test]
873    fn every_respects_interval() {
874        let sub = Every::with_id(1, Duration::from_millis(50), || TestMsg::Tick);
875        let (tx, rx) = mpsc::channel();
876        let (signal, trigger) = StopSignal::new();
877
878        let start = Instant::now();
879        let handle = thread::spawn(move || {
880            sub.run(tx, signal);
881        });
882
883        // Wait for 3 ticks worth of time
884        thread::sleep(Duration::from_millis(160));
885        trigger.stop();
886        handle.join().unwrap();
887
888        let msgs: Vec<_> = rx.try_iter().collect();
889        let elapsed = start.elapsed();
890
891        // Should have approximately 3 ticks (at 50ms intervals over 160ms)
892        assert!(
893            msgs.len() >= 2,
894            "Expected at least 2 ticks, got {}",
895            msgs.len()
896        );
897        assert!(
898            msgs.len() <= 4,
899            "Expected at most 4 ticks, got {}",
900            msgs.len()
901        );
902        assert!(elapsed >= Duration::from_millis(150));
903    }
904
905    #[test]
906    fn subscription_manager_empty_reconcile() {
907        let mut mgr = SubscriptionManager::<TestMsg>::new();
908
909        // Reconcile with empty list should not panic
910        mgr.reconcile(vec![]);
911        let msgs = mgr.drain_messages();
912        assert!(msgs.is_empty());
913    }
914
915    #[test]
916    fn subscription_manager_drain_messages_returns_all() {
917        let mut mgr = SubscriptionManager::<TestMsg>::new();
918        let (sub, event_tx) = channel_subscription(1);
919        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];
920
921        mgr.reconcile(subs);
922        event_tx.send(TestMsg::Value(1)).unwrap();
923        event_tx.send(TestMsg::Value(2)).unwrap();
924        thread::sleep(Duration::from_millis(20));
925
926        let msgs = mgr.drain_messages();
927        assert_eq!(msgs.len(), 2);
928        assert_eq!(msgs[0], TestMsg::Value(1));
929        assert_eq!(msgs[1], TestMsg::Value(2));
930
931        // Second drain should be empty
932        let msgs2 = mgr.drain_messages();
933        assert!(msgs2.is_empty());
934    }
935
936    #[test]
937    fn subscription_manager_replaces_subscription_with_different_id() {
938        let mut mgr = SubscriptionManager::<TestMsg>::new();
939        let (sub1, tx1) = channel_subscription(1);
940
941        // Start with ID 1
942        mgr.reconcile(vec![Box::new(sub1)]);
943        tx1.send(TestMsg::Value(1)).unwrap();
944        thread::sleep(Duration::from_millis(20));
945        let msgs1 = mgr.drain_messages();
946        assert_eq!(msgs1, vec![TestMsg::Value(1)]);
947
948        // Replace with ID 2
949        let (sub2, tx2) = channel_subscription(2);
950        mgr.reconcile(vec![Box::new(sub2)]);
951        tx2.send(TestMsg::Value(2)).unwrap();
952        thread::sleep(Duration::from_millis(20));
953        let msgs2 = mgr.drain_messages();
954        assert_eq!(msgs2, vec![TestMsg::Value(2)]);
955    }
956
957    #[test]
958    fn subscription_manager_multiple_subscriptions() {
959        let mut mgr = SubscriptionManager::<TestMsg>::new();
960        let (sub1, tx1) = channel_subscription(1);
961        let (sub2, tx2) = channel_subscription(2);
962        let (sub3, tx3) = channel_subscription(3);
963        let subs: Vec<Box<dyn Subscription<TestMsg>>> =
964            vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];
965
966        mgr.reconcile(subs);
967        tx1.send(TestMsg::Value(10)).unwrap();
968        tx2.send(TestMsg::Value(20)).unwrap();
969        tx3.send(TestMsg::Value(30)).unwrap();
970        thread::sleep(Duration::from_millis(30));
971
972        let mut msgs = mgr.drain_messages();
973        msgs.sort_by_key(|m| match m {
974            TestMsg::Value(v) => *v,
975            _ => 0,
976        });
977
978        assert_eq!(msgs.len(), 3);
979        assert_eq!(msgs[0], TestMsg::Value(10));
980        assert_eq!(msgs[1], TestMsg::Value(20));
981        assert_eq!(msgs[2], TestMsg::Value(30));
982    }
983
984    #[test]
985    fn subscription_manager_partial_update() {
986        let mut mgr = SubscriptionManager::<TestMsg>::new();
987
988        // Start with 3 subscriptions
989        mgr.reconcile(vec![
990            Box::new(Every::with_id(1, Duration::from_millis(10), || {
991                TestMsg::Value(1)
992            })),
993            Box::new(Every::with_id(2, Duration::from_millis(10), || {
994                TestMsg::Value(2)
995            })),
996            Box::new(Every::with_id(3, Duration::from_millis(10), || {
997                TestMsg::Value(3)
998            })),
999        ]);
1000
1001        thread::sleep(Duration::from_millis(30));
1002        let _ = mgr.drain_messages();
1003
1004        // Remove subscription 2, keep 1 and 3
1005        mgr.reconcile(vec![
1006            Box::new(Every::with_id(1, Duration::from_millis(10), || {
1007                TestMsg::Value(1)
1008            })),
1009            Box::new(Every::with_id(3, Duration::from_millis(10), || {
1010                TestMsg::Value(3)
1011            })),
1012        ]);
1013
1014        // Drain any in-flight messages that were sent before the stop signal was processed.
1015        // This clears the race window between stop signal and message send.
1016        let _ = mgr.drain_messages();
1017
1018        // Now wait for new messages from the remaining subscriptions
1019        thread::sleep(Duration::from_millis(30));
1020        let msgs = mgr.drain_messages();
1021
1022        // Should only have values 1 and 3, not 2
1023        let values: Vec<i32> = msgs
1024            .iter()
1025            .filter_map(|m| match m {
1026                TestMsg::Value(v) => Some(*v),
1027                _ => None,
1028            })
1029            .collect();
1030
1031        assert!(
1032            values.contains(&1),
1033            "Should still receive from subscription 1"
1034        );
1035        assert!(
1036            values.contains(&3),
1037            "Should still receive from subscription 3"
1038        );
1039        assert!(
1040            !values.contains(&2),
1041            "Should not receive from stopped subscription 2"
1042        );
1043    }
1044
1045    #[test]
1046    fn subscription_manager_drop_stops_all() {
1047        let (_signal, _) = StopSignal::new();
1048        let flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1049        let flag_clone = flag.clone();
1050
1051        struct FlagSubscription {
1052            id: SubId,
1053            flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
1054        }
1055
1056        impl Subscription<TestMsg> for FlagSubscription {
1057            fn id(&self) -> SubId {
1058                self.id
1059            }
1060
1061            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1062                while !stop.is_stopped() {
1063                    thread::sleep(Duration::from_millis(5));
1064                }
1065                self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
1066            }
1067        }
1068
1069        {
1070            let mut mgr = SubscriptionManager::<TestMsg>::new();
1071            mgr.reconcile(vec![Box::new(FlagSubscription {
1072                id: 1,
1073                flag: flag_clone,
1074            })]);
1075
1076            thread::sleep(Duration::from_millis(20));
1077            // mgr drops here, should stop all subscriptions
1078        }
1079
1080        thread::sleep(Duration::from_millis(50));
1081        assert!(
1082            flag.load(std::sync::atomic::Ordering::SeqCst),
1083            "Subscription should have stopped on drop"
1084        );
1085    }
1086
1087    #[test]
1088    fn running_subscription_stop_joins_thread() {
1089        use std::sync::atomic::{AtomicBool, Ordering};
1090
1091        let completed = std::sync::Arc::new(AtomicBool::new(false));
1092        let completed_clone = completed.clone();
1093
1094        let (signal, trigger) = StopSignal::new();
1095        let (_tx, _rx) = mpsc::channel::<TestMsg>();
1096
1097        let thread = thread::spawn(move || {
1098            while !signal.is_stopped() {
1099                thread::sleep(Duration::from_millis(5));
1100            }
1101            completed_clone.store(true, Ordering::SeqCst);
1102        });
1103
1104        let running = RunningSubscription {
1105            id: 1,
1106            trigger,
1107            thread: Some(thread),
1108            panicked: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1109        };
1110
1111        running.stop();
1112        assert!(completed.load(Ordering::SeqCst));
1113    }
1114
1115    #[test]
1116    fn running_subscription_stop_times_out_for_uncooperative_thread() {
1117        use std::sync::atomic::{AtomicBool, Ordering};
1118
1119        let completed = std::sync::Arc::new(AtomicBool::new(false));
1120        let completed_clone = completed.clone();
1121
1122        let (_signal, trigger) = StopSignal::new();
1123        let thread = thread::spawn(move || {
1124            thread::sleep(Duration::from_millis(500));
1125            completed_clone.store(true, Ordering::SeqCst);
1126        });
1127
1128        let running = RunningSubscription {
1129            id: 7,
1130            trigger,
1131            thread: Some(thread),
1132            panicked: std::sync::Arc::new(AtomicBool::new(false)),
1133        };
1134
1135        let start = Instant::now();
1136        running.stop();
1137        assert!(
1138            start.elapsed() < Duration::from_millis(400),
1139            "stop() should not block behind an uncooperative subscription thread"
1140        );
1141
1142        thread::sleep(Duration::from_millis(550));
1143        assert!(completed.load(Ordering::SeqCst));
1144    }
1145
1146    #[test]
1147    fn every_id_stable_across_instances() {
1148        // Same interval should produce same ID
1149        let sub1 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
1150        let sub2 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
1151        let sub3 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Value(1));
1152
1153        assert_eq!(sub1.id(), sub2.id());
1154        assert_eq!(sub2.id(), sub3.id()); // ID is based on interval, not message factory
1155    }
1156
1157    #[test]
1158    fn drain_messages_preserves_order() {
1159        let mut mgr = SubscriptionManager::<TestMsg>::new();
1160
1161        // Use a custom subscription that sends messages in order
1162        struct OrderedSubscription {
1163            values: Vec<i32>,
1164        }
1165
1166        impl Subscription<TestMsg> for OrderedSubscription {
1167            fn id(&self) -> SubId {
1168                999
1169            }
1170
1171            fn run(&self, sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1172                for v in &self.values {
1173                    let _ = sender.send(TestMsg::Value(*v));
1174                    thread::sleep(Duration::from_millis(1));
1175                }
1176            }
1177        }
1178
1179        mgr.reconcile(vec![Box::new(OrderedSubscription {
1180            values: vec![1, 2, 3, 4, 5],
1181        })]);
1182
1183        thread::sleep(Duration::from_millis(30));
1184        let msgs = mgr.drain_messages();
1185
1186        let values: Vec<i32> = msgs
1187            .iter()
1188            .filter_map(|m| match m {
1189                TestMsg::Value(v) => Some(*v),
1190                _ => None,
1191            })
1192            .collect();
1193
1194        assert_eq!(values, vec![1, 2, 3, 4, 5]);
1195    }
1196
1197    #[test]
1198    fn subscription_manager_new_is_empty() {
1199        let mgr = SubscriptionManager::<TestMsg>::new();
1200        let msgs = mgr.drain_messages();
1201        assert!(msgs.is_empty());
1202    }
1203
1204    // =========================================================================
1205    // LIFECYCLE CONTRACT TESTS (bd-1dg21)
1206    //
1207    // These tests capture the observable behavioral contract of the subscription
1208    // system that MUST be preserved during the Asupersync migration. Each test
1209    // documents a specific guarantee that downstream code relies on.
1210    // =========================================================================
1211
1212    /// CONTRACT: StopSignal backed by CancellationToken must remain functional
1213    /// even after concurrent thread panics. The AtomicBool-based implementation
1214    /// is inherently poison-resistant.
1215    #[test]
1216    fn contract_stop_signal_resilient_to_thread_panics() {
1217        let (signal, trigger) = StopSignal::new();
1218        let signal_clone = signal.clone();
1219
1220        // Panic in a thread that holds a clone of the signal
1221        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1222            assert!(!signal_clone.is_stopped());
1223            panic!("intentional panic while holding signal clone");
1224        }));
1225        assert!(result.is_err());
1226
1227        // Signal must still be checkable and triggerable after thread panic
1228        assert!(
1229            !signal.is_stopped(),
1230            "signal should still report not-stopped"
1231        );
1232        trigger.stop();
1233        assert!(
1234            signal.is_stopped(),
1235            "signal should report stopped after trigger"
1236        );
1237        assert!(
1238            signal.wait_timeout(Duration::from_millis(10)),
1239            "wait_timeout should return true when stopped"
1240        );
1241    }
1242
1243    /// CONTRACT: StopSignal exposes its underlying CancellationToken for
1244    /// Asupersync integration.
1245    #[test]
1246    fn contract_stop_signal_exposes_cancellation_token() {
1247        let (signal, trigger) = StopSignal::new();
1248        let token = signal.cancellation_token();
1249        assert!(!token.is_cancelled(), "token should start uncancelled");
1250        trigger.stop();
1251        assert!(token.is_cancelled(), "token should be cancelled after stop");
1252    }
1253
1254    /// CONTRACT: stop_all() must complete within a bounded time even if subscription
1255    /// threads are uncooperative. The 250ms join timeout per subscription is the
1256    /// upper bound.
1257    #[test]
1258    fn contract_stop_all_bounded_time_with_uncooperative_subscriptions() {
1259        let mut mgr = SubscriptionManager::<TestMsg>::new();
1260
1261        // Create subscriptions that ignore the stop signal
1262        struct UncooperativeSub {
1263            id: SubId,
1264        }
1265
1266        impl Subscription<TestMsg> for UncooperativeSub {
1267            fn id(&self) -> SubId {
1268                self.id
1269            }
1270
1271            fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1272                // Ignore stop signal entirely, sleep for a long time
1273                thread::sleep(Duration::from_secs(5));
1274            }
1275        }
1276
1277        mgr.reconcile(vec![
1278            Box::new(UncooperativeSub { id: 100 }),
1279            Box::new(UncooperativeSub { id: 200 }),
1280        ]);
1281
1282        thread::sleep(Duration::from_millis(20)); // let threads start
1283
1284        let start = Instant::now();
1285        mgr.stop_all();
1286        let elapsed = start.elapsed();
1287
1288        // 2 subscriptions * 250ms timeout each = 500ms max, plus some margin
1289        assert!(
1290            elapsed < Duration::from_millis(800),
1291            "stop_all took {elapsed:?}, expected < 800ms for 2 uncooperative subscriptions"
1292        );
1293    }
1294
1295    /// CONTRACT: reconcile() must not start a new subscription for an ID that is
1296    /// already active, even if the subscription object is different.
1297    #[test]
1298    fn contract_reconcile_deduplicates_by_id_not_identity() {
1299        let mut mgr = SubscriptionManager::<TestMsg>::new();
1300        let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
1301
1302        struct CountingSub {
1303            id: SubId,
1304            counter: std::sync::Arc<std::sync::atomic::AtomicUsize>,
1305        }
1306
1307        impl Subscription<TestMsg> for CountingSub {
1308            fn id(&self) -> SubId {
1309                self.id
1310            }
1311
1312            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1313                self.counter
1314                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1315                while !stop.is_stopped() {
1316                    thread::sleep(Duration::from_millis(5));
1317                }
1318            }
1319        }
1320
1321        // First reconcile starts one thread
1322        mgr.reconcile(vec![Box::new(CountingSub {
1323            id: 42,
1324            counter: counter.clone(),
1325        })]);
1326        thread::sleep(Duration::from_millis(20));
1327        assert_eq!(
1328            counter.load(std::sync::atomic::Ordering::SeqCst),
1329            1,
1330            "first reconcile should start exactly 1 thread"
1331        );
1332
1333        // Second reconcile with same ID must NOT start another thread
1334        mgr.reconcile(vec![Box::new(CountingSub {
1335            id: 42,
1336            counter: counter.clone(),
1337        })]);
1338        thread::sleep(Duration::from_millis(20));
1339        assert_eq!(
1340            counter.load(std::sync::atomic::Ordering::SeqCst),
1341            1,
1342            "second reconcile with same ID must not start another thread"
1343        );
1344
1345        mgr.stop_all();
1346    }
1347
1348    /// CONTRACT: When a subscription is removed via reconcile(), messages it sent
1349    /// before being stopped may still be in the channel. drain_messages() must
1350    /// return these buffered messages.
1351    #[test]
1352    fn contract_buffered_messages_available_after_subscription_stopped() {
1353        let mut mgr = SubscriptionManager::<TestMsg>::new();
1354
1355        struct BurstSub;
1356
1357        impl Subscription<TestMsg> for BurstSub {
1358            fn id(&self) -> SubId {
1359                77
1360            }
1361
1362            fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1363                // Send a burst of messages immediately
1364                for i in 0..10 {
1365                    let _ = sender.send(TestMsg::Value(i));
1366                }
1367                // Then wait for stop
1368                while !stop.is_stopped() {
1369                    thread::sleep(Duration::from_millis(5));
1370                }
1371            }
1372        }
1373
1374        mgr.reconcile(vec![Box::new(BurstSub)]);
1375        thread::sleep(Duration::from_millis(30));
1376
1377        // Remove the subscription
1378        mgr.reconcile(vec![]);
1379
1380        // Messages sent before stop should still be drainable
1381        let msgs = mgr.drain_messages();
1382        let values: Vec<i32> = msgs
1383            .iter()
1384            .filter_map(|m| match m {
1385                TestMsg::Value(v) => Some(*v),
1386                _ => None,
1387            })
1388            .collect();
1389
1390        assert!(
1391            values.len() >= 5,
1392            "Expected at least 5 buffered messages after stop, got {}",
1393            values.len()
1394        );
1395    }
1396
1397    /// CONTRACT: active_count() must accurately reflect the number of running
1398    /// subscriptions at all times.
1399    #[test]
1400    fn contract_active_count_tracks_running_subscriptions() {
1401        let mut mgr = SubscriptionManager::<TestMsg>::new();
1402
1403        assert_eq!(mgr.active_count(), 0, "empty manager");
1404
1405        mgr.reconcile(vec![
1406            Box::new(Every::with_id(1, Duration::from_millis(50), || {
1407                TestMsg::Tick
1408            })),
1409            Box::new(Every::with_id(2, Duration::from_millis(50), || {
1410                TestMsg::Tick
1411            })),
1412        ]);
1413        assert_eq!(mgr.active_count(), 2, "after starting 2");
1414
1415        mgr.reconcile(vec![Box::new(Every::with_id(
1416            1,
1417            Duration::from_millis(50),
1418            || TestMsg::Tick,
1419        ))]);
1420        assert_eq!(mgr.active_count(), 1, "after removing 1");
1421
1422        mgr.stop_all();
1423        assert_eq!(mgr.active_count(), 0, "after stop_all");
1424    }
1425
1426    /// CONTRACT: The Every subscription ID must be derived from interval only,
1427    /// not from the message factory closure. Two Every subscriptions with the
1428    /// same interval MUST have the same ID regardless of message content.
1429    #[test]
1430    fn contract_every_id_derived_from_interval_only() {
1431        let sub_a = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
1432        let sub_b = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Value(999));
1433        assert_eq!(
1434            sub_a.id(),
1435            sub_b.id(),
1436            "Every ID must depend only on interval, not message factory"
1437        );
1438
1439        let sub_c = Every::<TestMsg>::new(Duration::from_millis(200), || TestMsg::Tick);
1440        assert_ne!(
1441            sub_a.id(),
1442            sub_c.id(),
1443            "Different intervals must produce different IDs"
1444        );
1445    }
1446
1447    /// CONTRACT: The Every subscription ID formula must remain stable across
1448    /// versions. This captures the exact formula: interval_nanos XOR 0x5449_434B.
1449    #[test]
1450    fn contract_every_id_formula_is_stable() {
1451        let interval = Duration::from_millis(100);
1452        let expected_id = interval.as_nanos() as u64 ^ 0x5449_434B;
1453        let sub = Every::<TestMsg>::new(interval, || TestMsg::Tick);
1454        assert_eq!(
1455            sub.id(),
1456            expected_id,
1457            "Every ID formula must be: interval.as_nanos() as u64 ^ 0x5449_434B"
1458        );
1459    }
1460
1461    /// CONTRACT: Drop on SubscriptionManager must stop all subscriptions.
1462    /// This is the safety net for cleanup even if stop_all() is not called.
1463    #[test]
1464    fn contract_drop_triggers_stop_all() {
1465        use std::sync::atomic::{AtomicUsize, Ordering};
1466
1467        let stop_count = std::sync::Arc::new(AtomicUsize::new(0));
1468
1469        struct StopCountingSub {
1470            id: SubId,
1471            counter: std::sync::Arc<AtomicUsize>,
1472        }
1473
1474        impl Subscription<TestMsg> for StopCountingSub {
1475            fn id(&self) -> SubId {
1476                self.id
1477            }
1478
1479            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1480                while !stop.is_stopped() {
1481                    thread::sleep(Duration::from_millis(5));
1482                }
1483                self.counter.fetch_add(1, Ordering::SeqCst);
1484            }
1485        }
1486
1487        {
1488            let mut mgr = SubscriptionManager::<TestMsg>::new();
1489            mgr.reconcile(vec![
1490                Box::new(StopCountingSub {
1491                    id: 1,
1492                    counter: stop_count.clone(),
1493                }),
1494                Box::new(StopCountingSub {
1495                    id: 2,
1496                    counter: stop_count.clone(),
1497                }),
1498                Box::new(StopCountingSub {
1499                    id: 3,
1500                    counter: stop_count.clone(),
1501                }),
1502            ]);
1503            thread::sleep(Duration::from_millis(20));
1504            // mgr dropped here
1505        }
1506
1507        // Give threads time to notice stop signal and exit
1508        thread::sleep(Duration::from_millis(400));
1509        assert_eq!(
1510            stop_count.load(std::sync::atomic::Ordering::SeqCst),
1511            3,
1512            "all 3 subscription threads must have observed stop signal on drop"
1513        );
1514    }
1515
1516    /// CONTRACT: SUBSCRIPTION_STOP_JOIN_TIMEOUT must be exactly 250ms.
1517    /// The Asupersync migration must preserve this timeout bound.
1518    #[test]
1519    fn contract_stop_join_timeout_is_250ms() {
1520        assert_eq!(
1521            SUBSCRIPTION_STOP_JOIN_TIMEOUT,
1522            Duration::from_millis(250),
1523            "join timeout must be 250ms"
1524        );
1525        assert_eq!(
1526            SUBSCRIPTION_STOP_JOIN_POLL,
1527            Duration::from_millis(1),
1528            "join poll interval must be 1ms (bd-1f2aw)"
1529        );
1530    }
1531
1532    // =========================================================================
1533    // STRUCTURED LIFECYCLE TESTS (bd-1f2aw)
1534    //
1535    // These tests validate the structured cancellation, panic resilience,
1536    // and parallel shutdown improvements.
1537    // =========================================================================
1538
1539    /// bd-1f2aw: A panicking subscription must not crash the runtime.
1540    /// The panic is caught, the panicked flag is set, and telemetry is emitted.
1541    #[test]
1542    fn lifecycle_panic_in_subscription_is_caught() {
1543        use std::sync::atomic::Ordering;
1544
1545        let mut mgr = SubscriptionManager::<TestMsg>::new();
1546
1547        struct PanickingSub;
1548
1549        impl Subscription<TestMsg> for PanickingSub {
1550            fn id(&self) -> SubId {
1551                0xDEAD
1552            }
1553
1554            fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1555                panic!("intentional test panic in subscription");
1556            }
1557        }
1558
1559        mgr.reconcile(vec![Box::new(PanickingSub)]);
1560
1561        // Give the thread time to panic and be caught.
1562        thread::sleep(Duration::from_millis(50));
1563
1564        // The manager should still be functional.
1565        assert_eq!(
1566            mgr.active_count(),
1567            1,
1568            "panicked sub still tracked as active"
1569        );
1570
1571        // The panicked flag should be set.
1572        assert!(
1573            mgr.active[0].panicked.load(Ordering::Acquire),
1574            "panicked flag should be set after subscription panic"
1575        );
1576
1577        // stop_all should not panic even with a panicked subscription.
1578        mgr.stop_all();
1579        assert_eq!(mgr.active_count(), 0);
1580    }
1581
1582    /// bd-1f2aw: A panicking subscription must not prevent other subscriptions
1583    /// from continuing to deliver messages.
1584    #[test]
1585    fn lifecycle_panic_does_not_affect_sibling_subscriptions() {
1586        let mut mgr = SubscriptionManager::<TestMsg>::new();
1587
1588        struct PanickingSub;
1589        impl Subscription<TestMsg> for PanickingSub {
1590            fn id(&self) -> SubId {
1591                0xBAD
1592            }
1593            fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
1594                panic!("boom");
1595            }
1596        }
1597
1598        mgr.reconcile(vec![
1599            Box::new(PanickingSub),
1600            Box::new(Every::with_id(42, Duration::from_millis(10), || {
1601                TestMsg::Tick
1602            })),
1603        ]);
1604
1605        // Wait for panic to happen and ticks to arrive.
1606        // The tick subscription fires every 10ms; wait long enough for several.
1607        thread::sleep(Duration::from_millis(100));
1608
1609        let msgs = mgr.drain_messages();
1610        assert!(
1611            !msgs.is_empty(),
1612            "sibling subscription should still deliver messages after a panic in another sub"
1613        );
1614
1615        mgr.stop_all();
1616    }
1617
1618    /// bd-1f2aw: Parallel phased shutdown (stop_all) must be faster than
1619    /// sequential shutdown when multiple subscriptions need to wind down.
1620    #[test]
1621    fn lifecycle_stop_all_parallel_shutdown() {
1622        use std::sync::atomic::{AtomicUsize, Ordering};
1623
1624        let stop_count = std::sync::Arc::new(AtomicUsize::new(0));
1625        let sub_count = 4;
1626
1627        struct SlowStopSub {
1628            id: SubId,
1629            counter: std::sync::Arc<AtomicUsize>,
1630        }
1631
1632        impl Subscription<TestMsg> for SlowStopSub {
1633            fn id(&self) -> SubId {
1634                self.id
1635            }
1636
1637            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1638                // Wait for stop, then simulate slow cleanup (50ms).
1639                while !stop.is_stopped() {
1640                    thread::sleep(Duration::from_millis(5));
1641                }
1642                thread::sleep(Duration::from_millis(50));
1643                self.counter.fetch_add(1, Ordering::SeqCst);
1644            }
1645        }
1646
1647        let mut mgr = SubscriptionManager::<TestMsg>::new();
1648        let subs: Vec<Box<dyn Subscription<TestMsg>>> = (0..sub_count)
1649            .map(|i| -> Box<dyn Subscription<TestMsg>> {
1650                Box::new(SlowStopSub {
1651                    id: 1000 + i,
1652                    counter: stop_count.clone(),
1653                })
1654            })
1655            .collect();
1656
1657        mgr.reconcile(subs);
1658        thread::sleep(Duration::from_millis(20));
1659
1660        let start = Instant::now();
1661        mgr.stop_all();
1662        let elapsed = start.elapsed();
1663
1664        // With parallel signal, all 4 subs start their 50ms cleanup
1665        // simultaneously. Sequential would take ~200ms (4 * 50ms).
1666        // Parallel should complete in ~50ms + join overhead.
1667        // Use 150ms as a generous bound (well under 200ms sequential).
1668        assert!(
1669            elapsed < Duration::from_millis(150),
1670            "parallel stop_all took {elapsed:?}, expected < 150ms \
1671             (sequential would be ~{expected_sequential}ms)",
1672            expected_sequential = sub_count * 50
1673        );
1674
1675        // All subscriptions should have completed cleanup.
1676        thread::sleep(Duration::from_millis(20));
1677        assert_eq!(
1678            stop_count.load(Ordering::SeqCst),
1679            sub_count as usize,
1680            "all subscriptions should have completed their cleanup"
1681        );
1682    }
1683
/// bd-1f2aw: Removing several subscriptions in one `reconcile` call should
/// let them wind down in parallel (two-phase signal, then join) rather than
/// one after another.
#[test]
fn lifecycle_reconcile_removal_uses_parallel_stop() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Polls the stop signal every 5ms, then takes a further 40ms to finish
    /// before bumping the shared completion counter.
    struct SlowStopSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for SlowStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            // Simulated slow cleanup that runs after the stop request is seen.
            thread::sleep(Duration::from_millis(40));
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let finished = Arc::new(AtomicUsize::new(0));

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(vec![
        Box::new(SlowStopSub {
            id: 2000,
            counter: Arc::clone(&finished),
        }),
        Box::new(SlowStopSub {
            id: 2001,
            counter: Arc::clone(&finished),
        }),
        Box::new(SlowStopSub {
            id: 2002,
            counter: Arc::clone(&finished),
        }),
    ]);
    // Let the three subscriptions run their polling loops for a while.
    thread::sleep(Duration::from_millis(20));

    // Drop every subscription in a single reconcile and time the call.
    let timer = Instant::now();
    mgr.reconcile(Vec::new());
    let elapsed = timer.elapsed();

    // Three overlapping 40ms cleanups take ~40ms plus overhead; running them
    // back to back would take ~120ms.
    let parallel_budget = Duration::from_millis(100);
    assert!(
        elapsed < parallel_budget,
        "reconcile removal took {elapsed:?}, expected < 100ms with parallel stop"
    );

    thread::sleep(Duration::from_millis(20));
    let completed = finished.load(Ordering::SeqCst);
    assert_eq!(completed, 3);
}
1742
/// bd-1f2aw: `has_panicked()` must report whatever the shared panic flag
/// currently holds.
#[test]
fn lifecycle_has_panicked_tracks_state() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    // Worker that simply parks on the stop signal (for at most 10s).
    let (signal, trigger) = StopSignal::new();
    let worker = thread::spawn(move || {
        signal.wait_timeout(Duration::from_secs(10));
    });

    // Keep a second handle to the flag so the test can flip it from outside.
    let panicked = Arc::new(AtomicBool::new(false));
    let external_flag = Arc::clone(&panicked);

    let running = RunningSubscription {
        id: 999,
        trigger,
        thread: Some(worker),
        panicked,
    };

    let before = running.has_panicked();
    assert!(!before, "should not be panicked initially");

    // Simulate a panic flag (normally set by the catch_unwind wrapper).
    external_flag.store(true, Ordering::Release);
    let after = running.has_panicked();
    assert!(after, "should reflect panicked state");

    running.stop();
}
1769
/// bd-1f2aw: Calling `signal_stop` followed by `join_bounded` must work as a
/// two-phase shutdown for a single subscription.
#[test]
fn lifecycle_signal_then_join_works() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let completed = Arc::new(AtomicBool::new(false));

    let (signal, trigger) = StopSignal::new();
    let worker = {
        let done = Arc::clone(&completed);
        thread::spawn(move || {
            // Cooperative worker: poll the stop signal every 5ms.
            loop {
                if signal.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            done.store(true, Ordering::SeqCst);
        })
    };

    let running = RunningSubscription {
        id: 888,
        trigger,
        thread: Some(worker),
        panicked: Arc::new(AtomicBool::new(false)),
    };

    // Phase 1: request the stop. Phase 2: wait for the thread to finish.
    running.signal_stop();
    let leftover = running.join_bounded();

    assert!(
        leftover.is_none(),
        "cooperative thread should join within timeout"
    );
    let finished = completed.load(Ordering::SeqCst);
    assert!(finished, "thread should have completed");
}
1805
/// bd-1f2aw: When the thread does not finish in time, `join_bounded` must
/// hand the join handle back instead of blocking on it.
#[test]
fn lifecycle_join_bounded_returns_handle_for_uncooperative() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    // The worker never looks at the stop signal; it just sleeps for 500ms.
    let (_signal, trigger) = StopSignal::new();
    let worker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
    });

    let running = RunningSubscription {
        id: 777,
        trigger,
        thread: Some(worker),
        panicked: Arc::new(AtomicBool::new(false)),
    };

    running.signal_stop();

    // Time only the bounded join itself.
    let timer = Instant::now();
    let leftover = running.join_bounded();
    let elapsed = timer.elapsed();

    assert!(
        leftover.is_some(),
        "uncooperative thread should not join within timeout"
    );

    // Well below the worker's 500ms sleep, so the join must have given up.
    let upper_bound = Duration::from_millis(400);
    assert!(
        elapsed < upper_bound,
        "join_bounded should respect the 250ms timeout, took {elapsed:?}"
    );
}
1837
/// bd-1f2aw: Restart semantics — once a subscription has been removed via
/// `reconcile`, listing the same ID in a later `reconcile` starts it again.
#[test]
fn lifecycle_restart_after_stop() {
    let tick_every = Duration::from_millis(10);
    let mut mgr = SubscriptionManager::<TestMsg>::new();

    // Phase 1: start the subscription and confirm it produces ticks.
    mgr.reconcile(vec![Box::new(Every::with_id(300, tick_every, || {
        TestMsg::Tick
    }))]);
    thread::sleep(Duration::from_millis(30));
    let first_batch = mgr.drain_messages();
    assert!(!first_batch.is_empty(), "should receive ticks");

    // Phase 2: remove it, discard anything still buffered, then check that
    // nothing new arrives.
    mgr.reconcile(Vec::new());
    thread::sleep(Duration::from_millis(20));
    drop(mgr.drain_messages());
    thread::sleep(Duration::from_millis(30));
    let after_removal = mgr.drain_messages();
    assert!(
        after_removal.is_empty(),
        "should stop receiving after removal"
    );

    // Phase 3: bring the same ID back with a different message factory.
    mgr.reconcile(vec![Box::new(Every::with_id(300, tick_every, || {
        TestMsg::Value(99)
    }))]);
    thread::sleep(Duration::from_millis(30));
    let restarted = mgr.drain_messages();
    assert!(
        !restarted.is_empty(),
        "should receive messages again after restart"
    );
    let saw_new_value = restarted.iter().any(|m| matches!(m, TestMsg::Value(99)));
    assert!(
        saw_new_value,
        "restarted sub should use the new message factory"
    );

    mgr.stop_all();
}
1881
/// bd-1f2aw: Non-interference contract — subscriptions communicate
/// exclusively through the mpsc channel. They have no access to terminal
/// state, frame buffers, or render surfaces.
///
/// This test verifies the architectural invariant by demonstrating that
/// subscription threads only interact with the manager through messages,
/// and that the manager's state is consistent after concurrent operations.
///
/// The shared counter is incremented *before* each send, so at every instant
/// it is an upper bound on the number of messages that can have reached the
/// channel. That is what makes the `drained <= sent` checks race-free.
#[test]
fn lifecycle_non_interference_with_manager_state() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let msg_count = std::sync::Arc::new(AtomicUsize::new(0));

    /// Sends a `Tick` roughly every 5ms and counts each send attempt.
    struct CountingSub {
        id: SubId,
        counter: std::sync::Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for CountingSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            while !stop.is_stopped() {
                // Count first, then send. If the order were reversed, the
                // receiver could drain a message and read the counter before
                // the matching increment landed, making `drained > sent`
                // observable and the test flaky. A failed send leaves the
                // counter one too high, which the `<=` assertions tolerate.
                self.counter.fetch_add(1, Ordering::SeqCst);
                if sender.send(TestMsg::Tick).is_err() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
        }
    }

    let mut mgr = SubscriptionManager::<TestMsg>::new();

    // Start multiple subscriptions.
    mgr.reconcile(vec![
        Box::new(CountingSub {
            id: 400,
            counter: msg_count.clone(),
        }),
        Box::new(CountingSub {
            id: 401,
            counter: msg_count.clone(),
        }),
    ]);

    thread::sleep(Duration::from_millis(50));

    // Manager state is consistent while subscriptions are running.
    assert_eq!(mgr.active_count(), 2);
    // Drain first and read the counter second: every drained message was
    // counted before it was sent, so the later read cannot be smaller.
    let drained = mgr.drain_messages();
    let sent_count = msg_count.load(Ordering::SeqCst);
    assert!(sent_count > 0, "subscriptions should have sent messages");
    assert!(
        drained.len() <= sent_count,
        "drained {} but only {} sent",
        drained.len(),
        sent_count
    );

    // Stop all — manager state is consistent after shutdown.
    mgr.stop_all();
    assert_eq!(mgr.active_count(), 0);

    // Drain remaining buffered messages.
    let remaining = mgr.drain_messages();
    let total_drained = drained.len() + remaining.len();
    let final_sent = msg_count.load(Ordering::SeqCst);
    assert!(
        total_drained <= final_sent,
        "total drained ({total_drained}) must not exceed total sent ({final_sent})"
    );
}
1957
/// bd-1f2aw: Shutdown ordering contract — `stop_all()` has to signal every
/// subscription before it joins any of them. The test checks this by
/// comparing the moments at which each subscription noticed the stop.
#[test]
fn lifecycle_shutdown_signal_ordering() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Polls the stop signal every 1ms and, once stopped, stores the
    /// microseconds elapsed since `epoch` into its own slot.
    struct TimingStopSub {
        id: SubId,
        index: usize,
        signal_times: Arc<[AtomicU64; 3]>,
        epoch: Instant,
    }

    impl Subscription<TestMsg> for TimingStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(1));
            }
            let observed_us = self.epoch.elapsed().as_micros() as u64;
            self.signal_times[self.index].store(observed_us, Ordering::SeqCst);
        }
    }

    // One slot per subscription; zero means "never recorded".
    let signal_times = Arc::new([AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0)]);
    let epoch = Instant::now();

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(vec![
        Box::new(TimingStopSub {
            id: 500,
            index: 0,
            signal_times: Arc::clone(&signal_times),
            epoch,
        }),
        Box::new(TimingStopSub {
            id: 501,
            index: 1,
            signal_times: Arc::clone(&signal_times),
            epoch,
        }),
        Box::new(TimingStopSub {
            id: 502,
            index: 2,
            signal_times: Arc::clone(&signal_times),
            epoch,
        }),
    ]);
    thread::sleep(Duration::from_millis(20));

    mgr.stop_all();

    // If phase 1 signals everyone before phase 2 joins anyone, the three
    // recorded times should lie within 10ms of each other.
    let [t0, t1, t2] = [0usize, 1, 2].map(|slot| signal_times[slot].load(Ordering::SeqCst));

    assert!(
        [t0, t1, t2].iter().all(|&t| t > 0),
        "all subs should have recorded stop time"
    );

    let spread_us = t0.max(t1).max(t2) - t0.min(t1).min(t2);

    assert!(
        spread_us < 10_000, // 10ms
        "stop signal spread should be < 10ms for parallel signaling, got {spread_us}us \
         (t0={t0}, t1={t1}, t2={t2})"
    );
}
2037}