1#![forbid(unsafe_code)]
2
3use crate::cancellation::{CancellationSource, CancellationToken};
18use std::collections::HashSet;
19use std::sync::mpsc;
20use std::thread;
21use web_time::{Duration, Instant};
22
/// Identifier for a subscription.
///
/// `SubscriptionManager::reconcile` compares these values to decide which
/// subscriptions to start, keep running, or stop.
pub type SubId = u64;
28
/// A source of messages of type `M` that `SubscriptionManager` runs on a
/// dedicated thread.
///
/// Implementors must be `Send` because `reconcile` moves the boxed
/// subscription into the thread it spawns.
pub trait Subscription<M: Send + 'static>: Send {
    /// Returns this subscription's identifier.
    ///
    /// `SubscriptionManager::reconcile` treats subscriptions with equal ids as
    /// the same subscription: at most one of them is running at a time.
    fn id(&self) -> SubId;

    /// Runs the subscription body.
    ///
    /// Messages are delivered through `sender`. `stop` is triggered by the
    /// manager when the subscription is removed or the manager shuts down;
    /// the manager waits only a bounded time for `run` to return after that.
    fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
}
47
48#[derive(Clone)]
55pub struct StopSignal {
56 token: CancellationToken,
57}
58
59impl StopSignal {
60 pub(crate) fn new() -> (Self, StopTrigger) {
62 let source = CancellationSource::new();
63 let signal = Self {
64 token: source.token(),
65 };
66 let trigger = StopTrigger { source };
67 (signal, trigger)
68 }
69
70 pub fn is_stopped(&self) -> bool {
72 self.token.is_cancelled()
73 }
74
75 pub fn wait_timeout(&self, duration: Duration) -> bool {
81 self.token.wait_timeout(duration)
82 }
83
84 pub fn cancellation_token(&self) -> &CancellationToken {
89 &self.token
90 }
91}
92
/// Crate-internal counterpart of [`StopSignal`]; owns the cancellation source.
pub(crate) struct StopTrigger {
    source: CancellationSource,
}

impl StopTrigger {
    /// Cancels the underlying source, which is what makes the paired
    /// `StopSignal` report stopped.
    pub(crate) fn stop(&self) {
        self.source.cancel();
    }
}
106
/// Bookkeeping for one subscription thread started by `SubscriptionManager`.
pub(crate) struct RunningSubscription {
    /// Id the subscription reported when it was started.
    pub(crate) id: SubId,
    /// Trigger paired with the `StopSignal` handed to the subscription's `run`.
    trigger: StopTrigger,
    /// Handle of the worker thread; `None` once `join_bounded` or `stop` took it.
    thread: Option<thread::JoinHandle<()>>,
    /// Set to `true` by the thread wrapper in `reconcile` when `run` panics.
    panicked: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
115
/// Longest time `join_bounded` / `stop` wait for a subscription thread to
/// finish before giving up and detaching it.
const SUBSCRIPTION_STOP_JOIN_TIMEOUT: Duration = Duration::from_millis(250);
/// Sleep between successive `is_finished` checks while waiting for a thread.
const SUBSCRIPTION_STOP_JOIN_POLL: Duration = Duration::from_millis(1);
123
124impl RunningSubscription {
125 pub(crate) fn has_panicked(&self) -> bool {
127 self.panicked.load(std::sync::atomic::Ordering::Acquire)
128 }
129
130 pub(crate) fn signal_stop(&self) {
135 self.trigger.stop();
136 }
137
138 pub(crate) fn join_bounded(mut self) -> Option<thread::JoinHandle<()>> {
143 let handle = self.thread.take()?;
144 let start = Instant::now();
145
146 if handle.is_finished() {
148 let _ = handle.join();
149 tracing::trace!(
150 sub_id = self.id,
151 panicked = self.has_panicked(),
152 elapsed_us = start.elapsed().as_micros() as u64,
153 "subscription join (fast path)"
154 );
155 return None;
156 }
157
158 while !handle.is_finished() {
160 if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
161 tracing::warn!(
162 sub_id = self.id,
163 panicked = self.has_panicked(),
164 timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
165 "subscription join timed out, detaching thread"
166 );
167 return Some(handle);
168 }
169 thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
170 }
171
172 let _ = handle.join();
173 tracing::trace!(
174 sub_id = self.id,
175 panicked = self.has_panicked(),
176 elapsed_us = start.elapsed().as_micros() as u64,
177 "subscription join (slow path)"
178 );
179 None
180 }
181
182 #[cfg_attr(not(test), allow(dead_code))]
187 pub(crate) fn stop(mut self) {
188 self.trigger.stop();
189 if let Some(handle) = self.thread.take() {
190 let start = Instant::now();
191 if handle.is_finished() {
193 let _ = handle.join();
194 tracing::trace!(
195 sub_id = self.id,
196 panicked = self.has_panicked(),
197 elapsed_us = start.elapsed().as_micros() as u64,
198 "subscription stop (fast path)"
199 );
200 return;
201 }
202 while !handle.is_finished() {
204 if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
205 tracing::warn!(
206 sub_id = self.id,
207 panicked = self.has_panicked(),
208 timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
209 "subscription did not stop within timeout; detaching thread"
210 );
211 return;
212 }
213 thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
214 }
215 let _ = handle.join();
216 tracing::trace!(
217 sub_id = self.id,
218 panicked = self.has_panicked(),
219 elapsed_us = start.elapsed().as_micros() as u64,
220 "subscription stop (slow path)"
221 );
222 }
223 }
224}
225
/// Dropping a running subscription fires its stop trigger.
///
/// The thread is not joined here; any handle still stored in `thread` is
/// simply dropped along with the struct.
impl Drop for RunningSubscription {
    fn drop(&mut self) {
        self.trigger.stop();
    }
}
232
/// Owns the running subscription threads and the channel they report into.
pub(crate) struct SubscriptionManager<M: Send + 'static> {
    /// Subscriptions currently tracked as running.
    active: Vec<RunningSubscription>,
    /// Sending half; cloned into every subscription thread.
    sender: mpsc::Sender<M>,
    /// Receiving half; read by `drain_messages`.
    receiver: mpsc::Receiver<M>,
}
239
240impl<M: Send + 'static> SubscriptionManager<M> {
241 pub(crate) fn new() -> Self {
242 let (sender, receiver) = mpsc::channel();
243 Self {
244 active: Vec::new(),
245 sender,
246 receiver,
247 }
248 }
249
250 pub(crate) fn reconcile(&mut self, subscriptions: Vec<Box<dyn Subscription<M>>>) {
257 let reconcile_start = Instant::now();
258 let new_ids: HashSet<SubId> = subscriptions.iter().map(|s| s.id()).collect();
259 let active_count_before = self.active.len();
260
261 crate::debug_trace!(
262 "reconcile: new_ids={:?}, active_before={}",
263 new_ids,
264 active_count_before
265 );
266 tracing::trace!(
267 new_id_count = new_ids.len(),
268 active_before = active_count_before,
269 new_ids = ?new_ids,
270 "subscription reconcile starting"
271 );
272
273 let mut remaining = Vec::new();
275 let mut to_stop = Vec::new();
276 for running in self.active.drain(..) {
277 if new_ids.contains(&running.id) {
278 remaining.push(running);
279 } else {
280 crate::debug_trace!("stopping subscription: id={}", running.id);
281 tracing::debug!(sub_id = running.id, "Stopping subscription");
282 crate::effect_system::record_subscription_stop("subscription", running.id, 0);
283 crate::effect_system::record_dynamics_sub_stop();
284 to_stop.push(running);
285 }
286 }
287 for running in &to_stop {
289 running.signal_stop();
290 }
291 let stopped_count = to_stop.len();
292 for running in to_stop {
294 let _ = running.join_bounded();
295 }
296 self.active = remaining;
297
298 let mut started_count = 0usize;
300 let mut active_ids: HashSet<SubId> = self.active.iter().map(|r| r.id).collect();
301 for sub in subscriptions {
302 let id = sub.id();
303 if !active_ids.insert(id) {
304 continue;
305 }
306 started_count += 1;
307
308 crate::debug_trace!("starting subscription: id={}", id);
309 tracing::debug!(sub_id = id, "Starting subscription");
310 crate::effect_system::record_subscription_start("subscription", id);
311 crate::effect_system::record_dynamics_sub_start();
312 let (signal, trigger) = StopSignal::new();
313 let sender = self.sender.clone();
314 let panicked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
315 let panicked_flag = panicked.clone();
316 let sub_id_for_thread = id;
317
318 let thread = thread::spawn(move || {
319 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
320 sub.run(sender, signal);
321 }));
322 if let Err(payload) = result {
323 panicked_flag.store(true, std::sync::atomic::Ordering::Release);
324 crate::effect_system::record_dynamics_sub_panic();
325 let panic_msg = match payload.downcast_ref::<&str>() {
326 Some(s) => (*s).to_string(),
327 None => match payload.downcast_ref::<String>() {
328 Some(s) => s.clone(),
329 None => "unknown panic payload".to_string(),
330 },
331 };
332 crate::effect_system::error_effect_panic(
333 "subscription",
334 &format!("sub_id={sub_id_for_thread}: {panic_msg}"),
335 );
336 }
337 });
338
339 self.active.push(RunningSubscription {
340 id,
341 trigger,
342 thread: Some(thread),
343 panicked,
344 });
345 }
346
347 let active_count_after = self.active.len();
348 let reconcile_elapsed_us = reconcile_start.elapsed().as_micros() as u64;
349 crate::effect_system::record_dynamics_reconcile(reconcile_elapsed_us);
350 crate::debug_trace!("reconcile complete: active_after={}", active_count_after);
351 tracing::trace!(
352 active_before = active_count_before,
353 active_after = active_count_after,
354 started = started_count,
355 stopped = stopped_count,
356 reconcile_us = reconcile_elapsed_us,
357 "subscription reconcile complete"
358 );
359 }
360
361 pub(crate) fn drain_messages(&self) -> Vec<M> {
363 let mut messages = Vec::new();
364 while let Ok(msg) = self.receiver.try_recv() {
365 messages.push(msg);
366 }
367 messages
368 }
369
    /// Number of subscriptions currently tracked in `active`.
    #[inline]
    pub(crate) fn active_count(&self) -> usize {
        self.active.len()
    }
