1#[cfg(not(watcher_loom))]
78use std::sync;
79use std::{
80 collections::VecDeque,
81 future::Future,
82 pin::Pin,
83 sync::{Arc, RwLockReadGuard, Weak},
84 task::{self, ready, Poll, Waker},
85};
86
87#[cfg(watcher_loom)]
88use loom::sync;
89use n0_error::StackError;
90use sync::{Mutex, RwLock};
91
/// A value holder whose changes can be observed by watchers.
///
/// Every clone of a `Watchable` points at the same shared state through an
/// [`Arc`]; watchers handed out by `watch` only keep a [`Weak`] reference to it.
#[derive(Debug, Default)]
pub struct Watchable<T> {
    /// Shared state: the current value with its epoch, plus the registered wakers.
    shared: Arc<Shared<T>>,
}
100
101impl<T> Clone for Watchable<T> {
102 fn clone(&self) -> Self {
103 Self {
104 shared: self.shared.clone(),
105 }
106 }
107}
108
/// Abstraction over containers that may or may not hold a `T`.
pub trait Nullable<T> {
    /// Consumes the container and returns the contained `T`, if there is one.
    fn into_option(self) -> Option<T>;
}

impl<T> Nullable<T> for Option<T> {
    /// An `Option` already is its own optional representation.
    fn into_option(self) -> Option<T> {
        self
    }
}

