1#![forbid(unsafe_code)]
2
3use std::collections::HashSet;
18use std::sync::mpsc;
19use std::thread;
20use std::time::Duration;
21
/// Numeric identifier of a subscription, as returned by `Subscription::id`.
///
/// `SubscriptionManager::reconcile` compares these values to decide which
/// subscriptions to keep, stop, or start.
pub type SubId = u64;
27
28pub trait Subscription<M: Send + 'static>: Send {
33 fn id(&self) -> SubId;
38
39 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal);
45}
46
/// Read side of a stop flag shared between a subscription and its owner.
///
/// Clones share the same underlying flag (a `Mutex<bool>` paired with a
/// `Condvar`), so a single [`StopTrigger::stop`] is observed by every clone.
#[derive(Clone)]
pub struct StopSignal {
    inner: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
}

impl StopSignal {
    /// Creates a connected signal/trigger pair whose flag starts out `false`.
    pub(crate) fn new() -> (Self, StopTrigger) {
        let shared = std::sync::Arc::new((
            std::sync::Mutex::new(false),
            std::sync::Condvar::new(),
        ));
        let trigger = StopTrigger {
            inner: std::sync::Arc::clone(&shared),
        };
        (Self { inner: shared }, trigger)
    }

    /// Returns `true` once the paired trigger has fired.
    ///
    /// A poisoned mutex is tolerated: the flag is read from the recovered guard.
    pub fn is_stopped(&self) -> bool {
        let flag = self
            .inner
            .0
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *flag
    }

    /// Blocks until the stop flag is set or `duration` has elapsed.
    ///
    /// Returns `true` if the flag is set (including when it was already set
    /// on entry) and `false` if the wait timed out first.
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (mutex, condvar) = &*self.inner;
        let guard = mutex
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        // `wait_timeout_while` keeps waiting across spurious wakeups and tracks
        // the remaining time itself; it returns as soon as the predicate turns
        // false (flag set) or the overall timeout has passed.
        let (guard, _timeout) = condvar
            .wait_timeout_while(guard, duration, |stopped| !*stopped)
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        // The flag, not the timeout result, decides the answer, so a stop that
        // races with the deadline is still reported as a stop.
        *guard
    }
}

/// Write side of the stop flag; held by whoever owns the running subscription.
pub(crate) struct StopTrigger {
    inner: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
}