375
376 pub(crate) fn stop_all(&mut self) {
385 let count = self.active.len();
386 if count == 0 {
387 return;
388 }
389 let start = Instant::now();
390
391 for running in &self.active {
393 running.signal_stop();
394 }
395
396 let signal_elapsed_us = start.elapsed().as_micros() as u64;
397 tracing::trace!(
398 target: "ftui.runtime",
399 count,
400 signal_elapsed_us,
401 "subscription stop_all phase 1 (signal) complete"
402 );
403
404 let mut panicked_count = 0_usize;
406 let mut timed_out_count = 0_usize;
407 for running in self.active.drain(..) {
408 if running.has_panicked() {
409 panicked_count += 1;
410 }
411 if running.join_bounded().is_some() {
412 timed_out_count += 1;
413 }
414 }
415
416 let shutdown_elapsed_us = start.elapsed().as_micros() as u64;
417 crate::effect_system::record_dynamics_shutdown(shutdown_elapsed_us, timed_out_count as u64);
418 tracing::debug!(
419 target: "ftui.runtime",
420 count,
421 panicked_count,
422 timed_out_count,
423 elapsed_us = shutdown_elapsed_us,
424 "subscription stop_all complete"
425 );
426 }
427}
428
/// Dropping the manager stops all running subscriptions via `stop_all`,
/// which waits a bounded time per subscription.
impl<M: Send + 'static> Drop for SubscriptionManager<M> {
    fn drop(&mut self) {
        self.stop_all();
    }
}
434
/// Subscription that emits a message produced by `make_msg` once per
/// `interval` until it is stopped or the receiving side is gone.
pub struct Every<M: Send + 'static> {
    /// Id reported through `Subscription::id`.
    id: SubId,
    /// Time waited before each message.
    interval: Duration,
    /// Factory invoked once for every message sent.
    make_msg: Box<dyn Fn() -> M + Send + Sync>,
}
451
452impl<M: Send + 'static> Every<M> {
453 pub fn new(interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self {
455 let id = interval.as_nanos() as u64 ^ 0x5449_434B; Self {
458 id,
459 interval,
460 make_msg: Box::new(make_msg),
461 }
462 }
463
464 pub fn with_id(
466 id: SubId,
467 interval: Duration,
468 make_msg: impl Fn() -> M + Send + Sync + 'static,
469 ) -> Self {
470 Self {
471 id,
472 interval,
473 make_msg: Box::new(make_msg),
474 }
475 }
476}
477
478impl<M: Send + 'static> Subscription<M> for Every<M> {
479 fn id(&self) -> SubId {
480 self.id
481 }
482
483 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
484 let mut tick_count: u64 = 0;
485 crate::debug_trace!(
486 "Every subscription started: id={}, interval={:?}",
487 self.id,
488 self.interval
489 );
490 loop {
491 if stop.wait_timeout(self.interval) {
492 crate::debug_trace!(
493 "Every subscription stopped: id={}, sent {} ticks",
494 self.id,
495 tick_count
496 );
497 break;
498 }
499 tick_count += 1;
500 let msg = (self.make_msg)();
501 if sender.send(msg).is_err() {
502 crate::debug_trace!(
503 "Every subscription channel closed: id={}, sent {} ticks",
504 self.id,
505 tick_count
506 );
507 break;
508 }
509 }
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516
    /// Message type used by the tests in this module.
    #[derive(Debug, Clone, PartialEq)]
    enum TestMsg {
        /// Payload-free message, used with the `Every` ticker in these tests.
        Tick,
        /// Message carrying an integer so tests can tell senders apart.
        Value(i32),
    }
522
    /// Test subscription that forwards whatever arrives on `receiver`.
    struct ChannelSubscription<M: Send + 'static> {
        id: SubId,
        /// Source of events to forward.
        receiver: mpsc::Receiver<M>,
        /// Timeout of each receive attempt, i.e. how often the stop signal is re-checked.
        poll: Duration,
    }

    impl<M: Send + 'static> ChannelSubscription<M> {
        /// Creates a forwarding subscription with a 5 ms poll interval.
        fn new(id: SubId, receiver: mpsc::Receiver<M>) -> Self {
            Self {
                id,
                receiver,
                poll: Duration::from_millis(5),
            }
        }
    }