impl<T> Nullable<T> for Vec<T> {
    /// Yields the *last* element of the vector, or `None` when it is empty.
    fn into_option(self) -> Option<T> {
        let mut items = self;
        items.pop()
    }
}
126
127impl<T: Clone + Eq> Watchable<T> {
128 pub fn new(value: T) -> Self {
130 Self {
131 shared: Arc::new(Shared {
132 state: RwLock::new(State {
133 value,
134 epoch: INITIAL_EPOCH,
135 }),
136 wakers: Default::default(),
137 }),
138 }
139 }
140
141 pub fn set(&self, value: T) -> Result<T, T> {
148 let mut state = self.shared.state.write().expect("poisoned");
152
153 let changed = state.value != value;
155
156 let ret = if changed {
157 let old = std::mem::replace(&mut state.value, value);
158 state.epoch += 1;
159 Ok(old)
160 } else {
161 Err(value)
162 };
163 drop(state); if changed {
167 for watcher in self.shared.wakers.lock().expect("poisoned").drain(..) {
168 watcher.wake();
169 }
170 }
171 ret
172 }
173
174 pub fn watch(&self) -> Direct<T> {
176 Direct {
177 state: self.shared.state().clone(),
178 shared: Some(Arc::downgrade(&self.shared)),
179 }
180 }
181
182 pub fn get(&self) -> T {
184 self.shared.get()
185 }
186
187 pub fn has_watchers(&self) -> bool {
190 Arc::weak_count(&self.shared) != 0
193 }
194}
195
196impl<T> Drop for Shared<T> {
197 fn drop(&mut self) {
198 let Ok(mut watchers) = self.wakers.lock() else {
199 return; };
201 for watcher in watchers.drain(..) {
206 watcher.wake();
207 }
208 }
209}
210
211pub trait Watcher: Clone {
231 type Value: Clone + Eq;
240
241 fn get(&mut self) -> Self::Value {
256 self.update();
257 self.peek().clone()
258 }
259
260 fn update(&mut self) -> bool;
266
267 fn peek(&self) -> &Self::Value;
276
277 fn is_connected(&self) -> bool;
281
282 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>>;
285
286 fn updated(&mut self) -> NextFut<'_, Self> {
293 NextFut { watcher: self }
294 }
295
296 fn initialized<T, W>(&mut self) -> InitializedFut<'_, T, W, Self>
307 where
308 W: Nullable<T> + Clone,
309 Self: Watcher<Value = W>,
310 {
311 InitializedFut {
312 initial: self.get().into_option(),
313 watcher: self,
314 }
315 }
316
317 fn stream(mut self) -> Stream<Self>
331 where
332 Self: Unpin,
333 {
334 Stream {
335 initial: Some(self.get()),
336 watcher: self,
337 }
338 }
339
340 fn stream_updates_only(self) -> Stream<Self>
355 where
356 Self: Unpin,
357 {
358 Stream {
359 initial: None,
360 watcher: self,
361 }
362 }
363
364 fn map<T: Clone + Eq>(
369 mut self,
370 map: impl Fn(Self::Value) -> T + Send + Sync + 'static,
371 ) -> Map<Self, T> {
372 Map {
373 current: (map)(self.get()),
374 map: Arc::new(map),
375 watcher: self,
376 }
377 }
378
379 fn or<W: Watcher>(self, other: W) -> Tuple<Self, W> {
382 Tuple::new(self, other)
383 }
384}
385
/// The watcher handed out by `Watchable::watch`.
///
/// It keeps a private snapshot of the last state it has seen and a weak
/// reference to the shared state it was created from.
#[derive(Debug, Clone)]
pub struct Direct<T> {
    /// Last observed value together with the epoch it was observed at.
    state: State<T>,
    /// Weak handle to the shared state; reset to `None` once an upgrade fails.
    shared: Option<Weak<Shared<T>>>,
}
398
399impl<T: Clone + Eq> Watcher for Direct<T> {
400 type Value = T;
401
402 fn update(&mut self) -> bool {
403 let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
404 self.shared = None; return false;
406 };
407 let state = shared.state();
408 if state.epoch > self.state.epoch {
409 self.state = state.clone();
410 true
411 } else {
412 false
413 }
414 }
415
416 fn peek(&self) -> &Self::Value {
417 &self.state.value
418 }
419
420 fn is_connected(&self) -> bool {
421 self.shared
422 .as_ref()
423 .and_then(|weak| weak.upgrade())
424 .is_some()
425 }
426
427 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
428 let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
429 self.shared = None; return Poll::Ready(Err(Disconnected));
431 };
432 self.state = ready!(shared.poll_updated(cx, self.state.epoch));
433 Poll::Ready(Ok(()))
434 }
435}
436
/// A watcher over two inner watchers whose value is the pair of their values.
#[derive(Debug, Clone)]
pub struct Tuple<S: Watcher, T: Watcher> {
    /// The two wrapped watchers.
    inner: (S, T),
    /// Cached pair of the values last read from the wrapped watchers.
    current: (S::Value, T::Value),
}
442
443impl<S: Watcher, T: Watcher> Tuple<S, T> {
444 pub fn new(mut s: S, mut t: T) -> Self {
445 let current = (s.get(), t.get());
446 Self {
447 inner: (s, t),
448 current,
449 }
450 }
451}
452
453impl<S: Watcher, T: Watcher> Watcher for Tuple<S, T> {
454 type Value = (S::Value, T::Value);
455
456 fn update(&mut self) -> bool {
457 let s_updated = self.inner.0.update();
459 let t_updated = self.inner.1.update();
460 let updated = s_updated || t_updated;
461 if updated {
462 self.current = (self.inner.0.peek().clone(), self.inner.1.peek().clone());
463 }
464 updated
465 }
466
467 fn peek(&self) -> &Self::Value {
468 &self.current
469 }
470
471 fn is_connected(&self) -> bool {
472 self.inner.0.is_connected() && self.inner.1.is_connected()
473 }
474
475 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
476 let poll_0 = self.inner.0.poll_updated(cx)?;
477 let poll_1 = self.inner.1.poll_updated(cx)?;
478 if poll_0.is_pending() && poll_1.is_pending() {
479 return Poll::Pending;
480 }
481 if poll_0.is_ready() {
482 self.current.0 = self.inner.0.peek().clone();
483 }
484 if poll_1.is_ready() {
485 self.current.1 = self.inner.1.peek().clone();
486 }
487 Poll::Ready(Ok(()))
488 }
489}
490
/// A watcher over three inner watchers whose value is the triple of their values.
#[derive(Debug, Clone)]
pub struct Triple<S: Watcher, T: Watcher, U: Watcher> {
    /// The three wrapped watchers.
    inner: (S, T, U),
    /// Cached triple of the values last read from the wrapped watchers.
    current: (S::Value, T::Value, U::Value),
}
496
497impl<S: Watcher, T: Watcher, U: Watcher> Triple<S, T, U> {
498 pub fn new(mut s: S, mut t: T, mut u: U) -> Self {
499 let current = (s.get(), t.get(), u.get());
500 Self {
501 inner: (s, t, u),
502 current,
503 }
504 }
505}
506
507impl<S: Watcher, T: Watcher, U: Watcher> Watcher for Triple<S, T, U> {
508 type Value = (S::Value, T::Value, U::Value);
509
510 fn update(&mut self) -> bool {
511 let s_updated = self.inner.0.update();
513 let t_updated = self.inner.1.update();
514 let u_updated = self.inner.2.update();
515 let updated = s_updated || t_updated || u_updated;
516 if updated {
517 self.current = (
518 self.inner.0.peek().clone(),
519 self.inner.1.peek().clone(),
520 self.inner.2.peek().clone(),
521 );
522 }
523 updated
524 }
525
526 fn peek(&self) -> &Self::Value {
527 &self.current
528 }
529
530 fn is_connected(&self) -> bool {
531 self.inner.0.is_connected() && self.inner.1.is_connected() && self.inner.2.is_connected()
532 }
533
534 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
535 let poll_0 = self.inner.0.poll_updated(cx)?;
536 let poll_1 = self.inner.1.poll_updated(cx)?;
537 let poll_2 = self.inner.2.poll_updated(cx)?;
538
539 if poll_0.is_pending() && poll_1.is_pending() && poll_2.is_pending() {
540 return Poll::Pending;
541 }
542 if poll_0.is_ready() {
543 self.current.0 = self.inner.0.peek().clone();
544 }
545 if poll_1.is_ready() {
546 self.current.1 = self.inner.1.peek().clone();
547 }
548 if poll_2.is_ready() {
549 self.current.2 = self.inner.2.peek().clone();
550 }
551 Poll::Ready(Ok(()))
552 }
553}
554
/// A watcher over any number of same-typed watchers whose value is the vector
/// of their values.
#[derive(Debug, Clone)]
pub struct Join<T: Clone + Eq, W: Watcher<Value = T>> {
    /// The wrapped watchers.
    watchers: Vec<W>,
    /// Cached values, one per wrapped watcher, in the same order as `watchers`.
    current: Vec<T>,
}
562
563impl<T: Clone + Eq, W: Watcher<Value = T>> Join<T, W> {
564 pub fn new(watchers: impl Iterator<Item = W>) -> Self {
566 let mut watchers: Vec<W> = watchers.into_iter().collect();
567
568 let mut current = Vec::with_capacity(watchers.len());
569 for watcher in &mut watchers {
570 current.push(watcher.get());
571 }
572 Self { watchers, current }
573 }
574}
575
576impl<T: Clone + Eq, W: Watcher<Value = T>> Watcher for Join<T, W> {
577 type Value = Vec<T>;
578
579 fn update(&mut self) -> bool {
580 let mut any_updated = false;
581 for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
582 if watcher.update() {
583 any_updated = true;
584 *value = watcher.peek().clone();
585 }
586 }
587 any_updated
588 }
589
590 fn peek(&self) -> &Self::Value {
591 &self.current
592 }
593
594 fn is_connected(&self) -> bool {
595 self.watchers.iter().all(|w| w.is_connected())
596 }
597
598 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
599 let mut any_updated = false;
600 for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
601 if watcher.poll_updated(cx)?.is_ready() {
602 any_updated = true;
603 *value = watcher.peek().clone();
604 }
605 }
606
607 if any_updated {
608 Poll::Ready(Ok(()))
609 } else {
610 Poll::Pending
611 }
612 }
613}
614
/// A watcher that exposes the values of an inner watcher after passing them
/// through a mapping function.
#[derive(derive_more::Debug, Clone)]
pub struct Map<W: Watcher, T: Clone + Eq> {
    /// The mapping function, shared between clones of this watcher.
    #[debug("Arc<dyn Fn(W::Value) -> T>")]
    map: Arc<dyn Fn(W::Value) -> T + Send + Sync + 'static>,
    /// The wrapped watcher providing the input values.
    watcher: W,
    /// Cached result of the most recent mapping that produced a different value.
    current: T,
}
625
626impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
627 type Value = T;
628
629 fn update(&mut self) -> bool {
630 if self.watcher.update() {
631 let new = (self.map)(self.watcher.peek().clone());
632 if new != self.current {
633 self.current = new;
634 true
635 } else {
636 false
637 }
638 } else {
639 false
640 }
641 }
642
643 fn peek(&self) -> &Self::Value {
644 &self.current
645 }
646
647 fn is_connected(&self) -> bool {
648 self.watcher.is_connected()
649 }
650
651 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
652 loop {
653 ready!(self.watcher.poll_updated(cx)?);
654 let new = (self.map)(self.watcher.peek().clone());
655 if new != self.current {
656 self.current = new;
657 return Poll::Ready(Ok(()));
658 }
659 }
660 }
661}
662
/// Future returned by `Watcher::updated`; resolves with the watcher's next
/// changed value or with `Disconnected`.
#[derive(Debug)]
pub struct NextFut<'a, W: Watcher> {
    /// The watcher being polled for its next update.
    watcher: &'a mut W,
}
674
675impl<W: Watcher> Future for NextFut<'_, W> {
676 type Output = Result<W::Value, Disconnected>;
677
678 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
679 ready!(self.watcher.poll_updated(cx))?;
680 Poll::Ready(Ok(self.watcher.peek().clone()))
681 }
682}
683
/// Future returned by `Watcher::initialized`; resolves with the first value
/// that converts to `Some` through `Nullable::into_option`.
#[derive(Debug)]
pub struct InitializedFut<'a, T, V: Nullable<T> + Clone, W: Watcher<Value = V>> {
    /// Value that was already present when the future was created, if any.
    initial: Option<T>,
    /// The watcher being polled for further updates.
    watcher: &'a mut W,
}
697
698impl<T: Clone + Eq + Unpin, V: Nullable<T> + Clone, W: Watcher<Value = V> + Unpin> Future
699 for InitializedFut<'_, T, V, W>
700{
701 type Output = T;
702
703 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
704 let mut this = self.as_mut();
705 if let Some(value) = this.initial.take() {
706 return Poll::Ready(value);
707 }
708 loop {
709 if ready!(this.watcher.poll_updated(cx)).is_err() {
710 return Poll::Pending;
712 };
713 let value = this.watcher.peek();
714 if let Some(value) = value.clone().into_option() {
715 return Poll::Ready(value);
716 }
717 }
718 }
719}
720
/// Stream adapter over a watcher, created by `Watcher::stream` or
/// `Watcher::stream_updates_only`.
#[derive(Debug, Clone)]
pub struct Stream<W: Watcher + Unpin> {
    /// Item to yield before any update, if the stream was created with one.
    initial: Option<W::Value>,
    /// The watcher providing the updates.
    watcher: W,
}
733
734impl<W: Watcher + Unpin> n0_future::Stream for Stream<W>
735where
736 W::Value: Unpin,
737{
738 type Item = W::Value;
739
740 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
741 if let Some(value) = self.as_mut().initial.take() {
742 return Poll::Ready(Some(value));
743 }
744 match self.as_mut().watcher.poll_updated(cx) {
745 Poll::Ready(Ok(())) => Poll::Ready(Some(self.as_ref().watcher.peek().clone())),
746 Poll::Ready(Err(Disconnected)) => Poll::Ready(None),
747 Poll::Pending => Poll::Pending,
748 }
749 }
750}
751
/// Error returned by watchers once the shared state they observe can no longer
/// be reached (the weak reference to it fails to upgrade).
#[derive(StackError)]
#[error("Watcher lost connection to underlying Watchable, it was dropped")]
pub struct Disconnected;
757
/// Epoch assigned to a freshly created state; each value change increments the
/// epoch by one from here.
const INITIAL_EPOCH: u64 = 1;
761
/// State shared between all `Watchable` handles and, weakly, their watchers.
#[derive(Debug, Default)]
struct Shared<T> {
    /// The current value together with its epoch.
    state: RwLock<State<T>>,
    /// Wakers of tasks waiting for the next change; drained whenever they are woken.
    wakers: Mutex<VecDeque<Waker>>,
}
769
/// A value paired with the epoch at which it was stored.
#[derive(Debug, Clone)]
struct State<T> {
    /// The stored value.
    value: T,
    /// Counter incremented on every value change; watchers compare it against
    /// their own snapshot to detect updates.
    epoch: u64,
}
775
776impl<T: Default> Default for State<T> {
777 fn default() -> Self {
778 Self {
779 value: Default::default(),
780 epoch: INITIAL_EPOCH,
781 }
782 }
783}
784
785impl<T: Clone> Shared<T> {
786 fn get(&self) -> T {
787 self.state.read().expect("poisoned").value.clone()
788 }
789
790 fn state(&self) -> RwLockReadGuard<'_, State<T>> {
791 self.state.read().expect("poisoned")
792 }
793
794 fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<State<T>> {
795 {
796 let state = self.state();
797
798 if last_epoch < state.epoch {
801 return Poll::Ready(state.clone());
802 }
803 }
804
805 self.add_waker(cx);
806
807 #[cfg(watcher_loom)]
808 loom::thread::yield_now();
809
810 {
812 let state = self.state();
813
814 if last_epoch < state.epoch {
815 return Poll::Ready(state.clone());
816 }
817 }
818
819 Poll::Pending
820 }
821
822 fn add_waker(&self, cx: &mut task::Context<'_>) {
823 let mut wakers = self.wakers.lock().expect("poisoned");
824 for waker in wakers.iter() {
825 if waker.will_wake(cx.waker()) {
826 return;
827 }
828 }
829 wakers.push_back(cx.waker().clone());
830 }
831}
832
#[cfg(test)]
mod tests {