impl StopTrigger {
    /// Sets the stop flag and wakes every thread blocked in
    /// [`StopSignal::wait_timeout`]. Calling it more than once is harmless.
    pub(crate) fn stop(&self) {
        let (mutex, condvar) = &*self.inner;
        let mut flag = mutex
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *flag = true;
        condvar.notify_all();
    }
}
127
128pub(crate) struct RunningSubscription {
130 pub(crate) id: SubId,
131 trigger: StopTrigger,
132 thread: Option<thread::JoinHandle<()>>,
133}
134
135impl RunningSubscription {
136 pub(crate) fn stop(mut self) {
138 self.trigger.stop();
139 if let Some(handle) = self.thread.take() {
140 let _ = handle.join();
142 }
143 }
144}
145
146impl Drop for RunningSubscription {
147 fn drop(&mut self) {
148 self.trigger.stop();
149 }
151}
152
153pub(crate) struct SubscriptionManager<M: Send + 'static> {
155 active: Vec<RunningSubscription>,
156 sender: mpsc::Sender<M>,
157 receiver: mpsc::Receiver<M>,
158}
159
160impl<M: Send + 'static> SubscriptionManager<M> {
161 pub(crate) fn new() -> Self {
162 let (sender, receiver) = mpsc::channel();
163 Self {
164 active: Vec::new(),
165 sender,
166 receiver,
167 }
168 }
169
170 pub(crate) fn reconcile(&mut self, subscriptions: Vec<Box<dyn Subscription<M>>>) {
177 let new_ids: HashSet<SubId> = subscriptions.iter().map(|s| s.id()).collect();
178 let active_count_before = self.active.len();
179
180 crate::debug_trace!(
181 "reconcile: new_ids={:?}, active_before={}",
182 new_ids,
183 active_count_before
184 );
185 tracing::trace!(
186 new_id_count = new_ids.len(),
187 active_before = active_count_before,
188 new_ids = ?new_ids,
189 "subscription reconcile starting"
190 );
191
192 let mut remaining = Vec::new();
194 for running in self.active.drain(..) {
195 if new_ids.contains(&running.id) {
196 remaining.push(running);
197 } else {
198 crate::debug_trace!("stopping subscription: id={}", running.id);
199 tracing::debug!(sub_id = running.id, "Stopping subscription");
200 running.stop();
201 }
202 }
203 self.active = remaining;
204
205 let mut active_ids: HashSet<SubId> = self.active.iter().map(|r| r.id).collect();
207 for sub in subscriptions {
208 let id = sub.id();
209 if !active_ids.insert(id) {
210 continue;
211 }
212
213 crate::debug_trace!("starting subscription: id={}", id);
214 tracing::debug!(sub_id = id, "Starting subscription");
215 let (signal, trigger) = StopSignal::new();
216 let sender = self.sender.clone();
217
218 let thread = thread::spawn(move || {
219 sub.run(sender, signal);
220 });
221
222 self.active.push(RunningSubscription {
223 id,
224 trigger,
225 thread: Some(thread),
226 });
227 }
228
229 let active_count_after = self.active.len();
230 crate::debug_trace!("reconcile complete: active_after={}", active_count_after);
231 tracing::trace!(
232 active_before = active_count_before,
233 active_after = active_count_after,
234 started = active_count_after.saturating_sub(active_count_before),
235 stopped = active_count_before.saturating_sub(active_count_after),
236 "subscription reconcile complete"
237 );
238 }
239
240 pub(crate) fn drain_messages(&self) -> Vec<M> {
242 let mut messages = Vec::new();
243 while let Ok(msg) = self.receiver.try_recv() {
244 messages.push(msg);
245 }
246 messages
247 }
248
249 #[inline]
251 pub(crate) fn active_count(&self) -> usize {
252 self.active.len()
253 }
254
255 pub(crate) fn stop_all(&mut self) {
257 for running in self.active.drain(..) {
258 running.stop();
259 }
260 }
261}
262
263impl<M: Send + 'static> Drop for SubscriptionManager<M> {
264 fn drop(&mut self) {
265 self.stop_all();
266 }
267}
268
269pub struct Every<M: Send + 'static> {
281 id: SubId,
282 interval: Duration,
283 make_msg: Box<dyn Fn() -> M + Send + Sync>,
284}
285
286impl<M: Send + 'static> Every<M> {
287 pub fn new(interval: Duration, make_msg: impl Fn() -> M + Send + Sync + 'static) -> Self {
289 let id = interval.as_nanos() as u64 ^ 0x5449_434B; Self {
292 id,
293 interval,
294 make_msg: Box::new(make_msg),
295 }
296 }
297
298 pub fn with_id(
300 id: SubId,
301 interval: Duration,
302 make_msg: impl Fn() -> M + Send + Sync + 'static,
303 ) -> Self {
304 Self {
305 id,
306 interval,
307 make_msg: Box::new(make_msg),
308 }
309 }
310}
311
312impl<M: Send + 'static> Subscription<M> for Every<M> {
313 fn id(&self) -> SubId {
314 self.id
315 }
316
317 fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
318 let mut tick_count: u64 = 0;
319 crate::debug_trace!(
320 "Every subscription started: id={}, interval={:?}",
321 self.id,
322 self.interval
323 );
324 loop {
325 if stop.wait_timeout(self.interval) {
326 crate::debug_trace!(
327 "Every subscription stopped: id={}, sent {} ticks",
328 self.id,
329 tick_count
330 );
331 break;
332 }
333 tick_count += 1;
334 let msg = (self.make_msg)();
335 if sender.send(msg).is_err() {
336 crate::debug_trace!(
337 "Every subscription channel closed: id={}, sent {} ticks",
338 self.id,
339 tick_count
340 );
341 break;
342 }
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    //! Unit tests for the stop signal, the `Every` ticker and the
    //! subscription manager. Several tests rely on short sleeps to give
    //! worker threads time to run.

    use super::*;

    /// Message type used throughout the tests.
    #[derive(Debug, Clone, PartialEq)]
    enum TestMsg {
        Tick,
        Value(i32),
    }

    /// Test subscription that forwards everything received on an external
    /// channel, polling the stop flag between receive attempts.
    struct ChannelSubscription<M: Send + 'static> {
        id: SubId,
        receiver: mpsc::Receiver<M>,
        poll: Duration,
    }

    impl<M: Send + 'static> ChannelSubscription<M> {
        fn new(id: SubId, receiver: mpsc::Receiver<M>) -> Self {
            Self {
                id,
                receiver,
                poll: Duration::from_millis(5),
            }
        }
    }

    impl<M: Send + 'static> Subscription<M> for ChannelSubscription<M> {
        fn id(&self) -> SubId {
            self.id
        }

        fn run(&self, sender: mpsc::Sender<M>, stop: StopSignal) {
            loop {
                if stop.is_stopped() {
                    break;
                }
                match self.receiver.recv_timeout(self.poll) {
                    Ok(msg) => {
                        if sender.send(msg).is_err() {
                            break;
                        }
                    }
                    // Nothing arrived within the poll window; re-check the stop flag.
                    Err(mpsc::RecvTimeoutError::Timeout) => {}
                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
                }
            }
        }
    }

    /// Builds a `ChannelSubscription` together with the sender that feeds it.
    fn channel_subscription(id: SubId) -> (ChannelSubscription<TestMsg>, mpsc::Sender<TestMsg>) {
        let (tx, rx) = mpsc::channel();
        (ChannelSubscription::new(id, rx), tx)
    }

    #[test]
    fn stop_signal_starts_false() {
        let (signal, _trigger) = StopSignal::new();
        assert!(!signal.is_stopped());
    }

    #[test]
    fn stop_signal_becomes_true_after_trigger() {
        let (signal, trigger) = StopSignal::new();
        trigger.stop();
        assert!(signal.is_stopped());
    }

    #[test]
    fn stop_signal_wait_returns_true_when_stopped() {
        let (signal, trigger) = StopSignal::new();
        trigger.stop();
        assert!(signal.wait_timeout(Duration::from_millis(100)));
    }

    #[test]
    fn stop_signal_wait_returns_false_on_timeout() {
        let (signal, _trigger) = StopSignal::new();
        assert!(!signal.wait_timeout(Duration::from_millis(10)));
    }

    #[test]
    fn channel_subscription_forwards_messages() {
        let (sub, event_tx) = channel_subscription(1);
        let (tx, rx) = mpsc::channel();
        let (signal, trigger) = StopSignal::new();

        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        event_tx.send(TestMsg::Value(1)).unwrap();
        event_tx.send(TestMsg::Value(2)).unwrap();
        thread::sleep(Duration::from_millis(10));
        trigger.stop();
        handle.join().unwrap();

        let msgs: Vec<_> = rx.try_iter().collect();
        assert_eq!(msgs, vec![TestMsg::Value(1), TestMsg::Value(2)]);
    }

    #[test]
    fn every_subscription_fires() {
        let sub = Every::new(Duration::from_millis(10), || TestMsg::Tick);
        let (tx, rx) = mpsc::channel();
        let (signal, trigger) = StopSignal::new();

        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        thread::sleep(Duration::from_millis(50));
        trigger.stop();
        handle.join().unwrap();

        let msgs: Vec<_> = rx.try_iter().collect();
        assert!(!msgs.is_empty(), "Should have received at least one tick");
        assert!(msgs.iter().all(|m| *m == TestMsg::Tick));
    }

    #[test]
    fn every_subscription_uses_stable_id() {
        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
        let sub2 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
        assert_eq!(sub1.id(), sub2.id());
    }

    #[test]
    fn every_subscription_different_intervals_different_ids() {
        let sub1 = Every::<TestMsg>::new(Duration::from_secs(1), || TestMsg::Tick);
        let sub2 = Every::<TestMsg>::new(Duration::from_secs(2), || TestMsg::Tick);
        assert_ne!(sub1.id(), sub2.id());
    }

    #[test]
    fn subscription_manager_starts_subscriptions() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub, event_tx) = channel_subscription(1);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];

        mgr.reconcile(subs);
        event_tx.send(TestMsg::Value(42)).unwrap();

        thread::sleep(Duration::from_millis(20));

        let msgs = mgr.drain_messages();
        assert_eq!(msgs, vec![TestMsg::Value(42)]);
    }

    #[test]
    fn subscription_manager_dedupes_duplicate_ids() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub_a, tx_a) = channel_subscription(7);
        let (sub_b, tx_b) = channel_subscription(7);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub_a), Box::new(sub_b)];

        mgr.reconcile(subs);

        tx_a.send(TestMsg::Value(1)).unwrap();
        // The duplicate was dropped inside `reconcile`, taking its receiver
        // with it, so sending to it must fail.
        assert!(
            tx_b.send(TestMsg::Value(2)).is_err(),
            "Duplicate subscription should be dropped"
        );

        thread::sleep(Duration::from_millis(20));
        let msgs = mgr.drain_messages();
        assert_eq!(msgs, vec![TestMsg::Value(1)]);
    }

    #[test]
    fn subscription_manager_stops_removed() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![Box::new(Every::with_id(
            99,
            Duration::from_millis(5),
            || TestMsg::Tick,
        ))]);

        thread::sleep(Duration::from_millis(20));
        let msgs_before = mgr.drain_messages();
        assert!(!msgs_before.is_empty());

        mgr.reconcile(vec![]);

        // Discard anything that was already queued before the stop took effect.
        thread::sleep(Duration::from_millis(20));
        let _ = mgr.drain_messages();

        thread::sleep(Duration::from_millis(30));
        let msgs_after = mgr.drain_messages();
        assert!(
            msgs_after.is_empty(),
            "Should stop receiving after reconcile with empty set"
        );
    }

    #[test]
    fn subscription_manager_keeps_unchanged() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![Box::new(Every::with_id(
            50,
            Duration::from_millis(10),
            || TestMsg::Tick,
        ))]);

        thread::sleep(Duration::from_millis(30));
        let _ = mgr.drain_messages();

        mgr.reconcile(vec![Box::new(Every::with_id(
            50,
            Duration::from_millis(10),
            || TestMsg::Tick,
        ))]);

        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();
        assert!(!msgs.is_empty(), "Subscription should still be running");
    }

    #[test]
    fn subscription_manager_stop_all() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![
            Box::new(Every::with_id(1, Duration::from_millis(5), || {
                TestMsg::Value(1)
            })),
            Box::new(Every::with_id(2, Duration::from_millis(5), || {
                TestMsg::Value(2)
            })),
        ]);

        thread::sleep(Duration::from_millis(20));
        mgr.stop_all();

        thread::sleep(Duration::from_millis(20));
        let _ = mgr.drain_messages();
        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();
        assert!(msgs.is_empty());
    }

    #[test]
    fn stop_signal_is_cloneable() {
        let (signal, trigger) = StopSignal::new();
        let signal_clone = signal.clone();

        assert!(!signal.is_stopped());
        assert!(!signal_clone.is_stopped());

        trigger.stop();

        assert!(signal.is_stopped());
        assert!(signal_clone.is_stopped());
    }

    #[test]
    fn stop_signal_wait_wakes_immediately_when_already_stopped() {
        let (signal, trigger) = StopSignal::new();
        trigger.stop();

        let start = std::time::Instant::now();
        let stopped = signal.wait_timeout(Duration::from_secs(10));
        let elapsed = start.elapsed();

        assert!(stopped);
        assert!(elapsed < Duration::from_millis(100));
    }

    #[test]
    fn stop_signal_wait_is_interrupted_by_trigger() {
        let (signal, trigger) = StopSignal::new();

        let signal_clone = signal.clone();
        let handle = thread::spawn(move || signal_clone.wait_timeout(Duration::from_secs(10)));

        thread::sleep(Duration::from_millis(20));
        trigger.stop();

        let stopped = handle.join().unwrap();
        assert!(stopped);
    }

    #[test]
    fn channel_subscription_no_messages_without_events() {
        let (sub, _event_tx) = channel_subscription(1);
        let (tx, rx) = mpsc::channel();
        let (signal, trigger) = StopSignal::new();

        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        thread::sleep(Duration::from_millis(10));
        trigger.stop();
        handle.join().unwrap();

        let msgs: Vec<_> = rx.try_iter().collect();
        assert!(msgs.is_empty());
    }

    #[test]
    fn channel_subscription_id_is_preserved() {
        let (sub, _tx) = channel_subscription(42);
        assert_eq!(sub.id(), 42);
    }

    #[test]
    fn channel_subscription_stops_on_disconnected_receiver() {
        let (sub, event_tx) = channel_subscription(1);
        let (tx, _rx) = mpsc::channel();
        let (signal, _trigger) = StopSignal::new();

        // With the feeding sender gone, `run` must return on its own.
        drop(event_tx);

        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        let result = handle.join();
        assert!(result.is_ok());
    }

    #[test]
    fn every_with_id_preserves_custom_id() {
        let sub = Every::<TestMsg>::with_id(12345, Duration::from_secs(1), || TestMsg::Tick);
        assert_eq!(sub.id(), 12345);
    }

    #[test]
    fn every_stops_on_disconnected_receiver() {
        let sub = Every::new(Duration::from_millis(5), || TestMsg::Tick);
        let (tx, rx) = mpsc::channel();
        let (signal, _trigger) = StopSignal::new();

        // With the receiver gone, the first send fails and `run` returns.
        drop(rx);

        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        let result = handle.join();
        assert!(result.is_ok());
    }

    #[test]
    fn every_respects_interval() {
        let sub = Every::with_id(1, Duration::from_millis(50), || TestMsg::Tick);
        let (tx, rx) = mpsc::channel();
        let (signal, trigger) = StopSignal::new();

        let start = std::time::Instant::now();
        let handle = thread::spawn(move || {
            sub.run(tx, signal);
        });

        thread::sleep(Duration::from_millis(160));
        trigger.stop();
        handle.join().unwrap();

        let msgs: Vec<_> = rx.try_iter().collect();
        let elapsed = start.elapsed();

        assert!(
            msgs.len() >= 2,
            "Expected at least 2 ticks, got {}",
            msgs.len()
        );
        assert!(
            msgs.len() <= 4,
            "Expected at most 4 ticks, got {}",
            msgs.len()
        );
        assert!(elapsed >= Duration::from_millis(150));
    }

    #[test]
    fn subscription_manager_empty_reconcile() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![]);
        let msgs = mgr.drain_messages();
        assert!(msgs.is_empty());
    }

    #[test]
    fn subscription_manager_drain_messages_returns_all() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub, event_tx) = channel_subscription(1);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> = vec![Box::new(sub)];

        mgr.reconcile(subs);
        event_tx.send(TestMsg::Value(1)).unwrap();
        event_tx.send(TestMsg::Value(2)).unwrap();
        thread::sleep(Duration::from_millis(20));

        let msgs = mgr.drain_messages();
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0], TestMsg::Value(1));
        assert_eq!(msgs[1], TestMsg::Value(2));

        // A second drain finds nothing left.
        let msgs2 = mgr.drain_messages();
        assert!(msgs2.is_empty());
    }

    #[test]
    fn subscription_manager_replaces_subscription_with_different_id() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub1, tx1) = channel_subscription(1);

        mgr.reconcile(vec![Box::new(sub1)]);
        tx1.send(TestMsg::Value(1)).unwrap();
        thread::sleep(Duration::from_millis(20));
        let msgs1 = mgr.drain_messages();
        assert_eq!(msgs1, vec![TestMsg::Value(1)]);

        let (sub2, tx2) = channel_subscription(2);
        mgr.reconcile(vec![Box::new(sub2)]);
        tx2.send(TestMsg::Value(2)).unwrap();
        thread::sleep(Duration::from_millis(20));
        let msgs2 = mgr.drain_messages();
        assert_eq!(msgs2, vec![TestMsg::Value(2)]);
    }

    #[test]
    fn subscription_manager_multiple_subscriptions() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();
        let (sub1, tx1) = channel_subscription(1);
        let (sub2, tx2) = channel_subscription(2);
        let (sub3, tx3) = channel_subscription(3);
        let subs: Vec<Box<dyn Subscription<TestMsg>>> =
            vec![Box::new(sub1), Box::new(sub2), Box::new(sub3)];

        mgr.reconcile(subs);
        tx1.send(TestMsg::Value(10)).unwrap();
        tx2.send(TestMsg::Value(20)).unwrap();
        tx3.send(TestMsg::Value(30)).unwrap();
        thread::sleep(Duration::from_millis(30));

        // Arrival order across threads is not fixed, so sort before comparing.
        let mut msgs = mgr.drain_messages();
        msgs.sort_by_key(|m| match m {
            TestMsg::Value(v) => *v,
            _ => 0,
        });

        assert_eq!(msgs.len(), 3);
        assert_eq!(msgs[0], TestMsg::Value(10));
        assert_eq!(msgs[1], TestMsg::Value(20));
        assert_eq!(msgs[2], TestMsg::Value(30));
    }

    #[test]
    fn subscription_manager_partial_update() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        mgr.reconcile(vec![
            Box::new(Every::with_id(1, Duration::from_millis(10), || {
                TestMsg::Value(1)
            })),
            Box::new(Every::with_id(2, Duration::from_millis(10), || {
                TestMsg::Value(2)
            })),
            Box::new(Every::with_id(3, Duration::from_millis(10), || {
                TestMsg::Value(3)
            })),
        ]);

        thread::sleep(Duration::from_millis(30));
        let _ = mgr.drain_messages();

        mgr.reconcile(vec![
            Box::new(Every::with_id(1, Duration::from_millis(10), || {
                TestMsg::Value(1)
            })),
            Box::new(Every::with_id(3, Duration::from_millis(10), || {
                TestMsg::Value(3)
            })),
        ]);

        // Drop anything subscription 2 queued before it was stopped.
        let _ = mgr.drain_messages();

        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();

        let values: Vec<i32> = msgs
            .iter()
            .filter_map(|m| match m {
                TestMsg::Value(v) => Some(*v),
                _ => None,
            })
            .collect();

        assert!(
            values.contains(&1),
            "Should still receive from subscription 1"
        );
        assert!(
            values.contains(&3),
            "Should still receive from subscription 3"
        );
        assert!(
            !values.contains(&2),
            "Should not receive from stopped subscription 2"
        );
    }

    #[test]
    fn subscription_manager_drop_stops_all() {
        let flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = flag.clone();

        /// Sets `flag` once its `run` loop has observed the stop signal.
        struct FlagSubscription {
            id: SubId,
            flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
        }

        impl Subscription<TestMsg> for FlagSubscription {
            fn id(&self) -> SubId {
                self.id
            }

            fn run(&self, _sender: mpsc::Sender<TestMsg>, stop: StopSignal) {
                while !stop.is_stopped() {
                    thread::sleep(Duration::from_millis(5));
                }
                self.flag.store(true, std::sync::atomic::Ordering::SeqCst);
            }
        }

        {
            let mut mgr = SubscriptionManager::<TestMsg>::new();
            mgr.reconcile(vec![Box::new(FlagSubscription {
                id: 1,
                flag: flag_clone,
            })]);

            thread::sleep(Duration::from_millis(20));
            // `mgr` is dropped at the end of this scope.
        }

        thread::sleep(Duration::from_millis(50));
        assert!(
            flag.load(std::sync::atomic::Ordering::SeqCst),
            "Subscription should have stopped on drop"
        );
    }

    #[test]
    fn running_subscription_stop_joins_thread() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let completed = std::sync::Arc::new(AtomicBool::new(false));
        let completed_clone = completed.clone();

        let (signal, trigger) = StopSignal::new();

        let thread = thread::spawn(move || {
            while !signal.is_stopped() {
                thread::sleep(Duration::from_millis(5));
            }
            completed_clone.store(true, Ordering::SeqCst);
        });

        let running = RunningSubscription {
            id: 1,
            trigger,
            thread: Some(thread),
        };

        // `stop` joins the thread, so the flag must be set by the time it returns.
        running.stop();
        assert!(completed.load(Ordering::SeqCst));
    }

    #[test]
    fn every_id_stable_across_instances() {
        let sub1 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
        let sub2 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Tick);
        let sub3 = Every::<TestMsg>::new(Duration::from_millis(100), || TestMsg::Value(1));

        assert_eq!(sub1.id(), sub2.id());
        // The id depends on the interval only, not on the message closure.
        assert_eq!(sub2.id(), sub3.id());
    }

    #[test]
    fn drain_messages_preserves_order() {
        let mut mgr = SubscriptionManager::<TestMsg>::new();

        /// Sends its values in order and then returns.
        struct OrderedSubscription {
            values: Vec<i32>,
        }

        impl Subscription<TestMsg> for OrderedSubscription {
            fn id(&self) -> SubId {
                999
            }

            fn run(&self, sender: mpsc::Sender<TestMsg>, _stop: StopSignal) {
                for v in &self.values {
                    let _ = sender.send(TestMsg::Value(*v));
                    thread::sleep(Duration::from_millis(1));
                }
            }
        }

        mgr.reconcile(vec![Box::new(OrderedSubscription {
            values: vec![1, 2, 3, 4, 5],
        })]);

        thread::sleep(Duration::from_millis(30));
        let msgs = mgr.drain_messages();

        let values: Vec<i32> = msgs
            .iter()
            .filter_map(|m| match m {
                TestMsg::Value(v) => Some(*v),
                _ => None,
            })
            .collect();

        assert_eq!(values, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn subscription_manager_new_is_empty() {
        let mgr = SubscriptionManager::<TestMsg>::new();
        let msgs = mgr.drain_messages();
        assert!(msgs.is_empty());
    }
}