538
539 impl<M: Send + 'static> Subscription<M> for ChannelSubscription<M> {
540 fn id(&self) -> SubId {
541 self.id
542 }
543
544 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
545 loop {
546 if stop.is_stopped() {
547 break;
548 }
549 match self.receiver.recv_timeout(self.poll) {
550 Ok(msg) => {
551 if sender.send(msg).is_err() {
552 break;
553 }
554 }
555 Err(mpsc::RecvTimeoutError::Timeout) => {}
556 Err(mpsc::RecvTimeoutError::Disconnected) => break,
557 }
558 }
559 }
560 }
561
562 fn channel_subscription(id: SubId) -> (ChannelSubscription<TestMsg>, mpsc::Sender<TestMsg>) {
563 let (tx, rx) = mpsc::channel();
564 (ChannelSubscription::new(id, rx), tx)
565 }
566
    /// A freshly created signal is not stopped.
    #[test]
    fn stop_signal_starts_false() {
        let (signal, _trigger) = StopSignal::new();
        assert!(!signal.is_stopped());
    }
572
    /// Firing the trigger makes the paired signal report stopped.
    #[test]
    fn stop_signal_becomes_true_after_trigger() {
        let (signal, trigger) = StopSignal::new();
        trigger.stop();
        assert!(signal.is_stopped());
    }
579
    /// `wait_timeout` returns `true` when the stop was already triggered.
    #[test]
    fn stop_signal_wait_returns_true_when_stopped() {
        let (signal, trigger) = StopSignal::new();
        trigger.stop();
        assert!(signal.wait_timeout(Duration::from_millis(100)));
    }
586
    /// `wait_timeout` returns `false` when the duration elapses without a stop.
    #[test]
    fn stop_signal_wait_returns_false_on_timeout() {
        let (signal, _trigger) = StopSignal::new();
        assert!(!signal.wait_timeout(Duration::from_millis(10)));
    }
592
593 #[test]
594 fn channel_subscription_forwards_messages() {
595 let (sub, event_tx) = channel_subscription(1);
596 let (tx, rx) = mpsc::channel();
597 let (signal, trigger) = StopSignal::new();
598
599 let handle = thread::spawn(move || {
600 sub.run(tx, signal);
601 });
602
603 event_tx.send(TestMsg::Value(1)).unwrap();
604 event_tx.send(TestMsg::Value(2)).unwrap();
605 thread::sleep(Duration::from_millis(10));
606 trigger.stop();
607 handle.join().unwrap();
608
609 let msgs: Vec<_> = rx.try_iter().collect();
610 assert_eq!(msgs, vec![TestMsg::Value(1), TestMsg::Value(2)]);
611 }
612
    /// A 10 ms `Every` produces at least one `Tick` (and nothing else) in 50 ms.
    #[test]
    fn every_subscription_fires() {
        let sub = Every::new(Duration::from_millis(10), || TestMsg::Tick);
        let (tx, rx) = mpsc::channel();
        let (signal, trigger) = StopSignal::new();

        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        // Timing-based: gives the ticker several intervals before stopping it.
        thread::sleep(Duration::from_millis(50));
        trigger.stop();
        handle.join().unwrap();

        let msgs: Vec<_> = rx.try_iter().collect();
        assert!(!msgs.is_empty(), "Should have received at least one tick");
        assert!(msgs.iter().all(|m| *m == TestMsg::Tick));
    }
632
    /// Two `Every::new` values with the same interval share an id.
    #[test]
    fn every_subscription_uses_stable_id() {
        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
        let sub2 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
        assert_eq!(sub1.id(), sub2.id());
    }
639
    /// `Every::new` with 1 s and 2 s intervals yields different ids.
    #[test]
    fn every_subscription_different_intervals_different_ids() {
        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
        let sub2 = Every::<TestMsg>::new(Duration::from_secs(2), || TestMsg::Tick);
        assert_ne!(sub1.id(), sub2.id());
    }
646
    /// `reconcile` starts a new subscription, whose messages then show up in
    /// `drain_messages`.
    #[test]
    fn subscription_manager_starts_subscriptions() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub, event_tx) = channel_subscription(1);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];

        mgr.reconcile(subs);
        event_tx.send(TestMsg::Value(42)).unwrap();

        // Timing-based: allow the subscription thread to forward the message.
        thread::sleep(Duration::from_millis(20));

        let msgs = mgr.drain_messages();
        assert_eq!(msgs, vec![TestMsg::Value(42)]);
    }
662
    /// When one `reconcile` call contains the same id twice, only the first
    /// subscription runs; the second is dropped.
    #[test]
    fn subscription_manager_dedupes_duplicate_ids() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub_a, tx_a) = channel_subscription(7);
        let (sub_b, tx_b) = channel_subscription(7);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub_a), Box::new(sub_b)];

        mgr.reconcile(subs);

        tx_a.send(TestMsg::Value(1)).unwrap();
        // `sub_b` was dropped together with its receiver, so sending fails.
        assert!(
            tx_b.send(TestMsg::Value(2)).is_err(),
            "Duplicate subscription should be dropped"
        );

        thread::sleep(Duration::from_millis(20));
        let msgs = mgr.drain_messages();
        assert_eq!(msgs, vec![TestMsg::Value(1)]);
    }
682
    /// Reconciling with an empty list stops a previously running ticker.
    #[test]
    fn subscription_manager_stops_removed() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![Box::new(Every::with_id(
            99,
            Duration::from_millis(5),
            || TestMsg::Tick,
        ))]);

        thread::sleep(Duration::from_millis(20));
        let msgs_before = mgr.drain_messages();
        assert!(!msgs_before.is_empty());

        mgr.reconcile(vec![]);

        // Discard anything that was sent before the stop took effect.
        thread::sleep(Duration::from_millis(20));
        let _ = mgr.drain_messages();

        thread::sleep(Duration::from_millis(30));
        let msgs_after = mgr.drain_messages();
        assert!(
            msgs_after.is_empty(),
            "Should stop receiving after reconcile with empty set"
        );
    }
713
    /// Reconciling again with the same id leaves the subscription running.
    #[test]
    fn subscription_manager_keeps_unchanged() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![Box::new(Every::with_id(
            50,
            Duration::from_millis(10),
            || TestMsg::Tick,
        ))]);

        thread::sleep(Duration::from_millis(30));
        let _ = mgr.drain_messages();

        mgr.reconcile(vec![Box::new(Every::with_id(
            50,
            Duration::from_millis(10),
            || TestMsg::Tick,
        ))]);

        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();
        assert!(!msgs.is_empty(), "Subscription should still be running");
    }
739
    /// After `stop_all`, no further messages arrive from either ticker.
    #[test]
    fn subscription_manager_stop_all() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![
            Box::new(Every::with_id(1, Duration::from_millis(5), || {
                TestMsg::Value(1)
            })),
            Box::new(Every::with_id(2, Duration::from_millis(5), || {
                TestMsg::Value(2)
            })),
        ]);

        thread::sleep(Duration::from_millis(20));
        mgr.stop_all();

        // Discard messages sent before the stop, then expect silence.
        thread::sleep(Duration::from_millis(20));
        let _ = mgr.drain_messages();
        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();
        assert!(msgs.is_empty());
    }
762
    /// A clone of a signal observes the same stop as the original.
    #[test]
    fn stop_signal_is_cloneable() {
        let (signal, trigger) = StopSignal::new();
        let signal_clone = signal.clone();

        assert!(!signal.is_stopped());
        assert!(!signal_clone.is_stopped());

        trigger.stop();

        assert!(signal.is_stopped());
        assert!(signal_clone.is_stopped());
    }