    use n0_future::{future::poll_once, StreamExt};
    use rand::{rng, Rng};
    use tokio::{
        task::JoinSet,
        time::{Duration, Instant},
    };
    use tokio_util::sync::CancellationToken;

    use super::*;

    // Spawns three `stream()` tasks and three `stream_updates_only()` tasks, then
    // sets the values 0..10 with random pauses and checks that every task saw
    // them in order before being cancelled.
    #[tokio::test]
    async fn test_watcher() {
        let cancel = CancellationToken::new();
        let watchable = Watchable::new(17);

        assert_eq!(watchable.watch().stream().next().await.unwrap(), 17);

        let start = Instant::now();
        let mut tasks = JoinSet::new();
        // Streams that yield the initial value (17) first, then 0, 1, 2, ...
        for i in 0..3 {
            let mut watch = watchable.watch().stream();
            let cancel = cancel.clone();
            tasks.spawn(async move {
                println!("[{i}] spawn");
                let mut expected_value = 17;
                loop {
                    tokio::select! {
                        biased;
                        Some(value) = &mut watch.next() => {
                            println!("{:?} [{i}] update: {value}", start.elapsed());
                            assert_eq!(value, expected_value);
                            if expected_value == 17 {
                                expected_value = 0;
                            } else {
                                expected_value += 1;
                            }
                        },
                        _ = cancel.cancelled() => {
                            println!("{:?} [{i}] cancel", start.elapsed());
                            assert_eq!(expected_value, 10);
                            break;
                        }
                    }
                }
            });
        }
        // Streams without the initial item: they start directly at 0.
        for i in 0..3 {
            let mut watch = watchable.watch().stream_updates_only();
            let cancel = cancel.clone();
            tasks.spawn(async move {
                println!("[{i}] spawn");
                let mut expected_value = 0;
                loop {
                    tokio::select! {
                        biased;
                        Some(value) = watch.next() => {
                            println!("{:?} [{i}] stream update: {value}", start.elapsed());
                            assert_eq!(value, expected_value);
                            expected_value += 1;
                        },
                        _ = cancel.cancelled() => {
                            println!("{:?} [{i}] cancel", start.elapsed());
                            assert_eq!(expected_value, 10);
                            break;
                        }
                        else => {
                            panic!("stream died");
                        }
                    }
                }
            });
        }

        for next_value in 0..10 {
            // Random pause of up to 100ms between writes.
            let sleep = Duration::from_nanos(rng().random_range(0..100_000_000));
            println!("{:?} sleep {sleep:?}", start.elapsed());
            tokio::time::sleep(sleep).await;

            let changed = watchable.set(next_value);
            println!("{:?} set {next_value} changed={changed:?}", start.elapsed());
        }

        println!("cancel");
        cancel.cancel();
        while let Some(res) = tasks.join_next().await {
            res.expect("task failed");
        }
    }

