1#![forbid(unsafe_code)]
2
3use crate::cancellation::{CancellationSource, CancellationToken};
18use std::collections::HashSet;
19use std::sync::mpsc;
20use std::thread;
21use web_time::{Duration, Instant};
22
23pub type SubId = u64;
28
29pub trait Subscription<M: Send + 'static>: Send {
34 fn id(&self) -> SubId;
39
40 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
46}
47
48#[derive(Clone)]
55pub struct StopSignal {
56 token: CancellationToken,
57}
58
59impl StopSignal {
60 pub(crate) fn new() -> (Self, StopTrigger) {
62 let source = CancellationSource::new();
63 let signal = Self {
64 token: source.token(),
65 };
66 let trigger = StopTrigger { source };
67 (signal, trigger)
68 }
69
70 pub fn is_stopped(&self) -> bool {
72 self.token.is_cancelled()
73 }
74
75 pub fn wait_timeout(&self, duration: Duration) -> bool {
81 self.token.wait_timeout(duration)
82 }
83
84 pub fn cancellation_token(&self) -> &CancellationToken {
89 &self.token
90 }
91}
92
93pub(crate) struct StopTrigger {
97 source: CancellationSource,
98}
99
100impl StopTrigger {
101 pub(crate) fn stop(&self) {
103 self.source.cancel();
104 }
105}
106
107pub(crate) struct RunningSubscription {
109 pub(crate) id: SubId,
110 trigger: StopTrigger,
111 thread: Option<thread::JoinHandle<()>>,
112 panicked: std::sync::Arc<std::sync::atomic::AtomicBool>,
114}
115
116const SUBSCRIPTION_STOP_JOIN_TIMEOUT: Duration = Duration::from_millis(250);
117const SUBSCRIPTION_STOP_JOIN_POLL: Duration = Duration::from_millis(1);
123
124impl RunningSubscription {
125 pub(crate) fn has_panicked(&self) -> bool {
127 self.panicked.load(std::sync::atomic::Ordering::Acquire)
128 }
129
130 pub(crate) fn signal_stop(&self) {
135 self.trigger.stop();
136 }
137
138 pub(crate) fn join_bounded(mut self) -> Option<thread::JoinHandle<()>> {
143 let handle = self.thread.take()?;
144 let start = Instant::now();
145
146 if handle.is_finished() {
148 let _ = handle.join();
149 tracing::trace!(
150 sub_id = self.id,
151 panicked = self.has_panicked(),
152 elapsed_us = start.elapsed().as_micros() as u64,
153 "subscription join (fast path)"
154 );
155 return None;
156 }
157
158 while !handle.is_finished() {
160 if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
161 tracing::warn!(
162 sub_id = self.id,
163 panicked = self.has_panicked(),
164 timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
165 "subscription join timed out, detaching thread"
166 );
167 return Some(handle);
168 }
169 thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
170 }
171
172 let _ = handle.join();
173 tracing::trace!(
174 sub_id = self.id,
175 panicked = self.has_panicked(),
176 elapsed_us = start.elapsed().as_micros() as u64,
177 "subscription join (slow path)"
178 );
179 None
180 }
181
182 #[cfg_attr(not(test), allow(dead_code))]
187 pub(crate) fn stop(mut self) {
188 self.trigger.stop();
189 if let Some(handle) = self.thread.take() {
190 let start = Instant::now();
191 if handle.is_finished() {
193 let _ = handle.join();
194 tracing::trace!(
195 sub_id = self.id,
196 panicked = self.has_panicked(),
197 elapsed_us = start.elapsed().as_micros() as u64,
198 "subscription stop (fast path)"
199 );
200 return;
201 }
202 while !handle.is_finished() {
204 if start.elapsed() >= SUBSCRIPTION_STOP_JOIN_TIMEOUT {
205 tracing::warn!(
206 sub_id = self.id,
207 panicked = self.has_panicked(),
208 timeout_ms = SUBSCRIPTION_STOP_JOIN_TIMEOUT.as_millis() as u64,
209 "subscription did not stop within timeout; detaching thread"
210 );
211 return;
212 }
213 thread::sleep(SUBSCRIPTION_STOP_JOIN_POLL);
214 }
215 let _ = handle.join();
216 tracing::trace!(
217 sub_id = self.id,
218 panicked = self.has_panicked(),
219 elapsed_us = start.elapsed().as_micros() as u64,
220 "subscription stop (slow path)"
221 );
222 }
223 }
224}
225
226impl Drop for RunningSubscription {
227 fn drop(&mut self) {
228 self.trigger.stop();
229 }
231}
232
233pub(crate) struct SubscriptionManager<M: Send + 'static> {
235 active: Vec<RunningSubscription>,
236 sender: mpsc::Sender<M>,
237 receiver: mpsc::Receiver<M>,
238}
239
240impl<M: Send + 'static> SubscriptionManager<M> {
241 pub(crate) fn new() -> Self {
242 let (sender, receiver) = mpsc::channel();
243 Self {
244 active: Vec::new(),
245 sender,
246 receiver,
247 }
248 }
249
250 pub(crate) fn reconcile(&mut self, subscriptions: Vec<Box<dyn Subscription<M>>>) {
257 let reconcile_start = Instant::now();
258 let new_ids: HashSet<SubId> = subscriptions.iter().map(|s| s.id()).collect();
259 let active_count_before = self.active.len();
260
261 crate::debug_trace!(
262 "reconcile: new_ids={:?}, active_before={}",
263 new_ids,
264 active_count_before
265 );
266 tracing::trace!(
267 new_id_count = new_ids.len(),
268 active_before = active_count_before,
269 new_ids = ?new_ids,
270 "subscription reconcile starting"
271 );
272
273 let mut remaining = Vec::new();
275 let mut to_stop = Vec::new();
276 for running in self.active.drain(..) {
277 if new_ids.contains(&running.id) {
278 remaining.push(running);
279 } else {
280 crate::debug_trace!("stopping subscription: id={}", running.id);
281 tracing::debug!(sub_id = running.id, "Stopping subscription");
282 crate::effect_system::record_subscription_stop("subscription", running.id, 0);
283 crate::effect_system::record_dynamics_sub_stop();
284 to_stop.push(running);
285 }
286 }
287 for running in &to_stop {
289 running.signal_stop();
290 }
291 for running in to_stop {
293 let _ = running.join_bounded();
294 }
295 self.active = remaining;
296
297 let mut active_ids: HashSet<SubId> = self.active.iter().map(|r| r.id).collect();
299 for sub in subscriptions {
300 let id = sub.id();
301 if !active_ids.insert(id) {
302 continue;
303 }
304
305 crate::debug_trace!("starting subscription: id={}", id);
306 tracing::debug!(sub_id = id, "Starting subscription");
307 crate::effect_system::record_subscription_start("subscription", id);
308 crate::effect_system::record_dynamics_sub_start();
309 let (signal, trigger) = StopSignal::new();
310 let sender = self.sender.clone();
311 let panicked = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
312 let panicked_flag = panicked.clone();
313 let sub_id_for_thread = id;
314
315 let thread = thread::spawn(move || {
316 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
317 sub.run(sender, signal);
318 }));
319 if let Err(payload) = result {
320 panicked_flag.store(true, std::sync::atomic::Ordering::Release);
321 crate::effect_system::record_dynamics_sub_panic();
322 let panic_msg = match payload.downcast_ref::<&str>() {
323 Some(s) => (*s).to_string(),
324 None => match payload.downcast_ref::<String>() {
325 Some(s) => s.clone(),
326 None => "unknown panic payload".to_string(),
327 },
328 };
329 crate::effect_system::error_effect_panic(
330 "subscription",
331 &format!("sub_id={sub_id_for_thread}: {panic_msg}"),
332 );
333 }
334 });
335
336 self.active.push(RunningSubscription {
337 id,
338 trigger,
339 thread: Some(thread),
340 panicked,
341 });
342 }
343
344 let active_count_after = self.active.len();
345 let reconcile_elapsed_us = reconcile_start.elapsed().as_micros() as u64;
346 crate::effect_system::record_dynamics_reconcile(reconcile_elapsed_us);
347 crate::debug_trace!("reconcile complete: active_after={}", active_count_after);
348 tracing::trace!(
349 active_before = active_count_before,
350 active_after = active_count_after,
351 started = active_count_after.saturating_sub(active_count_before),
352 stopped = active_count_before.saturating_sub(active_count_after),
353 reconcile_us = reconcile_elapsed_us,
354 "subscription reconcile complete"
355 );
356 }
357
358 pub(crate) fn drain_messages(&self) -> Vec<M> {
360 let mut messages = Vec::new();
361 while let Ok(msg) = self.receiver.try_recv() {
362 messages.push(msg);
363 }
364 messages
365 }
366
367 #[inline]
369 pub(crate) fn active_count(&self) -> usize {
370 self.active.len()
371 }
372
373 pub(crate) fn stop_all(&mut self) {
382 let count = self.active.len();
383 if count == 0 {
384 return;
385 }
386 let start = Instant::now();
387
388 for running in &self.active {
390 running.signal_stop();
391 }
392
393 let signal_elapsed_us = start.elapsed().as_micros() as u64;
394 tracing::trace!(
395 target: "ftui.runtime",
396 count,
397 signal_elapsed_us,
398 "subscription stop_all phase 1 (signal) complete"
399 );
400
401 let mut panicked_count = 0_usize;
403 let mut timed_out_count = 0_usize;
404 for running in self.active.drain(..) {
405 if running.has_panicked() {
406 panicked_count += 1;
407 }
408 if running.join_bounded().is_some() {
409 timed_out_count += 1;
410 }
411 }
412
413 let shutdown_elapsed_us = start.elapsed().as_micros() as u64;
414 crate::effect_system::record_dynamics_shutdown(shutdown_elapsed_us, timed_out_count as u64);
415 tracing::debug!(
416 target: "ftui.runtime",
417 count,
418 panicked_count,
419 timed_out_count,
420 elapsed_us = shutdown_elapsed_us,
421 "subscription stop_all complete"
422 );
423 }
424}
425
426impl<M: Send + 'static> Drop for SubscriptionManager<M> {
427 fn drop(&mut self) {
428 self.stop_all();
429 }
430}
431
432pub struct Every<M: Send + 'static> {
444 id: SubId,
445 interval: Duration,
446 make_msg: Box<dyn Fn() -> M + Send + Sync>,
447}
448
449impl<M: Send + 'static> Every<M> {
450 pub fn new(interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self {
452 let id = interval.as_nanos() as u64 ^ 0x5449_434B; Self {
455 id,
456 interval,
457 make_msg: Box::new(make_msg),
458 }
459 }
460
461 pub fn with_id(
463 id: SubId,
464 interval: Duration,
465 make_msg: impl Fn() -> M + Send + Sync + 'static,
466 ) -> Self {
467 Self {
468 id,
469 interval,
470 make_msg: Box::new(make_msg),
471 }
472 }
473}
474
475impl<M: Send + 'static> Subscription<M> for Every<M> {
476 fn id(&self) -> SubId {
477 self.id
478 }
479
480 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
481 let mut tick_count: u64 = 0;
482 crate::debug_trace!(
483 "Every subscription started: id={}, interval={:?}",
484 self.id,
485 self.interval
486 );
487 loop {
488 if stop.wait_timeout(self.interval) {
489 crate::debug_trace!(
490 "Every subscription stopped: id={}, sent {} ticks",
491 self.id,
492 tick_count
493 );
494 break;
495 }
496 tick_count += 1;
497 let msg = (self.make_msg)();
498 if sender.send(msg).is_err() {
499 crate::debug_trace!(
500 "Every subscription channel closed: id={}, sent {} ticks",
501 self.id,
502 tick_count
503 );
504 break;
505 }
506 }
507 }
508}
509
510#[cfg(test)]
511mod tests {
512 use super::*;
513
/// Message type shared by the tests in this module.
#[derive(Clone, Debug, PartialEq)]
enum TestMsg {
    /// Payload-free message.
    Tick,
    /// Message carrying an integer so senders can be told apart.
    Value(i32),
}
519
520 struct ChannelSubscription<M: Send + 'static> {
521 id: SubId,
522 receiver: mpsc::Receiver<M>,
523 poll: Duration,
524 }
525
526 impl<M: Send + 'static> ChannelSubscription<M> {
527 fn new(id: SubId, receiver: mpsc::Receiver<M>) -> Self {
528 Self {
529 id,
530 receiver,
531 poll: Duration::from_millis(5),
532 }
533 }
534 }
535
536 impl<M: Send + 'static> Subscription<M> for ChannelSubscription<M> {
537 fn id(&self) -> SubId {
538 self.id
539 }
540
541 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
542 loop {
543 if stop.is_stopped() {
544 break;
545 }
546 match self.receiver.recv_timeout(self.poll) {
547 Ok(msg) => {
548 if sender.send(msg).is_err() {
549 break;
550 }
551 }
552 Err(mpsc::RecvTimeoutError::Timeout) => {}
553 Err(mpsc::RecvTimeoutError::Disconnected) => break,
554 }
555 }
556 }
557 }
558
559 fn channel_subscription(id: SubId) -> (ChannelSubscription<TestMsg>, mpsc::Sender<TestMsg>) {
560 let (tx, rx) = mpsc::channel();
561 (ChannelSubscription::new(id, rx), tx)
562 }
563
/// A freshly created signal reports "not stopped".
#[test]
fn stop_signal_starts_false() {
    let (fresh_signal, _unused_trigger) = StopSignal::new();
    let stopped = fresh_signal.is_stopped();
    assert!(!stopped);
}
569
/// Firing the trigger flips the paired signal to "stopped".
#[test]
fn stop_signal_becomes_true_after_trigger() {
    let (observer, stopper) = StopSignal::new();
    stopper.stop();
    let stopped = observer.is_stopped();
    assert!(stopped);
}
576
/// Waiting on an already-stopped signal yields `true`.
#[test]
fn stop_signal_wait_returns_true_when_stopped() {
    let (observer, stopper) = StopSignal::new();
    stopper.stop();
    let woke_for_stop = observer.wait_timeout(Duration::from_millis(100));
    assert!(woke_for_stop);
}
583
/// Waiting on a signal that is never triggered yields `false`.
#[test]
fn stop_signal_wait_returns_false_on_timeout() {
    let (observer, _idle_trigger) = StopSignal::new();
    let woke_for_stop = observer.wait_timeout(Duration::from_millis(10));
    assert!(!woke_for_stop);
}
589
/// Messages fed to a `ChannelSubscription` come out of its output channel in
/// the same order.
#[test]
fn channel_subscription_forwards_messages() {
    let (forwarder, event_tx) = channel_subscription(1);
    let (out_tx, out_rx) = mpsc::channel();
    let (stop_signal, stopper) = StopSignal::new();

    let worker = thread::spawn(move || forwarder.run(out_tx, stop_signal));

    for value in 1..=2 {
        event_tx.send(TestMsg::Value(value)).unwrap();
    }
    thread::sleep(Duration::from_millis(10));
    stopper.stop();
    worker.join().unwrap();

    let forwarded = out_rx.try_iter().collect::<Vec<_>>();
    assert_eq!(forwarded, vec![TestMsg::Value(1), TestMsg::Value(2)]);
}
609
/// An `Every` ticker run for a few intervals emits at least one `Tick` and
/// nothing else.
#[test]
fn every_subscription_fires() {
    let ticker = Every::new(Duration::from_millis(10), || TestMsg::Tick);
    let (out_tx, out_rx) = mpsc::channel();
    let (stop_signal, stopper) = StopSignal::new();

    let worker = thread::spawn(move || ticker.run(out_tx, stop_signal));

    // Leave room for several 10 ms intervals before stopping.
    thread::sleep(Duration::from_millis(50));
    stopper.stop();
    worker.join().unwrap();

    let ticks = out_rx.try_iter().collect::<Vec<_>>();
    assert!(!ticks.is_empty(), "Should have received at least one tick");
    assert!(ticks.iter().all(|m| *m == TestMsg::Tick));
}
629
/// Two tickers built with the same interval report the same id.
#[test]
fn every_subscription_uses_stable_id() {
    let interval = Duration::from_secs(1);
    let first = Every::<TestMsg>::new(interval, || TestMsg::Tick);
    let second = Every::<TestMsg>::new(interval, || TestMsg::Tick);
    assert_eq!(first.id(), second.id());
}
636
/// Tickers built with different intervals report different ids.
#[test]
fn every_subscription_different_intervals_different_ids() {
    let one_second = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
    let two_seconds = Every::<TestMsg>::new(Duration::from_secs(2), || TestMsg::Tick);
    assert_ne!(one_second.id(), two_seconds.id());
}
643
/// Reconciling with one subscription starts it, and its messages reach the
/// manager.
#[test]
fn subscription_manager_starts_subscriptions() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    let (forwarder, event_tx) = channel_subscription(1);
    let wanted: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(forwarder)];

    manager.reconcile(wanted);
    event_tx.send(TestMsg::Value(42)).unwrap();

    // Give the worker thread time to forward the message.
    thread::sleep(Duration::from_millis(20));

    let received = manager.drain_messages();
    assert_eq!(received, vec![TestMsg::Value(42)]);
}
659
/// When two subscriptions share an id in one reconcile call, only the first
/// is started and the second is dropped.
#[test]
fn subscription_manager_dedupes_duplicate_ids() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    let (first, first_tx) = channel_subscription(7);
    let (second, second_tx) = channel_subscription(7);
    let wanted: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(first), Box::new(second)];

    manager.reconcile(wanted);

    first_tx.send(TestMsg::Value(1)).unwrap();
    // The dropped duplicate took its receiver with it, so sending fails.
    let second_send = second_tx.send(TestMsg::Value(2));
    assert!(
        second_send.is_err(),
        "Duplicate subscription should be dropped"
    );

    thread::sleep(Duration::from_millis(20));
    let received = manager.drain_messages();
    assert_eq!(received, vec![TestMsg::Value(1)]);
}
679
/// A subscription missing from the next reconcile call stops producing
/// messages.
#[test]
fn subscription_manager_stops_removed() {
    let mut manager = SubscriptionManager::<TestMsg>::new();

    let ticker = Every::with_id(99, Duration::from_millis(5), || TestMsg::Tick);
    manager.reconcile(vec![Box::new(ticker)]);

    thread::sleep(Duration::from_millis(20));
    let before_removal = manager.drain_messages();
    assert!(!before_removal.is_empty());

    manager.reconcile(Vec::new());

    // Throw away anything that was already queued when the stop happened.
    thread::sleep(Duration::from_millis(20));
    drop(manager.drain_messages());

    thread::sleep(Duration::from_millis(30));
    let after_removal = manager.drain_messages();
    assert!(
        after_removal.is_empty(),
        "Should stop receiving after reconcile with empty set"
    );
}
710
/// Reconciling twice with the same id keeps the subscription producing.
#[test]
fn subscription_manager_keeps_unchanged() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    let ticker_50 = || -> Box<dyn Subscription<TestMsg>> {
        Box::new(Every::with_id(50, Duration::from_millis(10), || {
            TestMsg::Tick
        }))
    };

    manager.reconcile(vec![ticker_50()]);

    thread::sleep(Duration::from_millis(30));
    drop(manager.drain_messages());

    manager.reconcile(vec![ticker_50()]);

    thread::sleep(Duration::from_millis(30));
    let received = manager.drain_messages();
    assert!(!received.is_empty(), "Subscription should still be running");
}
736
/// After `stop_all`, no further messages arrive from any subscription.
#[test]
fn subscription_manager_stop_all() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    let ticker = |id: SubId, value: i32| -> Box<dyn Subscription<TestMsg>> {
        Box::new(Every::with_id(id, Duration::from_millis(5), move || {
            TestMsg::Value(value)
        }))
    };

    manager.reconcile(vec![ticker(1, 1), ticker(2, 2)]);

    thread::sleep(Duration::from_millis(20));
    manager.stop_all();

    // Discard whatever was queued before the stop took effect.
    thread::sleep(Duration::from_millis(20));
    drop(manager.drain_messages());
    thread::sleep(Duration::from_millis(30));
    let late = manager.drain_messages();
    assert!(late.is_empty());
}
759
/// A cloned signal observes the same trigger as the original.
#[test]
fn stop_signal_is_cloneable() {
    let (original, stopper) = StopSignal::new();
    let duplicate = original.clone();

    for observer in [&original, &duplicate] {
        assert!(!observer.is_stopped());
    }

    stopper.stop();

    for observer in [&original, &duplicate] {
        assert!(observer.is_stopped());
    }
}
777
/// A long wait on an already-stopped signal returns well before the timeout.
#[test]
fn stop_signal_wait_wakes_immediately_when_already_stopped() {
    let (observer, stopper) = StopSignal::new();
    stopper.stop();

    let began = Instant::now();
    let woke_for_stop = observer.wait_timeout(Duration::from_secs(10));
    let waited = began.elapsed();

    assert!(woke_for_stop);
    assert!(waited < Duration::from_millis(100));
}
791
/// A thread blocked in `wait_timeout` is released by the trigger and sees
/// `true`.
#[test]
fn stop_signal_wait_is_interrupted_by_trigger() {
    let (signal, stopper) = StopSignal::new();

    let waiter = {
        let blocked_signal = signal.clone();
        thread::spawn(move || blocked_signal.wait_timeout(Duration::from_secs(10)))
    };

    // Let the waiter start blocking before firing the trigger.
    thread::sleep(Duration::from_millis(20));
    stopper.stop();

    let woke_for_stop = waiter.join().unwrap();
    assert!(woke_for_stop);
}
806
/// With nothing fed in, a `ChannelSubscription` forwards nothing.
#[test]
fn channel_subscription_no_messages_without_events() {
    let (forwarder, _idle_event_tx) = channel_subscription(1);
    let (out_tx, out_rx) = mpsc::channel();
    let (stop_signal, stopper) = StopSignal::new();

    let worker = thread::spawn(move || forwarder.run(out_tx, stop_signal));

    thread::sleep(Duration::from_millis(10));
    stopper.stop();
    worker.join().unwrap();

    let forwarded = out_rx.try_iter().collect::<Vec<_>>();
    assert!(forwarded.is_empty());
}
824
/// The id passed to the helper is the id the subscription reports.
#[test]
fn channel_subscription_id_is_preserved() {
    let (forwarder, _event_tx) = channel_subscription(42);
    let reported = forwarder.id();
    assert_eq!(reported, 42);
}
830
/// Once the feeding sender is gone, `run` returns on its own without a stop
/// request.
#[test]
fn channel_subscription_stops_on_disconnected_receiver() {
    let (forwarder, event_tx) = channel_subscription(1);
    let (out_tx, _out_rx) = mpsc::channel();
    let (stop_signal, _never_fired) = StopSignal::new();

    // Disconnect the source before the subscription starts running.
    drop(event_tx);

    let worker = thread::spawn(move || forwarder.run(out_tx, stop_signal));

    let outcome = worker.join();
    assert!(outcome.is_ok());
}
846
/// `Every::with_id` reports exactly the id it was given.
#[test]
fn every_with_id_preserves_custom_id() {
    let ticker = Every::<TestMsg>::with_id(12345, Duration::from_secs(1), || TestMsg::Tick);
    let reported = ticker.id();
    assert_eq!(reported, 12345);
}
852
/// An `Every` ticker exits by itself once its output channel has no receiver.
#[test]
fn every_stops_on_disconnected_receiver() {
    let ticker = Every::new(Duration::from_millis(5), || TestMsg::Tick);
    let (out_tx, out_rx) = mpsc::channel();
    let (stop_signal, _never_fired) = StopSignal::new();

    // With the receiver gone, the first send attempt fails.
    drop(out_rx);

    let worker = thread::spawn(move || ticker.run(out_tx, stop_signal));

    let outcome = worker.join();
    assert!(outcome.is_ok());
}
871
/// A 50 ms ticker run for about 160 ms produces between 2 and 4 ticks.
#[test]
fn every_respects_interval() {
    let ticker = Every::with_id(1, Duration::from_millis(50), || TestMsg::Tick);
    let (out_tx, out_rx) = mpsc::channel();
    let (stop_signal, stopper) = StopSignal::new();

    let began = Instant::now();
    let worker = thread::spawn(move || ticker.run(out_tx, stop_signal));

    thread::sleep(Duration::from_millis(160));
    stopper.stop();
    worker.join().unwrap();

    let tick_total = out_rx.try_iter().count();
    let elapsed = began.elapsed();

    assert!(
        tick_total >= 2,
        "Expected at least 2 ticks, got {}",
        tick_total
    );
    assert!(
        tick_total <= 4,
        "Expected at most 4 ticks, got {}",
        tick_total
    );
    assert!(elapsed >= Duration::from_millis(150));
}
904
/// Reconciling an empty manager with an empty set produces no messages.
#[test]
fn subscription_manager_empty_reconcile() {
    let mut manager = SubscriptionManager::<TestMsg>::new();

    manager.reconcile(Vec::new());
    let received = manager.drain_messages();
    assert!(received.is_empty());
}
914
/// `drain_messages` hands back everything queued and leaves the queue empty.
#[test]
fn subscription_manager_drain_messages_returns_all() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    let (forwarder, event_tx) = channel_subscription(1);
    let wanted: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(forwarder)];

    manager.reconcile(wanted);
    for value in 1..=2 {
        event_tx.send(TestMsg::Value(value)).unwrap();
    }
    thread::sleep(Duration::from_millis(20));

    let first_drain = manager.drain_messages();
    assert_eq!(first_drain.len(), 2);
    assert_eq!(first_drain[0], TestMsg::Value(1));
    assert_eq!(first_drain[1], TestMsg::Value(2));

    // A second drain right away has nothing left to return.
    let second_drain = manager.drain_messages();
    assert!(second_drain.is_empty());
}
935
/// Reconciling with a different id swaps the old subscription for the new
/// one.
#[test]
fn subscription_manager_replaces_subscription_with_different_id() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    let (old_sub, old_tx) = channel_subscription(1);

    manager.reconcile(vec![Box::new(old_sub)]);
    old_tx.send(TestMsg::Value(1)).unwrap();
    thread::sleep(Duration::from_millis(20));
    let from_old = manager.drain_messages();
    assert_eq!(from_old, vec![TestMsg::Value(1)]);

    let (new_sub, new_tx) = channel_subscription(2);
    manager.reconcile(vec![Box::new(new_sub)]);
    new_tx.send(TestMsg::Value(2)).unwrap();
    thread::sleep(Duration::from_millis(20));
    let from_new = manager.drain_messages();
    assert_eq!(from_new, vec![TestMsg::Value(2)]);
}
956
/// Three subscriptions started together each deliver their own message.
#[test]
fn subscription_manager_multiple_subscriptions() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    let mut wanted: Vec<Box<dyn Subscription<TestMsg>>> = Vec::new();
    let mut feeders = Vec::new();
    for id in 1..=3 {
        let (forwarder, event_tx) = channel_subscription(id);
        wanted.push(Box::new(forwarder));
        feeders.push(event_tx);
    }

    manager.reconcile(wanted);
    for (event_tx, value) in feeders.iter().zip(vec![10, 20, 30]) {
        event_tx.send(TestMsg::Value(value)).unwrap();
    }
    thread::sleep(Duration::from_millis(30));

    // Arrival order across threads is not fixed, so sort before comparing.
    let mut received = manager.drain_messages();
    received.sort_by_key(|m| match m {
        TestMsg::Value(v) => *v,
        _ => 0,
    });

    assert_eq!(received.len(), 3);
    assert_eq!(received[0], TestMsg::Value(10));
    assert_eq!(received[1], TestMsg::Value(20));
    assert_eq!(received[2], TestMsg::Value(30));
}
983
/// Dropping one id out of three stops only that subscription.
#[test]
fn subscription_manager_partial_update() {
    let mut manager = SubscriptionManager::<TestMsg>::new();
    // Each ticker sends its own id as the value.
    let ticker = |id: SubId, value: i32| -> Box<dyn Subscription<TestMsg>> {
        Box::new(Every::with_id(id, Duration::from_millis(10), move || {
            TestMsg::Value(value)
        }))
    };

    manager.reconcile(vec![ticker(1, 1), ticker(2, 2), ticker(3, 3)]);

    thread::sleep(Duration::from_millis(30));
    drop(manager.drain_messages());

    manager.reconcile(vec![ticker(1, 1), ticker(3, 3)]);

    // Clear anything subscription 2 queued before it was stopped.
    drop(manager.drain_messages());

    thread::sleep(Duration::from_millis(30));
    let received = manager.drain_messages();

    let mut values: Vec<i32> = Vec::new();
    for message in &received {
        if let TestMsg::Value(v) = message {
            values.push(*v);
        }
    }

    assert!(
        values.contains(&1),
        "Should still receive from subscription 1"
    );
    assert!(
        values.contains(&3),
        "Should still receive from subscription 3"
    );
    assert!(
        !values.contains(&2),
        "Should not receive from stopped subscription 2"
    );
}
1044
/// Dropping the manager signals its subscriptions to stop.
///
/// The subscription sets a shared flag right after it observes the stop
/// signal, so the flag being set proves the signal arrived.
#[test]
fn subscription_manager_drop_stops_all() {
    let flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let flag_clone = flag.clone();

    struct FlagSubscription {
        id: SubId,
        flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
    }

    impl Subscription<TestMsg> for FlagSubscription {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            while !stop.is_stopped() {
                thread::sleep(Duration::from_millis(5));
            }
            self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
        }
    }

    {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        mgr.reconcile(vec![Box::new(FlagSubscription {
            id: 1,
            flag: flag_clone,
        })]);

        thread::sleep(Duration::from_millis(20));
        // `mgr` is dropped at the end of this scope.
    }

    thread::sleep(Duration::from_millis(50));
    assert!(
        flag.load(std::sync::atomic::Ordering::SeqCst),
        "Subscription should have stopped on drop"
    );
}
1086
/// `RunningSubscription::stop` waits for a cooperative thread to finish.
///
/// The worker sets `completed` as its last step, so the flag must already be
/// set when `stop` returns.
#[test]
fn running_subscription_stop_joins_thread() {
    use std::sync::atomic::{AtomicBool, Ordering};

    let completed = std::sync::Arc::new(AtomicBool::new(false));
    let completed_clone = completed.clone();

    let (signal, trigger) = StopSignal::new();

    let thread = thread::spawn(move || {
        while !signal.is_stopped() {
            thread::sleep(Duration::from_millis(5));
        }
        completed_clone.store(true, Ordering::SeqCst);
    });

    let running = RunningSubscription {
        id: 1,
        trigger,
        thread: Some(thread),
        panicked: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
    };

    running.stop();
    assert!(completed.load(Ordering::SeqCst));
}
1114
/// `stop` gives up on a thread that ignores the signal instead of blocking
/// until it finishes.
#[test]
fn running_subscription_stop_times_out_for_uncooperative_thread() {
    use std::sync::atomic::{AtomicBool, Ordering};

    let finished = std::sync::Arc::new(AtomicBool::new(false));
    let (_ignored_signal, trigger) = StopSignal::new();

    // The worker never looks at the stop signal; it just sleeps 500 ms.
    let worker = {
        let finished = finished.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(500));
            finished.store(true, Ordering::SeqCst);
        })
    };

    let running = RunningSubscription {
        id: 7,
        trigger,
        thread: Some(worker),
        panicked: std::sync::Arc::new(AtomicBool::new(false)),
    };

    let began = Instant::now();
    running.stop();
    let stop_took = began.elapsed();
    assert!(
        stop_took < Duration::from_millis(400),
        "stop() should not block behind an uncooperative subscription thread"
    );

    // The detached thread still runs to completion afterwards.
    thread::sleep(Duration::from_millis(550));
    assert!(finished.load(Ordering::SeqCst));
}
1145
/// The derived id depends on the interval only, not on the message closure.
#[test]
fn every_id_stable_across_instances() {
    let interval = Duration::from_millis(100);
    let first = Every::<TestMsg>::new(interval, || TestMsg::Tick);
    let second = Every::<TestMsg>::new(interval, || TestMsg::Tick);
    let other_message = Every::<TestMsg>::new(interval, || TestMsg::Value(1));

    assert_eq!(first.id(), second.id());
    assert_eq!(second.id(), other_message.id());
}
1156
/// Messages from a single subscription are drained in the order they were
/// sent.
#[test]
fn drain_messages_preserves_order() {
    /// Sends its values one by one, pausing 1 ms after each, then returns.
    struct OrderedSubscription {
        values: Vec<i32>,
    }

    impl Subscription<TestMsg> for OrderedSubscription {
        fn id(&self) -> SubId {
            999
        }

        fn run(&self, sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
            for value in self.values.iter().copied() {
                let _ = sender.send(TestMsg::Value(value));
                thread::sleep(Duration::from_millis(1));
            }
        }
    }

    let mut manager = SubscriptionManager::<TestMsg>::new();
    let ordered = OrderedSubscription {
        values: vec![1, 2, 3, 4, 5],
    };
    manager.reconcile(vec![Box::new(ordered)]);

    thread::sleep(Duration::from_millis(30));
    let received = manager.drain_messages();

    let mut values: Vec<i32> = Vec::new();
    for message in &received {
        if let TestMsg::Value(v) = message {
            values.push(*v);
        }
    }

    assert_eq!(values, vec![1, 2, 3, 4, 5]);
}
1196
/// A new manager has no queued messages.
#[test]
fn subscription_manager_new_is_empty() {
    let manager = SubscriptionManager::<TestMsg>::new();
    let queued = manager.drain_messages();
    assert!(queued.is_empty());
}
1203
/// A panic in code holding a clone of the signal leaves the signal usable.
#[test]
fn contract_stop_signal_resilient_to_thread_panics() {
    let (signal, stopper) = StopSignal::new();
    let held_by_panicker = signal.clone();

    let panic_outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        assert!(!held_by_panicker.is_stopped());
        panic!("intentional panic while holding signal clone");
    }));
    assert!(panic_outcome.is_err());

    // The original handle must behave exactly as if nothing had happened.
    let stopped_before = signal.is_stopped();
    assert!(!stopped_before, "signal should still report not-stopped");

    stopper.stop();

    let stopped_after = signal.is_stopped();
    assert!(stopped_after, "signal should report stopped after trigger");

    let woke_for_stop = signal.wait_timeout(Duration::from_millis(10));
    assert!(
        woke_for_stop,
        "wait_timeout should return true when stopped"
    );
}
1242
/// The token borrowed from the signal tracks the trigger.
#[test]
fn contract_stop_signal_exposes_cancellation_token() {
    let (signal, stopper) = StopSignal::new();
    let borrowed_token = signal.cancellation_token();

    let cancelled_before = borrowed_token.is_cancelled();
    assert!(!cancelled_before, "token should start uncancelled");

    stopper.stop();

    let cancelled_after = borrowed_token.is_cancelled();
    assert!(cancelled_after, "token should be cancelled after stop");
}
1253
/// `stop_all` returns within a bounded time even when subscriptions ignore
/// the stop signal.
#[test]
fn contract_stop_all_bounded_time_with_uncooperative_subscriptions() {
    /// Sleeps for five seconds without ever checking the stop signal.
    struct UncooperativeSub {
        id: SubId,
    }

    impl Subscription<TestMsg> for UncooperativeSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
            thread::sleep(Duration::from_secs(5));
        }
    }

    let mut manager = SubscriptionManager::<TestMsg>::new();
    let stubborn: Vec<Box<dyn Subscription<TestMsg>>> = vec![
        Box::new(UncooperativeSub { id: 100 }),
        Box::new(UncooperativeSub { id: 200 }),
    ];
    manager.reconcile(stubborn);

    // Let both threads get going before timing the shutdown.
    thread::sleep(Duration::from_millis(20));
    let began = Instant::now();
    manager.stop_all();
    let elapsed = began.elapsed();

    assert!(
        elapsed < Duration::from_millis(800),
        "stop_all took {elapsed:?}, expected < 800ms for 2 uncooperative subscriptions"
    );
}
1294
/// Reconciling again with a new instance that has the same id does not start
/// a second thread.
#[test]
fn contract_reconcile_deduplicates_by_id_not_identity() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Bumps the shared counter once when started, then idles until stopped.
    struct CountingSub {
        id: SubId,
        counter: std::sync::Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for CountingSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            self.counter.fetch_add(1, Ordering::SeqCst);
            while !stop.is_stopped() {
                thread::sleep(Duration::from_millis(5));
            }
        }
    }

    let mut manager = SubscriptionManager::<TestMsg>::new();
    let starts = std::sync::Arc::new(AtomicUsize::new(0));
    let fresh_instance = || -> Box<dyn Subscription<TestMsg>> {
        Box::new(CountingSub {
            id: 42,
            counter: starts.clone(),
        })
    };

    manager.reconcile(vec![fresh_instance()]);
    thread::sleep(Duration::from_millis(20));
    assert_eq!(
        starts.load(Ordering::SeqCst),
        1,
        "first reconcile should start exactly 1 thread"
    );

    manager.reconcile(vec![fresh_instance()]);
    thread::sleep(Duration::from_millis(20));
    assert_eq!(
        starts.load(Ordering::SeqCst),
        1,
        "second reconcile with same ID must not start another thread"
    );

    manager.stop_all();
}
1347
#[test]
/// Messages a subscription sent before it was removed must still be returned
/// by `drain_messages` after the removal.
fn contract_buffered_messages_available_after_subscription_stopped() {
    /// Sends ten `Value` messages immediately, then idles until stopped.
    struct BurstSub;

    impl Subscription<TestMsg> for BurstSub {
        fn id(&self) -> SubId {
            77
        }

        fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            for value in 0..10 {
                // Send errors are deliberately ignored in this burst.
                let _ = sender.send(TestMsg::Value(value));
            }
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
        }
    }

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(vec![Box::new(BurstSub)]);
    thread::sleep(Duration::from_millis(30));

    // An empty desired set removes the only running subscription.
    mgr.reconcile(vec![]);

    let drained = mgr.drain_messages();
    let value_count = drained
        .iter()
        .filter(|msg| matches!(msg, TestMsg::Value(_)))
        .count();

    assert!(
        value_count >= 5,
        "Expected at least 5 buffered messages after stop, got {}",
        value_count
    );
}
1396
#[test]
/// `active_count` must follow the desired set across reconcile calls and
/// return to zero after `stop_all`.
fn contract_active_count_tracks_running_subscriptions() {
    // Builds a 50ms ticker with an explicit ID.
    let ticker = |id: SubId| -> Box<dyn Subscription<TestMsg>> {
        Box::new(Every::with_id(id, Duration::from_millis(50), || {
            TestMsg::Tick
        }))
    };

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    assert_eq!(mgr.active_count(), 0, "empty manager");

    mgr.reconcile(vec![ticker(1), ticker(2)]);
    assert_eq!(mgr.active_count(), 2, "after starting 2");

    // Only ID 1 remains in the desired set.
    mgr.reconcile(vec![ticker(1)]);
    assert_eq!(mgr.active_count(), 1, "after removing 1");

    mgr.stop_all();
    assert_eq!(mgr.active_count(), 0, "after stop_all");
}
1425
#[test]
/// Two `Every` subscriptions with the same interval share an ID regardless of
/// their message factory; a different interval yields a different ID.
fn contract_every_id_derived_from_interval_only() {
    let base_interval = Duration::from_millis(100);

    let tick_sub = Every::<TestMsg>::new(base_interval, || TestMsg::Tick);
    let value_sub = Every::<TestMsg>::new(base_interval, || TestMsg::Value(999));
    let base_id = tick_sub.id();
    assert_eq!(
        base_id,
        value_sub.id(),
        "Every ID must depend only on interval, not message factory"
    );

    let slower_sub = Every::<TestMsg>::new(Duration::from_millis(200), || TestMsg::Tick);
    assert_ne!(
        tick_sub.id(),
        slower_sub.id(),
        "Different intervals must produce different IDs"
    );
}
1446
#[test]
/// Pins the exact ID formula of `Every::new`: the interval in nanoseconds,
/// truncated to `u64`, XOR-ed with a fixed tag.
fn contract_every_id_formula_is_stable() {
    // The tag bytes 0x54 0x49 0x43 0x4B are ASCII "TICK".
    const XOR_TAG: u64 = 0x5449_434B;

    let interval = Duration::from_millis(100);
    let sub = Every::<TestMsg>::new(interval, || TestMsg::Tick);
    let expected_id = (interval.as_nanos() as u64) ^ XOR_TAG;

    assert_eq!(
        sub.id(),
        expected_id,
        "Every ID formula must be: interval.as_nanos() as u64 ^ 0x5449_434B"
    );
}
1460
#[test]
/// Dropping the manager must make every running subscription observe its stop
/// signal; each worker bumps a shared counter once it leaves its wait loop.
fn contract_drop_triggers_stop_all() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Idles until stopped, then records that the stop was observed.
    struct StopCountingSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for StopCountingSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let stops_seen = Arc::new(AtomicUsize::new(0));

    // Three subscriptions with IDs 1, 2 and 3, all sharing the counter.
    let subs: Vec<Box<dyn Subscription<TestMsg>>> = (1..=3)
        .map(|id| -> Box<dyn Subscription<TestMsg>> {
            Box::new(StopCountingSub {
                id,
                counter: Arc::clone(&stops_seen),
            })
        })
        .collect();

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(subs);
    thread::sleep(Duration::from_millis(20));
    // No explicit stop_all: the drop alone has to deliver the stop signals.
    drop(mgr);

    // Leave time for the workers to notice the signal and bump the counter.
    thread::sleep(Duration::from_millis(400));
    assert_eq!(
        stops_seen.load(Ordering::SeqCst),
        3,
        "all 3 subscription threads must have observed stop signal on drop"
    );
}
1515
#[test]
/// Pins the two shutdown timing constants so a change to either is a
/// deliberate, visible edit.
fn contract_stop_join_timeout_is_250ms() {
    let expected_timeout = Duration::from_millis(250);
    let expected_poll = Duration::from_millis(1);

    assert_eq!(
        SUBSCRIPTION_STOP_JOIN_TIMEOUT, expected_timeout,
        "join timeout must be 250ms"
    );
    assert_eq!(
        SUBSCRIPTION_STOP_JOIN_POLL, expected_poll,
        "join poll interval must be 1ms (bd-1f2aw)"
    );
}
1531
#[test]
/// A subscription whose `run` panics stays tracked by the manager, has its
/// panicked flag set, and is cleared by `stop_all`.
fn lifecycle_panic_in_subscription_is_caught() {
    /// Panics as soon as it is run.
    struct PanickingSub;

    impl Subscription<TestMsg> for PanickingSub {
        fn id(&self) -> SubId {
            0xDEAD
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
            panic!("intentional test panic in subscription");
        }
    }

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(vec![Box::new(PanickingSub)]);

    // Give the worker time to start and panic.
    thread::sleep(Duration::from_millis(50));

    assert_eq!(
        mgr.active_count(),
        1,
        "panicked sub still tracked as active"
    );

    // `has_panicked` performs the Acquire load of the entry's panicked flag.
    let panic_recorded = mgr.active[0].has_panicked();
    assert!(
        panic_recorded,
        "panicked flag should be set after subscription panic"
    );

    mgr.stop_all();
    assert_eq!(mgr.active_count(), 0);
}
1581
#[test]
/// A panic inside one subscription must not stop a sibling started in the same
/// reconcile call from delivering messages.
fn lifecycle_panic_does_not_affect_sibling_subscriptions() {
    /// Panics as soon as it is run.
    struct PanickingSub;

    impl Subscription<TestMsg> for PanickingSub {
        fn id(&self) -> SubId {
            0xBAD
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
            panic!("boom");
        }
    }

    let healthy_sibling = Every::with_id(42, Duration::from_millis(10), || TestMsg::Tick);
    let desired: Vec<Box<dyn Subscription<TestMsg>>> =
        vec![Box::new(PanickingSub), Box::new(healthy_sibling)];

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(desired);

    // 100ms leaves room for several 10ms ticks from the sibling.
    thread::sleep(Duration::from_millis(100));

    let delivered = mgr.drain_messages();
    assert!(
        !delivered.is_empty(),
        "sibling subscription should still deliver messages after a panic in another sub"
    );

    mgr.stop_all();
}
1617
#[test]
/// `stop_all` on four subscriptions that each need 50ms of post-stop cleanup
/// must return in under 150ms, i.e. the cleanups overlap.
fn lifecycle_stop_all_parallel_shutdown() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    const SUB_COUNT: u64 = 4;
    const CLEANUP_MS: u64 = 50;

    /// Idles until stopped, then simulates slow cleanup before reporting done.
    struct SlowStopSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for SlowStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            thread::sleep(Duration::from_millis(CLEANUP_MS));
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let cleanups_done = Arc::new(AtomicUsize::new(0));

    let mut subs: Vec<Box<dyn Subscription<TestMsg>>> = Vec::with_capacity(SUB_COUNT as usize);
    for offset in 0..SUB_COUNT {
        subs.push(Box::new(SlowStopSub {
            id: 1000 + offset,
            counter: Arc::clone(&cleanups_done),
        }));
    }

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(subs);
    thread::sleep(Duration::from_millis(20));

    let start = Instant::now();
    mgr.stop_all();
    let elapsed = start.elapsed();

    assert!(
        elapsed < Duration::from_millis(150),
        "parallel stop_all took {elapsed:?}, expected < 150ms \
         (sequential would be ~{expected_sequential}ms)",
        expected_sequential = SUB_COUNT * CLEANUP_MS
    );

    // Short grace period before checking that every cleanup finished.
    thread::sleep(Duration::from_millis(20));
    assert_eq!(
        cleanups_done.load(Ordering::SeqCst),
        SUB_COUNT as usize,
        "all subscriptions should have completed their cleanup"
    );
}
1683
#[test]
/// Removing three slow-stopping subscriptions via `reconcile(vec![])` must
/// finish in under 100ms even though each needs 40ms of cleanup.
fn lifecycle_reconcile_removal_uses_parallel_stop() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Idles until stopped, then simulates 40ms of cleanup before reporting.
    struct SlowStopSub {
        id: SubId,
        counter: Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for SlowStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
            thread::sleep(Duration::from_millis(40));
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let cleanups_done = Arc::new(AtomicUsize::new(0));

    // IDs 2000, 2001 and 2002, in that order.
    let subs: Vec<Box<dyn Subscription<TestMsg>>> = (2000..=2002)
        .map(|id| -> Box<dyn Subscription<TestMsg>> {
            Box::new(SlowStopSub {
                id,
                counter: Arc::clone(&cleanups_done),
            })
        })
        .collect();

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(subs);
    thread::sleep(Duration::from_millis(20));

    let start = Instant::now();
    // An empty desired set removes all three at once.
    mgr.reconcile(vec![]);
    let elapsed = start.elapsed();

    assert!(
        elapsed < Duration::from_millis(100),
        "reconcile removal took {elapsed:?}, expected < 100ms with parallel stop"
    );

    thread::sleep(Duration::from_millis(20));
    assert_eq!(cleanups_done.load(Ordering::SeqCst), 3);
}
1742
#[test]
/// `has_panicked` must mirror the shared atomic flag: false at construction,
/// true once the flag is stored from outside.
fn lifecycle_has_panicked_tracks_state() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let flag = Arc::new(AtomicBool::new(false));

    let (signal, trigger) = StopSignal::new();
    // Worker simply parks on the stop signal, for up to ten seconds.
    let worker = thread::spawn(move || {
        signal.wait_timeout(Duration::from_secs(10));
    });

    let running = RunningSubscription {
        id: 999,
        trigger,
        thread: Some(worker),
        panicked: Arc::clone(&flag),
    };

    assert!(!running.has_panicked(), "should not be panicked initially");

    // Flip the flag through the handle retained outside the struct.
    flag.store(true, Ordering::Release);
    assert!(running.has_panicked(), "should reflect panicked state");

    running.stop();
}
1769
#[test]
/// `signal_stop` followed by `join_bounded` must join a cooperative worker:
/// no leftover handle comes back and the worker ran to completion.
fn lifecycle_signal_then_join_works() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let finished = Arc::new(AtomicBool::new(false));
    let finished_in_worker = Arc::clone(&finished);

    let (signal, trigger) = StopSignal::new();
    let worker = thread::spawn(move || {
        loop {
            if signal.is_stopped() {
                break;
            }
            thread::sleep(Duration::from_millis(5));
        }
        finished_in_worker.store(true, Ordering::SeqCst);
    });

    let running = RunningSubscription {
        id: 888,
        trigger,
        thread: Some(worker),
        panicked: Arc::new(AtomicBool::new(false)),
    };

    running.signal_stop();
    let unjoined = running.join_bounded();

    assert!(
        unjoined.is_none(),
        "cooperative thread should join within timeout"
    );
    assert!(
        finished.load(Ordering::SeqCst),
        "thread should have completed"
    );
}
1805
#[test]
/// For a worker that ignores the stop signal, `join_bounded` must hand the
/// join handle back and return in well under the worker's 500ms sleep.
fn lifecycle_join_bounded_returns_handle_for_uncooperative() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    // The signal half is kept alive but the worker never consults it.
    let (_unused_signal, trigger) = StopSignal::new();
    let stubborn_worker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
    });

    let running = RunningSubscription {
        id: 777,
        trigger,
        thread: Some(stubborn_worker),
        panicked: Arc::new(AtomicBool::new(false)),
    };

    running.signal_stop();

    let timer = Instant::now();
    let unjoined = running.join_bounded();
    let waited = timer.elapsed();

    assert!(
        unjoined.is_some(),
        "uncooperative thread should not join within timeout"
    );
    assert!(
        waited < Duration::from_millis(400),
        "join_bounded should respect the 250ms timeout, took {waited:?}"
    );
}
1837
#[test]
/// An ID that was removed can be started again: ticks flow, stop after the
/// removal, and resume from the replacement's message factory on restart.
fn lifecycle_restart_after_stop() {
    let tick_interval = Duration::from_millis(10);
    let run_window = Duration::from_millis(30);

    let mut mgr = SubscriptionManager::<TestMsg>::new();

    // Phase 1: start ID 300 and confirm it produces messages.
    mgr.reconcile(vec![Box::new(Every::with_id(300, tick_interval, || {
        TestMsg::Tick
    }))]);
    thread::sleep(run_window);
    assert!(!mgr.drain_messages().is_empty(), "should receive ticks");

    // Phase 2: remove it, discard anything already buffered, then verify that
    // nothing new arrives during a further quiet window.
    mgr.reconcile(vec![]);
    thread::sleep(Duration::from_millis(20));
    drop(mgr.drain_messages());
    thread::sleep(run_window);
    assert!(
        mgr.drain_messages().is_empty(),
        "should stop receiving after removal"
    );

    // Phase 3: start the same ID again with a different message factory.
    mgr.reconcile(vec![Box::new(Every::with_id(300, tick_interval, || {
        TestMsg::Value(99)
    }))]);
    thread::sleep(run_window);
    let after_restart = mgr.drain_messages();
    assert!(
        !after_restart.is_empty(),
        "should receive messages again after restart"
    );
    assert!(
        after_restart
            .iter()
            .any(|msg| matches!(msg, TestMsg::Value(99))),
        "restarted sub should use the new message factory"
    );

    mgr.stop_all();
}
1881
#[test]
/// Two concurrently sending subscriptions must not corrupt manager state:
/// `active_count` stays accurate and the manager never yields more messages
/// than the workers produced.
///
/// The shared counter is incremented *before* each send. With the opposite
/// order a worker could be preempted between `send` and the increment, so
/// the test thread could drain a message that was not yet counted and
/// `drained.len() <= sent_count` would fail spuriously. Counting first makes
/// the counter an upper bound on the messages the channel can ever hold.
fn lifecycle_non_interference_with_manager_state() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let msg_count = std::sync::Arc::new(AtomicUsize::new(0));

    /// Sends a `Tick` roughly every 5ms and counts each send attempt.
    struct CountingSub {
        id: SubId,
        counter: std::sync::Arc<AtomicUsize>,
    }

    impl Subscription<TestMsg> for CountingSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            while !stop.is_stopped() {
                // Count first: the counter must never lag behind what the
                // receiving side can already observe.
                self.counter.fetch_add(1, Ordering::SeqCst);
                if sender.send(TestMsg::Tick).is_err() {
                    // Receiver is gone; nothing left to deliver to.
                    break;
                }
                thread::sleep(Duration::from_millis(5));
            }
        }
    }

    let mut mgr = SubscriptionManager::<TestMsg>::new();

    mgr.reconcile(vec![
        Box::new(CountingSub {
            id: 400,
            counter: msg_count.clone(),
        }),
        Box::new(CountingSub {
            id: 401,
            counter: msg_count.clone(),
        }),
    ]);

    thread::sleep(Duration::from_millis(50));

    assert_eq!(mgr.active_count(), 2);
    // Drain first, read the counter second: every drained message was counted
    // before it was sent, so the counter read afterwards cannot be smaller.
    let drained = mgr.drain_messages();
    let sent_count = msg_count.load(Ordering::SeqCst);
    assert!(sent_count > 0, "subscriptions should have sent messages");
    assert!(
        drained.len() <= sent_count,
        "drained {} but only {} sent",
        drained.len(),
        sent_count
    );

    mgr.stop_all();
    assert_eq!(mgr.active_count(), 0);

    // Messages buffered before the stop must still respect the same bound.
    let remaining = mgr.drain_messages();
    let total_drained = drained.len() + remaining.len();
    let final_sent = msg_count.load(Ordering::SeqCst);
    assert!(
        total_drained <= final_sent,
        "total drained ({total_drained}) must not exceed total sent ({final_sent})"
    );
}
1957
#[test]
/// All subscriptions must observe the stop signal at nearly the same moment:
/// each worker records when it saw the stop (microseconds since a shared
/// epoch) and the spread between earliest and latest must stay below 10ms.
fn lifecycle_shutdown_signal_ordering() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Polls the stop signal every 1ms and records when it was first seen.
    struct TimingStopSub {
        id: SubId,
        index: usize,
        signal_times: Arc<[AtomicU64; 3]>,
        epoch: Instant,
    }

    impl Subscription<TestMsg> for TimingStopSub {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                thread::sleep(Duration::from_millis(1));
            }
            let seen_at_us = self.epoch.elapsed().as_micros() as u64;
            self.signal_times[self.index].store(seen_at_us, Ordering::SeqCst);
        }
    }

    // One slot per subscription; 0 means "not recorded yet".
    let signal_times = Arc::new([AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0)]);
    let epoch = Instant::now();

    // IDs 500..=502 write to slots 0..=2 respectively.
    let subs: Vec<Box<dyn Subscription<TestMsg>>> = (0..3usize)
        .map(|index| -> Box<dyn Subscription<TestMsg>> {
            Box::new(TimingStopSub {
                id: 500 + index as u64,
                index,
                signal_times: Arc::clone(&signal_times),
                epoch,
            })
        })
        .collect();

    let mut mgr = SubscriptionManager::<TestMsg>::new();
    mgr.reconcile(subs);
    thread::sleep(Duration::from_millis(20));

    mgr.stop_all();

    let t0 = signal_times[0].load(Ordering::SeqCst);
    let t1 = signal_times[1].load(Ordering::SeqCst);
    let t2 = signal_times[2].load(Ordering::SeqCst);
    let observed = [t0, t1, t2];

    assert!(
        observed.iter().all(|&t| t > 0),
        "all subs should have recorded stop time"
    );

    // The array is never empty, so the fallbacks below are unreachable.
    let latest = observed.iter().copied().max().unwrap_or(0);
    let earliest = observed.iter().copied().min().unwrap_or(0);
    let spread_us = latest - earliest;

    assert!(
        spread_us < 10_000,
        "stop signal spread should be < 10ms for parallel signaling, got {spread_us}us \
         (t0={t0}, t1={t1}, t2={t2})"
    );
}
2037}