780
    /// With the stop already triggered, a 10 s `wait_timeout` returns `true`
    /// in under 100 ms.
    #[test]
    fn stop_signal_wait_wakes_immediately_when_already_stopped() {
        let (signal, trigger) = StopSignal::new();
        trigger.stop();

        let start = Instant::now();
        let stopped = signal.wait_timeout(Duration::from_secs(10));
        let elapsed = start.elapsed();

        assert!(stopped);
        assert!(elapsed < Duration::from_millis(100));
    }
794
    /// A thread blocked in `wait_timeout` gets `true` when the trigger fires.
    #[test]
    fn stop_signal_wait_is_interrupted_by_trigger() {
        let (signal, trigger) = StopSignal::new();

        let signal_clone = signal.clone();
        let handle = thread::spawn(move || signal_clone.wait_timeout(Duration::from_secs(10)));

        // Give the waiter time to start waiting before stopping it.
        thread::sleep(Duration::from_millis(20));
        trigger.stop();

        let stopped = handle.join().unwrap();
        assert!(stopped);
    }
809
    /// With no events fed in, the subscription forwards nothing.
    #[test]
    fn channel_subscription_no_messages_without_events() {
        // `_event_tx` stays alive so the subscription's receiver is not disconnected.
        let (sub, _event_tx) = channel_subscription(1);
        let (tx, rx) = mpsc::channel();
        let (signal, trigger) = StopSignal::new();

        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        thread::sleep(Duration::from_millis(10));
        trigger.stop();
        handle.join().unwrap();

        let msgs: Vec<_> = rx.try_iter().collect();
        assert!(msgs.is_empty());
    }
827
    /// The id passed to `channel_subscription` is what `id()` reports.
    #[test]
    fn channel_subscription_id_is_preserved() {
        let (sub, _tx) = channel_subscription(42);
        assert_eq!(sub.id(), 42);
    }
833
834 #[test]
835 fn channel_subscription_stops_on_disconnected_receiver() {
836 let (sub, event_tx) = channel_subscription(1);
837 let (tx, _rx) = mpsc::channel();
838 let (signal, _trigger) = StopSignal::new();
839
840 drop(event_tx);
841
842 let handle = thread::spawn(move || {
843 sub.run(tx, signal);
844 });
845
846 let result = handle.join();
847 assert!(result.is_ok());
848 }
849
    /// `Every::with_id` reports exactly the id it was given.
    #[test]
    fn every_with_id_preserves_custom_id() {
        let sub = Every::<TestMsg>::with_id(12345, Duration::from_secs(1), || TestMsg::Tick);
        assert_eq!(sub.id(), 12345);
    }
855
856 #[test]
857 fn every_stops_on_disconnected_receiver() {
858 let sub = Every::new(Duration::from_millis(5), || TestMsg::Tick);
859 let (tx, rx) = mpsc::channel();
860 let (signal, _trigger) = StopSignal::new();
861
862 drop(rx);
864
865 let handle = thread::spawn(move || {
867 sub.run(tx, signal);
868 });
869
870 let result = handle.join();
872 assert!(result.is_ok());
873 }
874
    /// A 50 ms ticker running for about 160 ms produces between 2 and 4 ticks.
    #[test]
    fn every_respects_interval() {
        let sub = Every::with_id(1, Duration::from_millis(50), || TestMsg::Tick);
        let (tx, rx) = mpsc::channel();
        let (signal, trigger) = StopSignal::new();

        let start = Instant::now();
        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        thread::sleep(Duration::from_millis(160));
        trigger.stop();
        handle.join().unwrap();

        let msgs: Vec<_> = rx.try_iter().collect();
        let elapsed = start.elapsed();

        // The bounds are deliberately loose to tolerate scheduling jitter.
        assert!(
            msgs.len() >= 2,
            "Expected at least 2 ticks, got {}",
            msgs.len()
        );
        assert!(
            msgs.len() <= 4,
            "Expected at most 4 ticks, got {}",
            msgs.len()
        );
        assert!(elapsed >= Duration::from_millis(150));
    }
907
    /// Reconciling an empty manager with an empty list yields no messages.
    #[test]
    fn subscription_manager_empty_reconcile() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![]);
        let msgs = mgr.drain_messages();
        assert!(msgs.is_empty());
    }
917
    /// `drain_messages` returns every pending message in order and leaves the
    /// queue empty.
    #[test]
    fn subscription_manager_drain_messages_returns_all() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub, event_tx) = channel_subscription(1);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];

        mgr.reconcile(subs);
        event_tx.send(TestMsg::Value(1)).unwrap();
        event_tx.send(TestMsg::Value(2)).unwrap();
        thread::sleep(Duration::from_millis(20));

        let msgs = mgr.drain_messages();
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0], TestMsg::Value(1));
        assert_eq!(msgs[1], TestMsg::Value(2));

        // A second drain finds nothing left.
        let msgs2 = mgr.drain_messages();
        assert!(msgs2.is_empty());
    }
938
    /// Reconciling with a different id replaces the running subscription.
    #[test]
    fn subscription_manager_replaces_subscription_with_different_id() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub1, tx1) = channel_subscription(1);

        mgr.reconcile(vec![Box::new(sub1)]);
        tx1.send(TestMsg::Value(1)).unwrap();
        thread::sleep(Duration::from_millis(20));
        let msgs1 = mgr.drain_messages();
        assert_eq!(msgs1, vec![TestMsg::Value(1)]);

        let (sub2, tx2) = channel_subscription(2);
        mgr.reconcile(vec![Box::new(sub2)]);
        tx2.send(TestMsg::Value(2)).unwrap();
        thread::sleep(Duration::from_millis(20));
        let msgs2 = mgr.drain_messages();
        assert_eq!(msgs2, vec![TestMsg::Value(2)]);
    }
959
    /// Three subscriptions with distinct ids all deliver through the manager.
    #[test]
    fn subscription_manager_multiple_subscriptions() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub1, tx1) = channel_subscription(1);
        let (sub2, tx2) = channel_subscription(2);
        let (sub3, tx3) = channel_subscription(3);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> =
            vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];

        mgr.reconcile(subs);
        tx1.send(TestMsg::Value(10)).unwrap();
        tx2.send(TestMsg::Value(20)).unwrap();
        tx3.send(TestMsg::Value(30)).unwrap();
        thread::sleep(Duration::from_millis(30));

        // Arrival order across threads is not fixed, so sort before comparing.
        let mut msgs = mgr.drain_messages();
        msgs.sort_by_key(|m| match m {
            TestMsg::Value(v) => *v,
            _ => 0,
        });

        assert_eq!(msgs.len(), 3);
        assert_eq!(msgs[0], TestMsg::Value(10));
        assert_eq!(msgs[1], TestMsg::Value(20));
        assert_eq!(msgs[2], TestMsg::Value(30));
    }