    // `Watchable::get` reflects the value most recently stored with `set`.
    #[test]
    fn test_get() {
        let watchable = Watchable::new(None);
        assert!(watchable.get().is_none());

        watchable.set(Some(1u8)).ok();
        assert_eq!(watchable.get(), Some(1u8));
    }

    // `initialized()` stays pending while the value is `None` and resolves once
    // a `Some` is set.
    #[tokio::test]
    async fn test_initialize() {
        let watchable = Watchable::new(None);

        let mut watcher = watchable.watch();
        let mut initialized = watcher.initialized();

        let poll = poll_once(&mut initialized).await;
        assert!(poll.is_none());

        watchable.set(Some(1u8)).ok();

        let poll = poll_once(&mut initialized).await;
        assert_eq!(poll.unwrap(), 1u8);
    }

    // `initialized()` resolves on the first poll when the value is already `Some`.
    #[tokio::test]
    async fn test_initialize_already_init() {
        let watchable = Watchable::new(Some(1u8));

        let mut watcher = watchable.watch();
        let mut initialized = watcher.initialized();

        let poll = poll_once(&mut initialized).await;
        assert_eq!(poll.unwrap(), 1u8);
    }

    // Races a `set` on one thread against `initialized()` blocking on another;
    // run under loom's model checker when `watcher_loom` is enabled.
    #[test]
    fn test_initialized_always_resolves() {
        #[cfg(not(watcher_loom))]
        use std::thread;

        #[cfg(watcher_loom)]
        use loom::thread;

        let test_case = || {
            let watchable = Watchable::<Option<u8>>::new(None);

            let mut watch = watchable.watch();
            let thread = thread::spawn(move || n0_future::future::block_on(watch.initialized()));

            watchable.set(Some(42)).ok();

            thread::yield_now();

            let value: u8 = thread.join().unwrap();

            assert_eq!(value, 42);
        };

        #[cfg(watcher_loom)]
        loom::model(test_case);
        #[cfg(not(watcher_loom))]
        test_case();
    }