986
    /// Removing one of three tickers stops only that one.
    #[test]
    fn subscription_manager_partial_update() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![
            Box::new(Every::with_id(1, Duration::from_millis(10), || {
                TestMsg::Value(1)
            })),
            Box::new(Every::with_id(2, Duration::from_millis(10), || {
                TestMsg::Value(2)
            })),
            Box::new(Every::with_id(3, Duration::from_millis(10), || {
                TestMsg::Value(3)
            })),
        ]);

        thread::sleep(Duration::from_millis(30));
        let _ = mgr.drain_messages();

        // Same set minus id 2.
        mgr.reconcile(vec![
            Box::new(Every::with_id(1, Duration::from_millis(10), || {
                TestMsg::Value(1)
            })),
            Box::new(Every::with_id(3, Duration::from_millis(10), || {
                TestMsg::Value(3)
            })),
        ]);

        // Discard whatever id 2 sent before it was stopped.
        let _ = mgr.drain_messages();

        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();

        let values: Vec<i32> = msgs
            .iter()
            .filter_map(|m| match m {
                TestMsg::Value(v) => Some(*v),
                _ => None,
            })
            .collect();

        assert!(
            values.contains(&1),
            "Should still receive from subscription 1"
        );
        assert!(
            values.contains(&3),
            "Should still receive from subscription 3"
        );
        assert!(
            !values.contains(&2),
            "Should not receive from stopped subscription 2"
        );
    }
1047
1048 #[test]
1049 fn subscription_manager_drop_stops_all() {
1050 let (_signal, _) = StopSignal::new();
1051 let flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1052 let flag_clone = flag.clone();
1053
1054 struct FlagSubscription {
1055 id: SubId,
1056 flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
1057 }
1058
1059 impl Subscription<TestMsg> for FlagSubscription {
1060 fn id(&self) -> SubId {
1061 self.id
1062 }
1063
1064 fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
1065 while !stop.is_stopped() {
1066 thread::sleep(Duration::from_millis(5));
1067 }
1068 self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
1069 }
1070 }
1071
1072 {
1073 let mut mgr = SubscriptionManager::<TestMsg>::new();
1074 mgr.reconcile(vec![Box::new(FlagSubscription {
1075 id: 1,
1076 flag: flag_clone,
1077 })]);
1078
1079 thread::sleep(Duration::from_millis(20));
1080 }
1082
1083 thread::sleep(Duration::from_millis(50));
1084 assert!(
1085 flag.load(std::sync::atomic::Ordering::SeqCst),
1086 "Subscription should have stopped on drop"
1087 );
1088 }
1089
1090 #[test]
1091 fn running_subscription_stop_joins_thread() {
1092 use std::sync::atomic::{AtomicBool, Ordering};
1093
1094 let completed = std::sync::Arc::new(AtomicBool::new(false));
1095 let completed_clone = completed.clone();
1096
1097 let (signal, trigger) = StopSignal::new();
1098 let (_tx, _rx) = mpsc::channel::<TestMsg>();
1099
1100 let thread = thread::spawn(move || {
1101 while !signal.is_stopped() {
1102 thread::sleep(Duration::from_millis(5));
1103 }
1104 completed_clone.store(true, Ordering::SeqCst);
1105 });
1106
1107 let running = RunningSubscription {
1108 id: 1,
1109 trigger,
1110 thread: Some(thread),
1111 panicked: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1112 };
1113
1114 running.stop();
1115 assert!(completed.load(Ordering::SeqCst));
1116 }
1117
    /// `stop` gives up after its timeout when the thread ignores the stop
    /// signal, and the detached thread still runs to completion.
    #[test]
    fn running_subscription_stop_times_out_for_uncooperative_thread() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let completed = std::sync::Arc::new(AtomicBool::new(false));
        let completed_clone = completed.clone();

        let (_signal, trigger) = StopSignal::new();
        // This thread never looks at the stop signal.
        let thread = thread::spawn(move || {
            thread::sleep(Duration::from_millis(500));
            completed_clone.store(true, Ordering::SeqCst);
        });

        let running = RunningSubscription {
            id: 7,
            trigger,
            thread: Some(thread),
            panicked: std::sync::Arc::new(AtomicBool::new(false)),
        };

        let start = Instant::now();
        running.stop();
        assert!(
            start.elapsed() < Duration::from_millis(400),
            "stop() should not block behind an uncooperative subscription thread"
        );

        thread::sleep(Duration::from_millis(550));
        assert!(completed.load(Ordering::SeqCst));
    }
1148
    /// `Every::new` ids depend on the interval, not on the message closure.
    #[test]
    fn every_id_stable_across_instances() {
        let sub1 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
        let sub2 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
        let sub3 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Value(1));

        assert_eq!(sub1.id(), sub2.id());
        assert_eq!(sub2.id(), sub3.id());
    }
1159
    /// Messages from a single subscription come out of `drain_messages` in
    /// the order they were sent.
    #[test]
    fn drain_messages_preserves_order() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        struct OrderedSubscription {
            values: Vec<i32>,
        }

        impl Subscription<TestMsg> for OrderedSubscription {
            fn id(&self) -> SubId {
                999
            }

            // Sends each value once and then returns; ignores the stop signal.
            fn run(&self, sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
                for v in &self.values {
                    let _ = sender.send(TestMsg::Value(*v));
                    thread::sleep(Duration::from_millis(1));
                }
            }
        }

        mgr.reconcile(vec![Box::new(OrderedSubscription {
            values: vec![1, 2, 3, 4, 5],
        })]);

        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();

        let values: Vec<i32> = msgs
            .iter()
            .filter_map(|m| match m {
                TestMsg::Value(v) => Some(*v),
                _ => None,
            })
            .collect();

        assert_eq!(values, vec![1, 2, 3, 4, 5]);
    }
1199
    /// A new manager has no pending messages.
    #[test]
    fn subscription_manager_new_is_empty() {
        let mgr = SubscriptionManager::<TestMsg>::new();
        let msgs = mgr.drain_messages();
        assert!(msgs.is_empty());
    }
1206
    /// A panic in code holding a signal clone does not disturb the signal.
    ///
    /// The panic is raised and caught on the test thread via `catch_unwind`.
    #[test]
    fn contract_stop_signal_resilient_to_thread_panics() {
        let (signal, trigger) = StopSignal::new();
        let signal_clone = signal.clone();

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            assert!(!signal_clone.is_stopped());
            panic!("intentional panic while holding signal clone");
        }));
        assert!(result.is_err());

        assert!(
            !signal.is_stopped(),
            "signal should still report not-stopped"
        );
        trigger.stop();
        assert!(
            signal.is_stopped(),
            "signal should report stopped after trigger"
        );
        assert!(
            signal.wait_timeout(Duration::from_millis(10)),
            "wait_timeout should return true when stopped"
        );
    }
1245
    /// The token returned by `cancellation_token` follows the trigger.
    #[test]
    fn contract_stop_signal_exposes_cancellation_token() {
        let (signal, trigger) = StopSignal::new();
        let token = signal.cancellation_token();
        assert!(!token.is_cancelled(), "token should start uncancelled");
        trigger.stop();
        assert!(token.is_cancelled(), "token should be cancelled after stop");
    }
1256
    /// `stop_all` returns in bounded time even when the subscriptions ignore
    /// the stop signal.
    #[test]
    fn contract_stop_all_bounded_time_with_uncooperative_subscriptions() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        struct UncooperativeSub {
            id: SubId,
        }

        impl Subscription<TestMsg> for UncooperativeSub {
            fn id(&self) -> SubId {
                self.id
            }

            // Sleeps for 5 s without ever checking `_stop`.
            fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
                thread::sleep(Duration::from_secs(5));
            }
        }

        mgr.reconcile(vec![
            Box::new(UncooperativeSub { id: 100 }),
            Box::new(UncooperativeSub { id: 200 }),
        ]);

        thread::sleep(Duration::from_millis(20));
        let start = Instant::now();
        mgr.stop_all();
        let elapsed = start.elapsed();

        // Each join can take up to the 250 ms timeout; 800 ms leaves headroom for two.
        assert!(
            elapsed < Duration::from_millis(800),
            "stop_all took {elapsed:?}, expected < 800ms for 2 uncooperative subscriptions"
        );
    }
1297
    /// Reconciling twice with a new instance that carries the same id does
    /// not start a second thread.
    #[test]
    fn contract_reconcile_deduplicates_by_id_not_identity() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));

        struct CountingSub {
            id: SubId,
            counter: std::sync::Arc<std::sync::atomic::AtomicUsize>,
        }

        impl Subscription<TestMsg> for CountingSub {
            fn id(&self) -> SubId {
                self.id
            }

            // Increments the counter once per `run` invocation, then idles until stopped.
            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
                self.counter
                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                while !stop.is_stopped() {
                    thread::sleep(Duration::from_millis(5));
                }
            }
        }

        mgr.reconcile(vec![Box::new(CountingSub {
            id: 42,
            counter: counter.clone(),
        })]);
        thread::sleep(Duration::from_millis(20));
        assert_eq!(
            counter.load(std::sync::atomic::Ordering::SeqCst),
            1,
            "first reconcile should start exactly 1 thread"
        );

        mgr.reconcile(vec![Box::new(CountingSub {
            id: 42,
            counter: counter.clone(),
        })]);
        thread::sleep(Duration::from_millis(20));
        assert_eq!(
            counter.load(std::sync::atomic::Ordering::SeqCst),
            1,
            "second reconcile with same ID must not start another thread"
        );

        mgr.stop_all();
    }