    // Repeatedly cancels `updated()` through `select!` with a random sleep and
    // checks that no value is ever observed twice.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_update_cancel_safety() {
        let watchable = Watchable::new(0);
        let mut watch = watchable.watch();
        const MAX: usize = 100_000;

        let handle = tokio::spawn(async move {
            let mut last_observed = 0;

            while last_observed != MAX {
                tokio::select! {
                    val = watch.updated() => {
                        let Ok(val) = val else {
                            return;
                        };

                        assert_ne!(val, last_observed, "never observe the same value twice, even with cancellation");
                        last_observed = val;
                    }
                    _ = tokio::time::sleep(Duration::from_micros(rng().random_range(0..10_000))) => {
                        continue;
                    }
                }
            }
        });

        for i in 1..=MAX {
            watchable.set(i).ok();
            if rng().random_bool(0.2) {
                tokio::task::yield_now().await;
            }
        }

        tokio::time::timeout(Duration::from_secs(10), handle)
            .await
            .unwrap()
            .unwrap()
    }

    // A `Join` of two watchers reports each individual change as a new vector.
    #[tokio::test]
    async fn test_join_simple() {
        let a = Watchable::new(1u8);
        let b = Watchable::new(1u8);

        let mut ab = Join::new([a.watch(), b.watch()].into_iter());

        let stream = ab.clone().stream();
        let handle = tokio::task::spawn(async move { stream.take(5).collect::<Vec<_>>().await });

        assert_eq!(ab.get(), vec![1, 1]);
        a.set(2u8).unwrap();
        // Yield so the spawned stream task gets a chance to observe each step.
        tokio::task::yield_now().await;
        assert_eq!(ab.get(), vec![2, 1]);
        b.set(3u8).unwrap();
        tokio::task::yield_now().await;
        assert_eq!(ab.get(), vec![2, 3]);

        a.set(3u8).unwrap();
        tokio::task::yield_now().await;
        b.set(4u8).unwrap();
        tokio::task::yield_now().await;

        let values = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            values,
            vec![vec![1, 1], vec![2, 1], vec![2, 3], vec![3, 3], vec![3, 4]]
        );
    }

    // After the watchable is dropped, `get` still returns the last observed value.
    #[tokio::test]
    async fn test_updated_then_disconnect_then_get() {
        let watchable = Watchable::new(10);
        let mut watcher = watchable.watch();
        assert_eq!(watchable.get(), 10);
        watchable.set(42).ok();
        assert_eq!(watcher.updated().await.unwrap(), 42);
        drop(watchable);
        assert_eq!(watcher.get(), 42);
    }

    // Dropping the watchable wakes a pending `updated()` future, which then
    // resolves with an error.
    #[tokio::test(start_paused = true)]
    async fn test_update_wakeup_on_watchable_drop() {
        let watchable = Watchable::new(10);
        let mut watcher = watchable.watch();

        let start = Instant::now();
        let (_, result) = tokio::time::timeout(Duration::from_secs(2), async move {
            tokio::join!(
                async move {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    drop(watchable);
                },
                async move { watcher.updated().await }
            )
        })
        .await
        .expect("watcher never updated");
        // With paused time, the wake-up must coincide with the drop after 1s.
        assert_eq!(start.elapsed(), Duration::from_secs(1));
        assert!(result.is_err());
    }

    // Every resolution of `updated()` carries a value different from the previous
    // one, even when a clone of the watchable is dropped in between.
    #[tokio::test(start_paused = true)]
    async fn test_update_wakeup_always_a_change() {
        let watchable = Watchable::new(10);
        let mut watcher = watchable.watch();

        let task = tokio::spawn(async move {
            let mut last_value = watcher.get();
            let mut values = Vec::new();
            while let Ok(value) = watcher.updated().await {
                values.push(value);
                if last_value == value {
                    return Err("value duplicated");
                }
                last_value = value;
            }
            Ok(values)
        });

        tokio::time::sleep(Duration::from_millis(100)).await;

        watchable.set(11).ok();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let clone = watchable.clone();
        // Dropping a clone must not show up as an update in the watcher task.
        drop(clone);
        tokio::time::sleep(Duration::from_millis(100)).await;
        for i in 1..=10 {
            watchable.set(i + 11).ok();
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        drop(watchable);

        let values = task
            .await
            .expect("task panicked")
            .expect("value duplicated");
        assert_eq!(values, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]);
    }

    // `has_watchers` counts watchers, not clones of the watchable.
    #[test]
    fn test_has_watchers() {
        let a = Watchable::new(1u8);
        assert!(!a.has_watchers());
        let b = a.clone();
        assert!(!a.has_watchers());
        assert!(!b.has_watchers());

        let watcher = a.watch();
        assert!(a.has_watchers());
        assert!(b.has_watchers());

        drop(watcher);

        assert!(!a.has_watchers());
        assert!(!b.has_watchers());
    }

    // Three watchers of the same watchable all observe the same update.
    #[tokio::test]
    async fn test_three_watchers_basic() {
        let watchable = Watchable::new(1u8);

        let mut w1 = watchable.watch();
        let mut w2 = watchable.watch();
        let mut w3 = watchable.watch();

        assert_eq!(w1.get(), 1);
        assert_eq!(w2.get(), 1);
        assert_eq!(w3.get(), 1);

        watchable.set(42).unwrap();

        assert_eq!(w1.updated().await.unwrap(), 42);
        assert_eq!(w2.updated().await.unwrap(), 42);
        assert_eq!(w3.updated().await.unwrap(), 42);
    }

    // Several sets before the first poll collapse into the latest value.
    #[tokio::test]
    async fn test_three_watchers_skip_intermediate() {
        let watchable = Watchable::new(0u8);
        let mut watcher = watchable.watch();

        watchable.set(1).ok();
        watchable.set(2).ok();
        watchable.set(3).ok();
        watchable.set(4).ok();

        let value = watcher.updated().await.unwrap();

        assert_eq!(value, 4);
    }

    // Both stream flavours deliver an update; only `stream()` delivers the
    // initial value first.
    #[tokio::test]
    async fn test_three_watchers_with_streams() {
        let watchable = Watchable::new(10u8);

        let mut stream1 = watchable.watch().stream();
        let mut stream2 = watchable.watch().stream();
        let mut stream3 = watchable.watch().stream_updates_only();

        assert_eq!(stream1.next().await.unwrap(), 10);
        assert_eq!(stream2.next().await.unwrap(), 10);

        watchable.set(20).ok();

        assert_eq!(stream1.next().await.unwrap(), 20);
        assert_eq!(stream2.next().await.unwrap(), 20);
        assert_eq!(stream3.next().await.unwrap(), 20);
    }

    // Watchers progress independently: a slow one skips straight to the latest value.
    #[tokio::test]
    async fn test_three_watchers_independent() {
        let watchable = Watchable::new(0u8);

        let mut fast_watcher = watchable.watch();
        let mut slow_watcher = watchable.watch();
        let mut lazy_watcher = watchable.watch();

        watchable.set(1).ok();
        assert_eq!(fast_watcher.updated().await.unwrap(), 1);

        watchable.set(2).ok();
        watchable.set(3).ok();

        assert_eq!(slow_watcher.updated().await.unwrap(), 3);
        assert_eq!(lazy_watcher.get(), 3);
    }

    // A `Triple` reports a change of any single component.
    #[tokio::test]
    async fn test_combine_three_watchers() {
        let a = Watchable::new(1u8);
        let b = Watchable::new(2u8);
        let c = Watchable::new(3u8);

        let mut combined = Triple::new(a.watch(), b.watch(), c.watch());

        assert_eq!(combined.get(), (1, 2, 3));

        b.set(20).ok();

        assert_eq!(combined.updated().await.unwrap(), (1, 20, 3));

        c.set(30).ok();
        assert_eq!(combined.updated().await.unwrap(), (1, 20, 30));
    }

    // After the watchable is dropped, watchers report being disconnected, still
    // return the last value from `get`, and fail on `updated()`.
    #[tokio::test]
    async fn test_three_watchers_disconnection() {
        let watchable = Watchable::new(5u8);

        let mut w1 = watchable.watch();
        let mut w2 = watchable.watch();
        let mut w3 = watchable.watch();

        drop(watchable);

        assert!(!w1.is_connected());
        assert!(!w2.is_connected());
        assert!(!w3.is_connected());

        assert_eq!(w1.get(), 5);
        assert_eq!(w2.get(), 5);

        assert!(w3.updated().await.is_err());
    }

    // Three reader tasks and three writer tasks run concurrently; every reader
    // must have observed at least one value.
    #[tokio::test]
    async fn test_three_watchers_truly_concurrent() {
        use tokio::time::sleep;
        let watchable = Watchable::new(0u8);

        let mut reader_handles = vec![];
        for i in 0..3 {
            let mut watcher = watchable.watch();
            let handle = tokio::spawn(async move {
                let mut values = vec![];
                for _ in 0..5 {
                    if let Ok(value) = watcher.updated().await {
                        values.push(value);
                    } else {
                        break;
                    }
                }
                (i, values)
            });
            reader_handles.push(handle);
        }

        let mut writer_handles = vec![];
        for i in 0..3 {
            let watchable_clone = watchable.clone();
            let handle = tokio::spawn(async move {
                for j in 0..5 {
                    let value = (i * 10) + j;
                    watchable_clone.set(value).ok();
                    sleep(Duration::from_millis(5)).await;
                }
            });
            writer_handles.push(handle);
        }

        for handle in writer_handles {
            handle.await.unwrap();
        }

        for handle in reader_handles {
            let (task_id, values) = handle.await.unwrap();
            println!("Reader {}: saw values {:?}", task_id, values);
            assert!(!values.is_empty());
        }
    }

    // `peek` agrees with `get` for direct, mapped and joined watchers.
    #[tokio::test]
    async fn test_peek() {
        let a = Watchable::new(vec![1, 2, 3]);
        let mut wa = a.watch();

        assert_eq!(wa.get(), vec![1, 2, 3]);
        assert_eq!(wa.peek(), &vec![1, 2, 3]);

        let mut wa_map = wa.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());

        assert_eq!(wa_map.get(), vec![2, 4, 6]);
        assert_eq!(wa_map.peek(), &vec![2, 4, 6]);

        let mut wb = a.watch();

        assert_eq!(wb.get(), vec![1, 2, 3]);
        assert_eq!(wb.peek(), &vec![1, 2, 3]);

        let mut wb_map = wb.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());

        assert_eq!(wb_map.get(), vec![2, 4, 6]);
        assert_eq!(wb_map.peek(), &vec![2, 4, 6]);

        let mut w_join = Join::new([wa_map, wb_map].into_iter());

        assert_eq!(w_join.get(), vec![vec![2, 4, 6], vec![2, 4, 6]]);
        assert_eq!(w_join.peek(), &vec![vec![2, 4, 6], vec![2, 4, 6]]);
    }

    // `peek` keeps returning the old value until `update` is called, for direct,
    // mapped and joined watchers.
    #[tokio::test]
    async fn test_update_updates_peek() {
        let value = Watchable::new(42);
        let mut watcher = value.watch();

        assert_eq!(watcher.peek(), &42);
        assert!(!watcher.update());

        value.set(50).ok();

        // Still the old value until `update` runs.
        assert_eq!(watcher.peek(), &42);
        assert!(watcher.update());
        assert_eq!(watcher.peek(), &50);
        assert!(!watcher.update());

        let mut watcher_map = watcher.clone().map(|v| v * 2);

        assert_eq!(watcher_map.peek(), &100);
        assert!(!watcher_map.update());

        value.set(10).ok();

        assert_eq!(watcher_map.peek(), &100);
        assert!(watcher_map.update());
        assert_eq!(watcher_map.peek(), &20);
        assert!(!watcher_map.update());

        let value2 = Watchable::new(0);
        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert!(!watcher_join.update());

        value.set(0).ok();
        value2.set(1).ok();

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert!(watcher_join.update());
        assert_eq!(watcher_join.peek(), &vec![0, 1]);
        assert!(!watcher_join.update());
    }

    // Same as above, but the refresh is triggered through `get` instead of `update`.
    #[tokio::test]
    async fn test_get_updates_peek() {
        let value = Watchable::new(42);
        let mut watcher = value.watch();

        assert_eq!(watcher.peek(), &42);
        assert!(!watcher.update());

        value.set(50).ok();

        // Still the old value until `get` runs.
        assert_eq!(watcher.peek(), &42);
        assert_eq!(watcher.get(), 50);
        assert_eq!(watcher.peek(), &50);
        assert!(!watcher.update());

        let mut watcher_map = watcher.clone().map(|v| v * 2);

        assert_eq!(watcher_map.peek(), &100);
        assert!(!watcher_map.update());

        value.set(10).ok();

        assert_eq!(watcher_map.peek(), &100);
        assert_eq!(watcher_map.get(), 20);
        assert_eq!(watcher_map.peek(), &20);
        assert!(!watcher_map.update());

        let value2 = Watchable::new(0);
        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert!(!watcher_join.update());

        value.set(0).ok();
        value2.set(1).ok();

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert_eq!(watcher_join.get(), vec![0, 1]);
        assert_eq!(watcher_join.peek(), &vec![0, 1]);
        assert!(!watcher_join.update());
    }

    // Re-polling `updated()` from the same task many times must not grow the
    // waker queue beyond a single entry.
    #[tokio::test]
    async fn test_ensure_wakers_bounded() {
        use tokio::time::{interval, Duration};
        let watchable = Watchable::new(0);
        let mut watcher = watchable.watch();
        let max_tick = 1000;

        let handle = tokio::spawn(async move {
            let mut ticker = interval(Duration::from_nanos(1));
            let mut tick_no = 0;
            loop {
                tokio::select! {
                    _ = watcher.updated() => {}
                    _ = ticker.tick() => {
                        tick_no += 1;
                        if tick_no > max_tick{
                            return
                        }
                    }
                }
                let num_wakers = watchable.shared.wakers.lock().unwrap().len();
                assert_eq!(num_wakers, 1);
            }
        });

        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .unwrap()
            .unwrap()
    }
}