1350
/// Contract: messages a subscription sent before it was removed can still be
/// drained afterwards.
///
/// `BurstSub` sends ten `TestMsg::Value` messages and then idles. After it is
/// removed via an empty reconcile, at least five of those values must still be
/// returned by `drain_messages`.
#[test]
fn contract_buffered_messages_available_after_subscription_stopped() {
    /// Emits a fixed burst of ten values up front, then waits for the stop signal.
    struct BurstSub;

    impl Subscription<TestMsg> for BurstSub {
        fn id(&self) -> SubId {
            77
        }

        fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            // Send failures are deliberately ignored.
            (0..10).for_each(|i| {
                let _ = sender.send(TestMsg::Value(i));
            });
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
        }
    }

    let mut manager = SubscriptionManager::<TestMsg>::new();

    manager.reconcile(vec![Box::new(BurstSub)]);
    thread::sleep(Duration::from_millis(30));

    // An empty reconcile removes the burst subscription.
    manager.reconcile(vec![]);

    let drained = manager.drain_messages();
    let mut values: Vec<i32> = Vec::new();
    for msg in drained.iter() {
        if let TestMsg::Value(v) = msg {
            values.push(*v);
        }
    }

    let buffered = values.len();
    assert!(
        buffered >= 5,
        "Expected at least 5 buffered messages after stop, got {}",
        buffered
    );
}
1399
/// Contract: `active_count` follows the set of subscriptions handed to `reconcile`.
///
/// Expected counts: 0 for a new manager, 2 after starting ids 1 and 2, 1 after
/// reconciling down to id 1 only, and 0 after `stop_all`.
#[test]
fn contract_active_count_tracks_running_subscriptions() {
    let period = Duration::from_millis(50);
    let mut manager = SubscriptionManager::<TestMsg>::new();

    assert_eq!(manager.active_count(), 0, "empty manager");

    let both: Vec<Box<dyn Subscription<TestMsg>>> = vec![
        Box::new(Every::with_id(1, period, || TestMsg::Tick)),
        Box::new(Every::with_id(2, period, || TestMsg::Tick)),
    ];
    manager.reconcile(both);
    assert_eq!(manager.active_count(), 2, "after starting 2");

    // Dropping id 2 from the desired set must remove it from the active list.
    let only_first: Vec<Box<dyn Subscription<TestMsg>>> =
        vec![Box::new(Every::with_id(1, period, || TestMsg::Tick))];
    manager.reconcile(only_first);
    assert_eq!(manager.active_count(), 1, "after removing 1");

    manager.stop_all();
    assert_eq!(manager.active_count(), 0, "after stop_all");
}
1428
/// Contract: the id of an `Every` built with `new` depends on the interval alone.
///
/// Two subscriptions with the same interval but different message factories must
/// share an id, while a different interval must yield a different id.
#[test]
fn contract_every_id_derived_from_interval_only() {
    let short = Duration::from_millis(100);
    let long = Duration::from_millis(200);

    let tick_short = Every::<TestMsg>::new(short, || TestMsg::Tick);
    let value_short = Every::<TestMsg>::new(short, || TestMsg::Value(999));
    let tick_long = Every::<TestMsg>::new(long, || TestMsg::Tick);

    let baseline = tick_short.id();

    assert_eq!(
        baseline,
        value_short.id(),
        "Every ID must depend only on interval, not message factory"
    );
    assert_ne!(
        baseline,
        tick_long.id(),
        "Different intervals must produce different IDs"
    );
}
1449
/// Contract: pins the exact id formula of `Every::new`.
///
/// The id must equal the interval in nanoseconds (cast to `u64`) XOR-ed with the
/// constant `0x5449_434B`.
#[test]
fn contract_every_id_formula_is_stable() {
    // The bytes 0x54 0x49 0x43 0x4B spell "TICK" in ASCII.
    const ID_TAG: u64 = 0x5449_434B;

    let interval = Duration::from_millis(100);
    let nanos = interval.as_nanos() as u64;
    let expected_id = nanos ^ ID_TAG;

    let actual_id = Every::<TestMsg>::new(interval, || TestMsg::Tick).id();

    assert_eq!(
        actual_id, expected_id,
        "Every ID formula must be: interval.as_nanos() as u64 ^ 0x5449_434B"
    );
}
1463
/// Contract: dropping a `SubscriptionManager` stops every running subscription.
///
/// Three subscriptions bump a shared counter only after they observe the stop
/// signal. After the manager is dropped and a grace period has passed, the counter
/// must read 3.
#[test]
fn contract_drop_triggers_stop_all() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Idles until stopped, then records that the stop was observed.
    struct StopCountingSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for StopCountingSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let stop_count = Arc::new(AtomicUsize::new(0));

    let subs: Vec<Box<dyn Subscription<TestMsg>>> = (1..=3)
        .map(|id| -> Box<dyn Subscription<TestMsg>> {
            Box::new(StopCountingSub {
                id,
                counter: Arc::clone(&stop_count),
            })
        })
        .collect();

    let mut manager = SubscriptionManager::<TestMsg>::new();
    manager.reconcile(subs);
    thread::sleep(Duration::from_millis(20));
    // The manager is dropped without an explicit stop_all call.
    drop(manager);

    // Grace period before checking that each worker recorded its stop.
    thread::sleep(Duration::from_millis(400));
    let observed = stop_count.load(Ordering::SeqCst);
    assert_eq!(
        observed, 3,
        "all 3 subscription threads must have observed stop signal on drop"
    );
}
1518
/// Contract: pins the two shutdown timing constants.
///
/// `SUBSCRIPTION_STOP_JOIN_TIMEOUT` must be 250ms and `SUBSCRIPTION_STOP_JOIN_POLL`
/// must be 1ms, so changing either one fails this test.
#[test]
fn contract_stop_join_timeout_is_250ms() {
    let expected_timeout = Duration::from_millis(250);
    let expected_poll = Duration::from_millis(1);

    assert_eq!(
        SUBSCRIPTION_STOP_JOIN_TIMEOUT, expected_timeout,
        "join timeout must be 250ms"
    );
    assert_eq!(
        SUBSCRIPTION_STOP_JOIN_POLL, expected_poll,
        "join poll interval must be 1ms (bd-1f2aw)"
    );
}
1534
/// Lifecycle: a panic inside `Subscription::run` is recorded on the running entry.
///
/// After the panicking subscription has had time to run, the manager must still
/// count it as active, its `panicked` flag must be set, and `stop_all` must bring
/// the active count to zero.
#[test]
fn lifecycle_panic_in_subscription_is_caught() {
    use std::sync::atomic::Ordering;

    /// Panics as soon as it is run.
    struct PanickingSub;

    impl Subscription<TestMsg> for PanickingSub {
        fn id(&self) -> SubId {
            0xDEAD
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
            panic!("intentional test panic in subscription");
        }
    }

    let mut manager = SubscriptionManager::<TestMsg>::new();
    manager.reconcile(vec![Box::new(PanickingSub)]);

    // Give the worker thread time to start and panic.
    thread::sleep(Duration::from_millis(50));

    let still_tracked = manager.active_count();
    assert_eq!(still_tracked, 1, "panicked sub still tracked as active");

    let flag_set = manager.active[0].panicked.load(Ordering::Acquire);
    assert!(
        flag_set,
        "panicked flag should be set after subscription panic"
    );

    manager.stop_all();
    assert_eq!(manager.active_count(), 0);
}
1584
/// Lifecycle: one panicking subscription must not silence its siblings.
///
/// A panicking subscription and a 10ms `Every` ticker are started together. After
/// 100ms the manager must have at least one message to drain.
#[test]
fn lifecycle_panic_does_not_affect_sibling_subscriptions() {
    /// Panics as soon as it is run.
    struct PanickingSub;

    impl Subscription<TestMsg> for PanickingSub {
        fn id(&self) -> SubId {
            0xBAD
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
            panic!("boom");
        }
    }

    let mut manager = SubscriptionManager::<TestMsg>::new();

    let mixed: Vec<Box<dyn Subscription<TestMsg>>> = vec![
        Box::new(PanickingSub),
        Box::new(Every::with_id(42, Duration::from_millis(10), || {
            TestMsg::Tick
        })),
    ];
    manager.reconcile(mixed);

    // Time for the panic to occur and for the ticker to send messages.
    thread::sleep(Duration::from_millis(100));

    let delivered = manager.drain_messages();
    let got_any = !delivered.is_empty();
    assert!(
        got_any,
        "sibling subscription should still deliver messages after a panic in another sub"
    );

    manager.stop_all();
}
1620
/// Lifecycle: `stop_all` must not wait for each subscription one after another.
///
/// Four subscriptions each take about 50ms to finish after seeing the stop signal.
/// The whole `stop_all` call must complete in under 150ms, and every subscription
/// must have finished its post-stop work.
#[test]
fn lifecycle_stop_all_parallel_shutdown() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Idles until stopped, then spends 50ms on simulated cleanup before recording it.
    struct SlowStopSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for SlowStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            thread::sleep(Duration::from_millis(50));
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let sub_count: SubId = 4;
    let stop_count = Arc::new(AtomicUsize::new(0));

    let mut manager = SubscriptionManager::<TestMsg>::new();
    let mut subs: Vec<Box<dyn Subscription<TestMsg>>> = Vec::new();
    for offset in 0..sub_count {
        subs.push(Box::new(SlowStopSub {
            id: 1000 + offset,
            counter: Arc::clone(&stop_count),
        }));
    }

    manager.reconcile(subs);
    thread::sleep(Duration::from_millis(20));

    let start = Instant::now();
    manager.stop_all();
    let elapsed = start.elapsed();

    let budget = Duration::from_millis(150);
    assert!(
        elapsed < budget,
        "parallel stop_all took {elapsed:?}, expected < 150ms \
         (sequential would be ~{expected_sequential}ms)",
        expected_sequential = sub_count * 50
    );

    thread::sleep(Duration::from_millis(20));
    let finished = stop_count.load(Ordering::SeqCst);
    assert_eq!(
        finished, sub_count as usize,
        "all subscriptions should have completed their cleanup"
    );
}
1686
/// Lifecycle: removing subscriptions through `reconcile` must not stop them one
/// after another.
///
/// Three subscriptions each take about 40ms to finish after seeing the stop
/// signal. Reconciling to an empty set must return in under 100ms, and all three
/// must have finished.
#[test]
fn lifecycle_reconcile_removal_uses_parallel_stop() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Idles until stopped, then spends 40ms on simulated cleanup before recording it.
    struct SlowStopSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for SlowStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            thread::sleep(Duration::from_millis(40));
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let stop_count = Arc::new(AtomicUsize::new(0));

    let subs: Vec<Box<dyn Subscription<TestMsg>>> = (2000..=2002)
        .map(|id| -> Box<dyn Subscription<TestMsg>> {
            Box::new(SlowStopSub {
                id,
                counter: Arc::clone(&stop_count),
            })
        })
        .collect();

    let mut manager = SubscriptionManager::<TestMsg>::new();
    manager.reconcile(subs);
    thread::sleep(Duration::from_millis(20));

    // Only the removal itself is timed.
    let start = Instant::now();
    manager.reconcile(vec![]);
    let elapsed = start.elapsed();

    let budget = Duration::from_millis(100);
    assert!(
        elapsed < budget,
        "reconcile removal took {elapsed:?}, expected < 100ms with parallel stop"
    );

    thread::sleep(Duration::from_millis(20));
    let finished = stop_count.load(Ordering::SeqCst);
    assert_eq!(finished, 3);
}
1745
/// Lifecycle: `RunningSubscription::has_panicked` mirrors the shared atomic flag.
///
/// A `RunningSubscription` is built by hand around a thread that waits on the
/// stop signal. The test flips the shared flag from outside and checks that
/// `has_panicked` changes from `false` to `true`.
#[test]
fn lifecycle_has_panicked_tracks_state() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let (signal, trigger) = StopSignal::new();
    // The worker waits on the signal with a 10s timeout.
    let worker = thread::spawn(move || {
        signal.wait_timeout(Duration::from_secs(10));
    });

    let panicked = Arc::new(AtomicBool::new(false));
    // Second handle to the same flag, kept outside the struct.
    let external_flag = Arc::clone(&panicked);

    let running = RunningSubscription {
        id: 999,
        trigger,
        thread: Some(worker),
        panicked,
    };

    let before = running.has_panicked();
    assert!(!before, "should not be panicked initially");

    external_flag.store(true, Ordering::Release);
    let after = running.has_panicked();
    assert!(after, "should reflect panicked state");

    running.stop();
}
1772
/// Lifecycle: `signal_stop` followed by `join_bounded` joins a cooperative thread.
///
/// The worker polls the stop signal every 5ms and sets a completion flag on exit.
/// `join_bounded` must return `None`, and the completion flag must be set.
#[test]
fn lifecycle_signal_then_join_works() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let completed = Arc::new(AtomicBool::new(false));
    let (signal, trigger) = StopSignal::new();

    let worker = {
        let done = Arc::clone(&completed);
        thread::spawn(move || {
            loop {
                if signal.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            done.store(true, Ordering::SeqCst);
        })
    };

    let running = RunningSubscription {
        id: 888,
        trigger,
        thread: Some(worker),
        panicked: Arc::new(AtomicBool::new(false)),
    };

    running.signal_stop();
    let leftover = running.join_bounded();

    assert!(
        leftover.is_none(),
        "cooperative thread should join within timeout"
    );
    let finished = completed.load(Ordering::SeqCst);
    assert!(finished, "thread should have completed");
}
1808
/// Lifecycle: `join_bounded` gives up on a thread that ignores the stop signal.
///
/// The worker sleeps for 500ms and never looks at the signal. `join_bounded` must
/// hand the join handle back (`Some`) and must return in under 400ms.
#[test]
fn lifecycle_join_bounded_returns_handle_for_uncooperative() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    // The signal half is bound but never consulted by the worker.
    let (_ignored_signal, trigger) = StopSignal::new();
    let sleeper = thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
    });

    let running = RunningSubscription {
        id: 777,
        trigger,
        thread: Some(sleeper),
        panicked: Arc::new(AtomicBool::new(false)),
    };

    running.signal_stop();

    let start = Instant::now();
    let leftover = running.join_bounded();
    let elapsed = start.elapsed();

    let handle_returned = leftover.is_some();
    assert!(
        handle_returned,
        "uncooperative thread should not join within timeout"
    );

    let ceiling = Duration::from_millis(400);
    assert!(
        elapsed < ceiling,
        "join_bounded should respect the 250ms timeout, took {elapsed:?}"
    );
}
1840
/// Lifecycle: an id that was removed can be started again with a new factory.
///
/// Phase 1 starts a ticker with id 300 and expects messages. Phase 2 removes it
/// and expects silence once leftovers are drained. Phase 3 starts id 300 again
/// with a different factory and expects `TestMsg::Value(99)` to arrive.
#[test]
fn lifecycle_restart_after_stop() {
    let period = Duration::from_millis(10);
    let mut manager = SubscriptionManager::<TestMsg>::new();

    // Phase 1: first start.
    let first_run: Vec<Box<dyn Subscription<TestMsg>>> =
        vec![Box::new(Every::with_id(300, period, || TestMsg::Tick))];
    manager.reconcile(first_run);
    thread::sleep(Duration::from_millis(30));
    let initial = manager.drain_messages();
    assert!(!initial.is_empty(), "should receive ticks");

    // Phase 2: remove, discard whatever was already queued, then expect silence.
    manager.reconcile(vec![]);
    thread::sleep(Duration::from_millis(20));
    let _ = manager.drain_messages();
    thread::sleep(Duration::from_millis(30));
    let while_stopped = manager.drain_messages();
    assert!(
        while_stopped.is_empty(),
        "should stop receiving after removal"
    );

    // Phase 3: same id, different message factory.
    let second_run: Vec<Box<dyn Subscription<TestMsg>>> =
        vec![Box::new(Every::with_id(300, period, || TestMsg::Value(99)))];
    manager.reconcile(second_run);
    thread::sleep(Duration::from_millis(30));
    let restarted = manager.drain_messages();
    assert!(
        !restarted.is_empty(),
        "should receive messages again after restart"
    );
    let saw_new_value = restarted
        .iter()
        .any(|msg| matches!(msg, TestMsg::Value(99)));
    assert!(
        saw_new_value,
        "restarted sub should use the new message factory"
    );

    manager.stop_all();
}
1884
/// Lifecycle: the manager never reports more messages than subscriptions sent.
///
/// Two subscriptions send a tick every 5ms and count each successful send. The
/// number of drained messages, both while running and in total after `stop_all`,
/// must never exceed the number of recorded sends. The active count must be 2
/// while running and 0 after `stop_all`.
#[test]
fn lifecycle_non_interference_with_manager_state() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sends ticks until stopped or until the channel rejects a send.
    struct CountingSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for CountingSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            // Short-circuit keeps the original order: check stop first, then send.
            while !stop.is_stopped() && sender.send(TestMsg::Tick).is_ok() {
                self.counter.fetch_add(1, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5));
            }
        }
    }

    let msg_count = Arc::new(AtomicUsize::new(0));
    let mut manager = SubscriptionManager::<TestMsg>::new();

    let senders: Vec<Box<dyn Subscription<TestMsg>>> = (400..=401)
        .map(|id| -> Box<dyn Subscription<TestMsg>> {
            Box::new(CountingSub {
                id,
                counter: Arc::clone(&msg_count),
            })
        })
        .collect();
    manager.reconcile(senders);

    thread::sleep(Duration::from_millis(50));

    assert_eq!(manager.active_count(), 2);

    // Drain first, then read the counter, as the comparison below relies on it.
    let drained = manager.drain_messages();
    let sent_count = msg_count.load(Ordering::SeqCst);
    assert!(sent_count > 0, "subscriptions should have sent messages");
    assert!(
        drained.len() <= sent_count,
        "drained {} but only {} sent",
        drained.len(),
        sent_count
    );

    manager.stop_all();
    assert_eq!(manager.active_count(), 0);

    let remaining = manager.drain_messages();
    let total_drained = drained.len() + remaining.len();
    let final_sent = msg_count.load(Ordering::SeqCst);
    assert!(
        total_drained <= final_sent,
        "total drained ({total_drained}) must not exceed total sent ({final_sent})"
    );
}
1960
/// Lifecycle: all subscriptions observe the stop signal at nearly the same time.
///
/// Three subscriptions poll the stop signal every 1ms and record, in microseconds
/// since a shared epoch, when they saw it. After `stop_all`, every recorded time
/// must be non-zero and the gap between earliest and latest must be under 10ms.
#[test]
fn lifecycle_shutdown_signal_ordering() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Polls the stop signal and stores the observation time in its own slot.
    struct TimingStopSub {
        id: SubId,
        index: usize,
        signal_times: Arc<[AtomicU64; 3]>,
        epoch: Instant,
    }

    impl Subscription<TestMsg> for TimingStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(1));
            }
            let elapsed_us = self.epoch.elapsed().as_micros() as u64;
            self.signal_times[self.index].store(elapsed_us, Ordering::SeqCst);
        }
    }

    // A slot still holding 0 means that subscription never recorded a stop time.
    let signal_times = Arc::new([AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0)]);
    let epoch = Instant::now();

    let subs: Vec<Box<dyn Subscription<TestMsg>>> = (0..3usize)
        .map(|index| -> Box<dyn Subscription<TestMsg>> {
            Box::new(TimingStopSub {
                id: 500 + index as SubId,
                index,
                signal_times: Arc::clone(&signal_times),
                epoch,
            })
        })
        .collect();

    let mut manager = SubscriptionManager::<TestMsg>::new();
    manager.reconcile(subs);
    thread::sleep(Duration::from_millis(20));

    manager.stop_all();

    let [t0, t1, t2] = [0usize, 1, 2].map(|slot| signal_times[slot].load(Ordering::SeqCst));

    let all_recorded = t0 > 0 && t1 > 0 && t2 > 0;
    assert!(all_recorded, "all subs should have recorded stop time");

    let latest = t0.max(t1).max(t2);
    let earliest = t0.min(t1).min(t2);
    let spread_us = latest - earliest;

    assert!(
        spread_us < 10_000,
        "stop signal spread should be < 10ms for parallel signaling, got {spread_us}us \
         (t0={t0}, t1={t1}, t2={t2})"
    );
}
2040}