1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 fmt,
4 sync::atomic::{AtomicU64, Ordering},
5 task::Poll,
6};
7
8use rand::Rng;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13 AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14 coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26 pub id: u64,
28}
29
30impl Origin {
31 pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35 pub fn random() -> Self {
44 let mut rng = rand::rng();
45 let id = rng.random_range(1..(1u64 << 53));
46 Self { id }
47 }
48
49 pub fn produce(self) -> OriginProducer {
51 OriginProducer::new(self)
52 }
53}
54
55impl From<u64> for Origin {
56 fn from(id: u64) -> Self {
57 Self { id }
58 }
59}
60
61impl fmt::Display for Origin {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 self.id.fmt(f)
64 }
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69 u64: Encode<V>,
70{
71 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72 self.id.encode(w, version)
73 }
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78 u64: Decode<V>,
79{
80 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81 let id = u64::decode(r, version)?;
82 if id >= 1u64 << 62 {
83 return Err(DecodeError::InvalidValue);
84 }
85 Ok(Self { id })
86 }
87}
88
89pub(crate) const MAX_HOPS: usize = 32;
95
96#[derive(Debug, Clone, Default, PartialEq, Eq)]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct OriginList(Vec<Origin>);
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106#[non_exhaustive]
107pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(f, "too many origins (max {MAX_HOPS})")
112 }
113}
114
115impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118 fn from(_: TooManyOrigins) -> Self {
119 DecodeError::BoundsExceeded
120 }
121}
122
123impl OriginList {
124 pub fn new() -> Self {
126 Self(Vec::new())
127 }
128
129 pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131 if self.0.len() >= MAX_HOPS {
132 return Err(TooManyOrigins);
133 }
134 self.0.push(origin);
135 Ok(())
136 }
137
138 pub fn contains(&self, origin: &Origin) -> bool {
140 self.0.contains(origin)
141 }
142
143 pub fn len(&self) -> usize {
145 self.0.len()
146 }
147
148 pub fn is_empty(&self) -> bool {
150 self.0.is_empty()
151 }
152
153 pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
155 self.0.iter()
156 }
157
158 pub fn as_slice(&self) -> &[Origin] {
160 &self.0
161 }
162}
163
164impl TryFrom<Vec<Origin>> for OriginList {
165 type Error = TooManyOrigins;
166
167 fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
168 if v.len() > MAX_HOPS {
169 return Err(TooManyOrigins);
170 }
171 Ok(Self(v))
172 }
173}
174
175impl<'a> IntoIterator for &'a OriginList {
176 type Item = &'a Origin;
177 type IntoIter = std::slice::Iter<'a, Origin>;
178
179 fn into_iter(self) -> Self::IntoIter {
180 self.iter()
181 }
182}
183
184impl<V: Copy> Encode<V> for OriginList
185where
186 u64: Encode<V>,
187 Origin: Encode<V>,
188{
189 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
190 (self.0.len() as u64).encode(w, version)?;
191 for origin in &self.0 {
192 origin.encode(w, version)?;
193 }
194 Ok(())
195 }
196}
197
198impl<V: Copy> Decode<V> for OriginList
199where
200 u64: Decode<V>,
201 Origin: Decode<V>,
202{
203 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
204 let count = u64::decode(r, version)? as usize;
205 if count > MAX_HOPS {
206 return Err(DecodeError::BoundsExceeded);
207 }
208 let mut list = Vec::with_capacity(count);
209 for _ in 0..count {
210 list.push(Origin::decode(r, version)?);
211 }
212 Ok(Self(list))
213 }
214}
215
216static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
217
218#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
219struct ConsumerId(u64);
220
221impl ConsumerId {
222 fn new() -> Self {
223 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
224 }
225}
226
227struct OriginBroadcast {
229 path: PathOwned,
230 active: BroadcastConsumer,
231 backup: VecDeque<BroadcastConsumer>,
232}
233
234fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) {
242 const SEED: u64 = 0x420C0DECB00B; const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
249
250 let mut hash = SEED;
251 for &byte in name.as_str().as_bytes() {
252 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
253 }
254 for hop in hops {
255 for &byte in &hop.id.to_le_bytes() {
256 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
257 }
258 }
259
260 (hops.len(), hash)
261}
262
263enum PendingUpdate {
272 Announce(BroadcastConsumer),
273 Unannounce,
274 UnannounceAnnounce(BroadcastConsumer),
275}
276
277#[derive(Default)]
282struct OriginConsumerState {
283 pending: BTreeMap<PathOwned, PendingUpdate>,
284}
285
286impl OriginConsumerState {
287 fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
288 let new = match self.pending.remove(&path) {
289 None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
291 Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
293 PendingUpdate::UnannounceAnnounce(broadcast)
294 }
295 };
296 self.pending.insert(path, new);
297 }
298
299 fn apply_unannounce(&mut self, path: PathOwned) {
300 match self.pending.remove(&path) {
301 Some(PendingUpdate::Announce(_)) => {}
303 None | Some(PendingUpdate::Unannounce) => {
304 self.pending.insert(path, PendingUpdate::Unannounce);
305 }
306 Some(PendingUpdate::UnannounceAnnounce(_)) => {
309 self.pending.insert(path, PendingUpdate::Unannounce);
310 }
311 }
312 }
313
314 fn take(&mut self) -> Option<OriginAnnounce> {
316 let path = self.pending.keys().next()?.clone();
317 match self.pending.remove(&path).unwrap() {
318 PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
319 PendingUpdate::Unannounce => Some((path, None)),
320 PendingUpdate::UnannounceAnnounce(broadcast) => {
321 self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
324 Some((path, None))
325 }
326 }
327 }
328}
329
330#[derive(Clone)]
331struct OriginConsumerNotify {
332 root: PathOwned,
333 state: kio::Producer<OriginConsumerState>,
334}
335
336impl OriginConsumerNotify {
337 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
338 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
339 self.state
340 .write()
341 .ok()
342 .expect("consumer closed")
343 .apply_announce(path, broadcast);
344 }
345
346 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
347 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
348 let mut state = self.state.write().ok().expect("consumer closed");
349 state.apply_unannounce(path.clone());
350 state.apply_announce(path, broadcast);
351 }
352
353 fn unannounce(&self, path: impl AsPath) {
354 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
355 self.state.write().ok().expect("consumer closed").apply_unannounce(path);
356 }
357}
358
359struct NotifyNode {
360 parent: Option<Lock<NotifyNode>>,
361
362 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
365}
366
367impl NotifyNode {
368 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
369 Self {
370 parent,
371 consumers: HashMap::new(),
372 }
373 }
374
375 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
376 for consumer in self.consumers.values() {
377 consumer.announce(path.as_path(), broadcast.clone());
378 }
379
380 if let Some(parent) = &self.parent {
381 parent.lock().announce(path, broadcast);
382 }
383 }
384
385 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
386 for consumer in self.consumers.values() {
387 consumer.reannounce(path.as_path(), broadcast.clone());
388 }
389
390 if let Some(parent) = &self.parent {
391 parent.lock().reannounce(path, broadcast);
392 }
393 }
394
395 fn unannounce(&mut self, path: impl AsPath) {
396 for consumer in self.consumers.values() {
397 consumer.unannounce(path.as_path());
398 }
399
400 if let Some(parent) = &self.parent {
401 parent.lock().unannounce(path);
402 }
403 }
404}
405
406struct OriginNode {
407 broadcast: Option<OriginBroadcast>,
409
410 nested: HashMap<String, Lock<OriginNode>>,
412
413 notify: Lock<NotifyNode>,
415}
416
417impl OriginNode {
418 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
419 Self {
420 broadcast: None,
421 nested: HashMap::new(),
422 notify: Lock::new(NotifyNode::new(parent)),
423 }
424 }
425
426 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
427 let (dir, rest) = path.next_part().expect("leaf called with empty path");
428
429 let next = self.entry(dir);
430 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
431 }
432
433 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
434 match self.nested.get(dir) {
435 Some(next) => next.clone(),
436 None => {
437 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
438 self.nested.insert(dir.to_string(), next.clone());
439 next
440 }
441 }
442 }
443
444 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
445 let full = full.as_path();
446 let rest = relative.as_path();
447
448 if let Some((dir, relative)) = rest.next_part() {
450 self.entry(dir).lock().publish(&full, broadcast, &relative);
452 } else if let Some(existing) = &mut self.broadcast {
453 if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
461 return;
462 }
463
464 if route_key(&full, &broadcast.hops) < route_key(&full, &existing.active.hops) {
465 let old = existing.active.clone();
466 existing.active = broadcast.clone();
467 existing.backup.push_back(old);
468
469 self.notify.lock().reannounce(full, broadcast);
470 } else {
471 existing.backup.push_back(broadcast.clone());
474 }
475 } else {
476 self.broadcast = Some(OriginBroadcast {
478 path: full.to_owned(),
479 active: broadcast.clone(),
480 backup: VecDeque::new(),
481 });
482 self.notify.lock().announce(full, broadcast);
483 }
484 }
485
486 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
487 self.consume_initial(&mut notify);
488 self.notify.lock().consumers.insert(id, notify);
489 }
490
491 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
492 if let Some(broadcast) = &self.broadcast {
493 notify.announce(&broadcast.path, broadcast.active.clone());
494 }
495
496 for nested in self.nested.values() {
498 nested.lock().consume_initial(notify);
499 }
500 }
501
502 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
503 let rest = rest.as_path();
504
505 if let Some((dir, rest)) = rest.next_part() {
506 let node = self.nested.get(dir)?.lock();
507 node.consume_broadcast(&rest)
508 } else {
509 self.broadcast.as_ref().map(|b| b.active.clone())
510 }
511 }
512
513 fn unconsume(&mut self, id: ConsumerId) {
514 self.notify.lock().consumers.remove(&id).expect("consumer not found");
515 if self.is_empty() {
516 }
519 }
520
521 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
523 let full = full.as_path();
524 let relative = relative.as_path();
525
526 if let Some((dir, relative)) = relative.next_part() {
527 let nested = self.entry(dir);
528 let mut locked = nested.lock();
529 locked.remove(&full, broadcast, &relative);
530
531 if locked.is_empty() {
532 drop(locked);
533 self.nested.remove(dir);
534 }
535 } else {
536 let entry = match &mut self.broadcast {
537 Some(existing) => existing,
538 None => return,
539 };
540
541 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
543 if let Some(pos) = pos {
544 entry.backup.remove(pos);
545 return;
547 }
548
549 assert!(entry.active.is_clone(&broadcast));
551
552 let best = entry
555 .backup
556 .iter()
557 .enumerate()
558 .min_by_key(|(_, b)| route_key(&full, &b.hops))
559 .map(|(i, _)| i);
560 if let Some(idx) = best {
561 let active = entry.backup.remove(idx).expect("index in range");
562 entry.active = active;
563 self.notify.lock().reannounce(full, &entry.active);
564 } else {
565 self.broadcast = None;
567 self.notify.lock().unannounce(full);
568 }
569 }
570 }
571
572 fn is_empty(&self) -> bool {
573 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
574 }
575}
576
577#[derive(Clone)]
578struct OriginNodes {
579 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
580}
581
582impl OriginNodes {
583 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
586 let mut roots = Vec::new();
587
588 for (root, state) in &self.nodes {
589 for prefix in prefixes {
590 if root.has_prefix(prefix) {
591 roots.push((root.to_owned(), state.clone()));
593 continue;
594 }
595
596 if let Some(suffix) = prefix.strip_prefix(root) {
597 let nested = state.lock().leaf(&suffix);
599 roots.push((prefix.to_owned(), nested));
600 }
601 }
602 }
603
604 if roots.is_empty() {
605 None
606 } else {
607 Some(Self { nodes: roots })
608 }
609 }
610
611 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
612 let new_root = new_root.as_path();
613 let mut roots = Vec::new();
614
615 if new_root.is_empty() {
616 return Some(self.clone());
617 }
618
619 for (root, state) in &self.nodes {
620 if let Some(suffix) = root.strip_prefix(&new_root) {
621 roots.push((suffix.to_owned(), state.clone()));
623 } else if let Some(suffix) = new_root.strip_prefix(root) {
624 let nested = state.lock().leaf(&suffix);
627 roots.push(("".into(), nested));
628 }
629 }
630
631 if roots.is_empty() {
632 None
633 } else {
634 Some(Self { nodes: roots })
635 }
636 }
637
638 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
640 let path = path.as_path();
641
642 for (root, state) in &self.nodes {
643 if let Some(suffix) = path.strip_prefix(root) {
644 return Some((state.clone(), suffix.to_owned()));
645 }
646 }
647
648 None
649 }
650}
651
652impl Default for OriginNodes {
653 fn default() -> Self {
654 Self {
655 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
656 }
657 }
658}
659
660pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
662
663#[derive(Clone)]
665pub struct OriginProducer {
666 info: Origin,
670
671 nodes: OriginNodes,
674
675 root: PathOwned,
677}
678
679impl std::ops::Deref for OriginProducer {
680 type Target = Origin;
681
682 fn deref(&self) -> &Self::Target {
683 &self.info
684 }
685}
686
687impl OriginProducer {
688 pub fn new(info: Origin) -> Self {
691 Self {
692 info,
693 nodes: OriginNodes::default(),
694 root: PathOwned::default(),
695 }
696 }
697
698 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
703 let broadcast = Broadcast::new().produce();
704 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
705 }
706
707 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
719 let path = path.as_path();
720
721 if broadcast.hops.contains(&self.info) {
723 return false;
724 }
725
726 let (root, rest) = match self.nodes.get(&path) {
727 Some(root) => root,
728 None => return false,
729 };
730
731 let full = self.root.join(&path);
732
733 root.lock().publish(&full, &broadcast, &rest);
734 let root = root.clone();
735
736 web_async::spawn(async move {
737 broadcast.closed().await;
738 root.lock().remove(&full, broadcast, &rest);
739 });
740
741 true
742 }
743
744 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
750 let prefixes = PathPrefixes::new(prefixes);
751 Some(OriginProducer {
752 info: self.info,
753 nodes: self.nodes.select(&prefixes)?,
754 root: self.root.clone(),
755 })
756 }
757
758 pub fn consume(&self) -> OriginConsumer {
760 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
761 }
762
763 #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
768 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
769 let path = path.as_path();
770 let (root, rest) = self.nodes.get(&path)?;
771 let state = root.lock();
772 state.consume_broadcast(&rest)
773 }
774
775 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
780 let prefix = prefix.as_path();
781
782 Some(Self {
783 info: self.info,
784 root: self.root.join(&prefix).to_owned(),
785 nodes: self.nodes.root(&prefix)?,
786 })
787 }
788
789 pub fn root(&self) -> &Path<'_> {
791 &self.root
792 }
793
794 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
797 self.nodes.nodes.iter().map(|(root, _)| root)
798 }
799
800 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
802 self.root.join(path)
803 }
804}
805
806pub struct OriginConsumer {
810 id: ConsumerId,
811 info: Origin,
813 nodes: OriginNodes,
814
815 state: kio::Producer<OriginConsumerState>,
818
819 root: PathOwned,
821}
822
823impl std::ops::Deref for OriginConsumer {
824 type Target = Origin;
825
826 fn deref(&self) -> &Self::Target {
827 &self.info
828 }
829}
830
831impl OriginConsumer {
832 fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
833 let state = kio::Producer::<OriginConsumerState>::default();
834 let id = ConsumerId::new();
835
836 for (_, node) in &nodes.nodes {
837 let notify = OriginConsumerNotify {
838 root: root.clone(),
839 state: state.clone(),
840 };
841 node.lock().consume(id, notify);
842 }
843
844 Self {
845 id,
846 info,
847 nodes,
848 state,
849 root,
850 }
851 }
852
853 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
861 kio::wait(|waiter| self.poll_announced(waiter)).await
862 }
863
864 pub fn poll_announced(&mut self, waiter: &kio::Waiter) -> Poll<Option<OriginAnnounce>> {
870 match self.state.poll(waiter, |state| match state.take() {
871 Some(item) => Poll::Ready(item),
872 None => Poll::Pending,
873 }) {
874 Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
875 Poll::Ready(Err(_)) => Poll::Ready(None),
877 Poll::Pending => Poll::Pending,
878 }
879 }
880
881 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
886 self.state.write().ok()?.take()
887 }
888
889 pub fn consume(&self) -> Self {
891 self.clone()
892 }
893
894 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
902 let path = path.as_path();
903 let (root, rest) = self.nodes.get(&path)?;
904 let state = root.lock();
905 state.consume_broadcast(&rest)
906 }
907
908 pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
917 let path = path.as_path();
918
919 let mut consumer = self.scope(std::slice::from_ref(&path))?;
921
922 if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
926 return None;
927 }
928
929 loop {
930 let (announced, broadcast) = consumer.announced().await?;
931 if announced.as_path() == path {
933 if let Some(broadcast) = broadcast {
934 return Some(broadcast);
935 }
936 }
937 }
938 }
939
940 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
946 let prefixes = PathPrefixes::new(prefixes);
947 Some(OriginConsumer::new(
948 self.info,
949 self.root.clone(),
950 self.nodes.select(&prefixes)?,
951 ))
952 }
953
954 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
959 let prefix = prefix.as_path();
960
961 Some(Self::new(
962 self.info,
963 self.root.join(&prefix).to_owned(),
964 self.nodes.root(&prefix)?,
965 ))
966 }
967
968 pub fn root(&self) -> &Path<'_> {
970 &self.root
971 }
972
973 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
976 self.nodes.nodes.iter().map(|(root, _)| root)
977 }
978
979 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
981 self.root.join(path)
982 }
983}
984
985impl Drop for OriginConsumer {
986 fn drop(&mut self) {
987 for (_, root) in &self.nodes.nodes {
988 root.lock().unconsume(self.id);
989 }
990 }
991}
992
993impl Clone for OriginConsumer {
994 fn clone(&self) -> Self {
995 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
996 }
997}
998
999#[cfg(test)]
1000use futures::FutureExt;
1001
1002#[cfg(test)]
1003impl OriginConsumer {
1004 pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
1005 let expected = expected.as_path();
1006 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
1007 assert_eq!(path, expected, "wrong path");
1008 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
1009 }
1010
1011 pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
1012 let expected = expected.as_path();
1013 let (path, active) = self.try_announced().expect("no next");
1014 assert_eq!(path, expected, "wrong path");
1015 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
1016 }
1017
1018 pub fn assert_next_none(&mut self, expected: impl AsPath) {
1019 let expected = expected.as_path();
1020 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
1021 assert_eq!(path, expected, "wrong path");
1022 assert!(active.is_none(), "should be unannounced");
1023 }
1024
1025 pub fn assert_next_wait(&mut self) {
1026 if let Some(res) = self.announced().now_or_never() {
1027 panic!("next should block: got {:?}", res.map(|(path, _)| path));
1028 }
1029 }
1030
1031 }
1040
1041#[cfg(test)]
1042mod tests {
1043 use crate::Broadcast;
1044
1045 use super::*;
1046
1047 #[test]
1048 fn origin_list_push_fails_at_limit() {
1049 let mut list = OriginList::new();
1050 for _ in 0..MAX_HOPS {
1051 list.push(Origin::random()).unwrap();
1052 }
1053 assert_eq!(list.len(), MAX_HOPS);
1054 assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
1055 }
1056
1057 #[test]
1058 fn origin_list_try_from_vec_enforces_limit() {
1059 let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
1060 assert!(OriginList::try_from(under).is_ok());
1061
1062 let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
1063 assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
1064 }
1065
1066 #[tokio::test]
1067 async fn test_announce() {
1068 tokio::time::pause();
1069
1070 let origin = Origin::random().produce();
1071 let broadcast1 = Broadcast::new().produce();
1072 let broadcast2 = Broadcast::new().produce();
1073
1074 let mut consumer1 = origin.consume();
1075 consumer1.assert_next_wait();
1077
1078 origin.publish_broadcast("test1", broadcast1.consume());
1080
1081 consumer1.assert_next("test1", &broadcast1.consume());
1082 consumer1.assert_next_wait();
1083
1084 let mut consumer2 = origin.consume();
1087
1088 origin.publish_broadcast("test2", broadcast2.consume());
1090
1091 consumer1.assert_next("test2", &broadcast2.consume());
1092 consumer1.assert_next_wait();
1093
1094 consumer2.assert_next("test1", &broadcast1.consume());
1095 consumer2.assert_next("test2", &broadcast2.consume());
1096 consumer2.assert_next_wait();
1097
1098 drop(broadcast1);
1100
1101 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1103
1104 consumer1.assert_next_none("test1");
1106 consumer2.assert_next_none("test1");
1107 consumer1.assert_next_wait();
1108 consumer2.assert_next_wait();
1109
1110 let mut consumer3 = origin.consume();
1112 consumer3.assert_next("test2", &broadcast2.consume());
1113 consumer3.assert_next_wait();
1114
1115 drop(broadcast2);
1117
1118 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1120
1121 consumer1.assert_next_none("test2");
1122 consumer2.assert_next_none("test2");
1123 consumer3.assert_next_none("test2");
1124
1125 }
1131
1132 #[tokio::test]
1133 async fn test_duplicate() {
1134 tokio::time::pause();
1135
1136 let origin = Origin::random().produce();
1137
1138 let broadcast1 = Broadcast::new().produce();
1139 let broadcast2 = Broadcast::new().produce();
1140 let broadcast3 = Broadcast::new().produce();
1141
1142 let consumer1 = broadcast1.consume();
1143 let consumer2 = broadcast2.consume();
1144 let consumer3 = broadcast3.consume();
1145
1146 let mut consumer = origin.consume();
1147
1148 origin.publish_broadcast("test", consumer1.clone());
1149 origin.publish_broadcast("test", consumer2.clone());
1150 origin.publish_broadcast("test", consumer3.clone());
1151 assert!(consumer.get_broadcast("test").is_some());
1152
1153 consumer.assert_next("test", &consumer1);
1156 consumer.assert_next_wait();
1157
1158 drop(broadcast2);
1160
1161 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1163
1164 assert!(consumer.get_broadcast("test").is_some());
1165 consumer.assert_next_wait();
1166
1167 drop(broadcast1);
1169
1170 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1172
1173 assert!(consumer.get_broadcast("test").is_some());
1174 consumer.assert_next_none("test");
1175 consumer.assert_next("test", &consumer3);
1176
1177 drop(broadcast3);
1179
1180 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1182 assert!(consumer.get_broadcast("test").is_none());
1183
1184 consumer.assert_next_none("test");
1185 consumer.assert_next_wait();
1186 }
1187
1188 #[tokio::test]
1189 async fn test_duplicate_reverse() {
1190 tokio::time::pause();
1191
1192 let origin = Origin::random().produce();
1193 let broadcast1 = Broadcast::new().produce();
1194 let broadcast2 = Broadcast::new().produce();
1195
1196 origin.publish_broadcast("test", broadcast1.consume());
1197 origin.publish_broadcast("test", broadcast2.consume());
1198 assert!(origin.consume().get_broadcast("test").is_some());
1199
1200 drop(broadcast2);
1202
1203 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1205 assert!(origin.consume().get_broadcast("test").is_some());
1206
1207 drop(broadcast1);
1208
1209 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1211 assert!(origin.consume().get_broadcast("test").is_none());
1212 }
1213
1214 #[tokio::test]
1215 async fn test_deterministic_tiebreak() {
1216 tokio::time::pause();
1217
1218 fn route(ids: &[u64]) -> BroadcastProducer {
1220 let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::<Vec<_>>()).unwrap();
1221 Broadcast { hops }.produce()
1222 }
1223
1224 fn winner(first: &[u64], second: &[u64]) -> OriginList {
1226 let origin = Origin::random().produce();
1227 let a = route(first);
1228 let b = route(second);
1229 origin.publish_broadcast("test", a.consume());
1230 origin.publish_broadcast("test", b.consume());
1231 let hops = origin.consume().get_broadcast("test").unwrap().hops.clone();
1232 drop((a, b));
1234 hops
1235 }
1236
1237 let forward = winner(&[10, 20], &[30, 40]);
1240 let reverse = winner(&[30, 40], &[10, 20]);
1241 assert_eq!(forward, reverse, "tie-break must not depend on publish order");
1242
1243 assert_eq!(winner(&[10, 20], &[30]).len(), 1);
1245 assert_eq!(winner(&[30], &[10, 20]).len(), 1);
1246 }
1247
1248 #[tokio::test]
1249 async fn test_double_publish() {
1250 tokio::time::pause();
1251
1252 let origin = Origin::random().produce();
1253 let broadcast = Broadcast::new().produce();
1254
1255 origin.publish_broadcast("test", broadcast.consume());
1257 origin.publish_broadcast("test", broadcast.consume());
1258
1259 assert!(origin.consume().get_broadcast("test").is_some());
1260
1261 drop(broadcast);
1262
1263 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1265 assert!(origin.consume().get_broadcast("test").is_none());
1266 }
1267 #[tokio::test]
1272 async fn test_many_announces() {
1273 let origin = Origin::random().produce();
1274 let broadcast = Broadcast::new().produce();
1275
1276 let mut consumer = origin.consume();
1277 for i in 0..256 {
1278 origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1279 }
1280
1281 for i in 0..256 {
1282 consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
1283 }
1284 consumer.assert_next_wait();
1285 }
1286
1287 #[tokio::test]
1288 async fn test_many_announces_try() {
1289 let origin = Origin::random().produce();
1290 let broadcast = Broadcast::new().produce();
1291
1292 let mut consumer = origin.consume();
1293 for i in 0..256 {
1294 origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1295 }
1296
1297 for i in 0..256 {
1298 consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
1299 }
1300 }
1301
1302 #[tokio::test]
1303 async fn test_with_root_basic() {
1304 let origin = Origin::random().produce();
1305 let broadcast = Broadcast::new().produce();
1306
1307 let foo_producer = origin.with_root("foo").expect("should create root");
1309 assert_eq!(foo_producer.root().as_str(), "foo");
1310
1311 let mut consumer = origin.consume();
1312
1313 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1315 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1317
1318 let mut foo_consumer = foo_producer.consume();
1320 foo_consumer.assert_next("bar/baz", &broadcast.consume());
1321 }
1322
1323 #[tokio::test]
1324 async fn test_with_root_nested() {
1325 let origin = Origin::random().produce();
1326 let broadcast = Broadcast::new().produce();
1327
1328 let foo_producer = origin.with_root("foo").expect("should create foo root");
1330 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1331 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1332
1333 let mut consumer = origin.consume();
1334
1335 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1337 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1339
1340 let mut foo_bar_consumer = foo_bar_producer.consume();
1342 foo_bar_consumer.assert_next("baz", &broadcast.consume());
1343 }
1344
1345 #[tokio::test]
1346 async fn test_publish_scope_allows() {
1347 let origin = Origin::random().produce();
1348 let broadcast = Broadcast::new().produce();
1349
1350 let limited_producer = origin
1352 .scope(&["allowed/path1".into(), "allowed/path2".into()])
1353 .expect("should create limited producer");
1354
1355 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1357 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1358 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1359
1360 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1362 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1364 }
1365
1366 #[tokio::test]
1367 async fn test_publish_scope_empty() {
1368 let origin = Origin::random().produce();
1369
1370 assert!(origin.scope(&[]).is_none());
1372 }
1373
1374 #[tokio::test]
1375 async fn test_consume_scope_filters() {
1376 let origin = Origin::random().produce();
1377 let broadcast1 = Broadcast::new().produce();
1378 let broadcast2 = Broadcast::new().produce();
1379 let broadcast3 = Broadcast::new().produce();
1380
1381 let mut consumer = origin.consume();
1382
1383 origin.publish_broadcast("allowed", broadcast1.consume());
1385 origin.publish_broadcast("allowed/nested", broadcast2.consume());
1386 origin.publish_broadcast("notallowed", broadcast3.consume());
1387
1388 let mut limited_consumer = origin
1390 .consume()
1391 .scope(&["allowed".into()])
1392 .expect("should create limited consumer");
1393
1394 limited_consumer.assert_next("allowed", &broadcast1.consume());
1396 limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1397 limited_consumer.assert_next_wait(); consumer.assert_next("allowed", &broadcast1.consume());
1401 consumer.assert_next("allowed/nested", &broadcast2.consume());
1402 consumer.assert_next("notallowed", &broadcast3.consume());
1403 }
1404
1405 #[tokio::test]
1406 async fn test_consume_scope_multiple_prefixes() {
1407 let origin = Origin::random().produce();
1408 let broadcast1 = Broadcast::new().produce();
1409 let broadcast2 = Broadcast::new().produce();
1410 let broadcast3 = Broadcast::new().produce();
1411
1412 origin.publish_broadcast("foo/test", broadcast1.consume());
1413 origin.publish_broadcast("bar/test", broadcast2.consume());
1414 origin.publish_broadcast("baz/test", broadcast3.consume());
1415
1416 let mut limited_consumer = origin
1418 .consume()
1419 .scope(&["foo".into(), "bar".into()])
1420 .expect("should create limited consumer");
1421
1422 limited_consumer.assert_next("bar/test", &broadcast2.consume());
1424 limited_consumer.assert_next("foo/test", &broadcast1.consume());
1425 limited_consumer.assert_next_wait(); }
1427
1428 #[tokio::test]
1429 async fn test_with_root_and_publish_scope() {
1430 let origin = Origin::random().produce();
1431 let broadcast = Broadcast::new().produce();
1432
1433 let foo_producer = origin.with_root("foo").expect("should create foo root");
1435
1436 let limited_producer = foo_producer
1438 .scope(&["bar".into(), "goop/pee".into()])
1439 .expect("should create limited producer");
1440
1441 let mut consumer = origin.consume();
1442
1443 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1445 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1446 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1447 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1448
1449 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1451 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1453
1454 consumer.assert_next("foo/bar", &broadcast.consume());
1456 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1457 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1458 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1459 }
1460
1461 #[tokio::test]
1462 async fn test_with_root_and_consume_scope() {
1463 let origin = Origin::random().produce();
1464 let broadcast1 = Broadcast::new().produce();
1465 let broadcast2 = Broadcast::new().produce();
1466 let broadcast3 = Broadcast::new().produce();
1467
1468 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1470 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1471 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1472
1473 let foo_producer = origin.with_root("foo").expect("should create foo root");
1475
1476 let mut limited_consumer = foo_producer
1478 .consume()
1479 .scope(&["bar".into(), "goop/pee".into()])
1480 .expect("should create limited consumer");
1481
1482 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1484 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1485 limited_consumer.assert_next_wait(); }
1487
1488 #[tokio::test]
1489 async fn test_with_root_unauthorized() {
1490 let origin = Origin::random().produce();
1491
1492 let limited_producer = origin
1494 .scope(&["allowed".into()])
1495 .expect("should create limited producer");
1496
1497 assert!(limited_producer.with_root("notallowed").is_none());
1499
1500 let allowed_root = limited_producer
1502 .with_root("allowed")
1503 .expect("should create allowed root");
1504 assert_eq!(allowed_root.root().as_str(), "allowed");
1505 }
1506
1507 #[tokio::test]
1508 async fn test_wildcard_permission() {
1509 let origin = Origin::random().produce();
1510 let broadcast = Broadcast::new().produce();
1511
1512 let root_producer = origin.clone();
1514
1515 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1517 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1518
1519 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1521 assert_eq!(foo_producer.root().as_str(), "foo");
1522 }
1523
1524 #[tokio::test]
1525 async fn test_consume_broadcast_with_permissions() {
1526 let origin = Origin::random().produce();
1527 let broadcast1 = Broadcast::new().produce();
1528 let broadcast2 = Broadcast::new().produce();
1529
1530 origin.publish_broadcast("allowed/test", broadcast1.consume());
1531 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1532
1533 let limited_consumer = origin
1535 .consume()
1536 .scope(&["allowed".into()])
1537 .expect("should create limited consumer");
1538
1539 let result = limited_consumer.get_broadcast("allowed/test");
1541 assert!(result.is_some());
1542 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1543
1544 assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1546
1547 let consumer = origin.consume();
1549 assert!(consumer.get_broadcast("allowed/test").is_some());
1550 assert!(consumer.get_broadcast("notallowed/test").is_some());
1551 }
1552
1553 #[tokio::test]
1554 async fn test_nested_paths_with_permissions() {
1555 let origin = Origin::random().produce();
1556 let broadcast = Broadcast::new().produce();
1557
1558 let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1560
1561 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1563 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1564 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1565
1566 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1568 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1569 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1570 }
1571
1572 #[tokio::test]
1573 async fn test_multiple_consumers_with_different_permissions() {
1574 let origin = Origin::random().produce();
1575 let broadcast1 = Broadcast::new().produce();
1576 let broadcast2 = Broadcast::new().produce();
1577 let broadcast3 = Broadcast::new().produce();
1578
1579 origin.publish_broadcast("foo/test", broadcast1.consume());
1581 origin.publish_broadcast("bar/test", broadcast2.consume());
1582 origin.publish_broadcast("baz/test", broadcast3.consume());
1583
1584 let mut foo_consumer = origin
1586 .consume()
1587 .scope(&["foo".into()])
1588 .expect("should create foo consumer");
1589
1590 let mut bar_consumer = origin
1591 .consume()
1592 .scope(&["bar".into()])
1593 .expect("should create bar consumer");
1594
1595 let mut foobar_consumer = origin
1596 .consume()
1597 .scope(&["foo".into(), "bar".into()])
1598 .expect("should create foobar consumer");
1599
1600 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1602 foo_consumer.assert_next_wait();
1603
1604 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1605 bar_consumer.assert_next_wait();
1606
1607 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1608 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1609 foobar_consumer.assert_next_wait();
1610 }
1611
1612 #[tokio::test]
1613 async fn test_select_with_empty_prefix() {
1614 let origin = Origin::random().produce();
1615 let broadcast1 = Broadcast::new().produce();
1616 let broadcast2 = Broadcast::new().produce();
1617
1618 let demo_producer = origin.with_root("demo").expect("should create demo root");
1620 let limited_producer = demo_producer
1621 .scope(&["worm-node".into(), "foobar".into()])
1622 .expect("should create limited producer");
1623
1624 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1626 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1627
1628 let mut consumer = limited_producer
1630 .consume()
1631 .scope(&["".into()])
1632 .expect("should create consumer with empty prefix");
1633
1634 let a1 = consumer.try_announced().expect("expected first announcement");
1636 let a2 = consumer.try_announced().expect("expected second announcement");
1637 consumer.assert_next_wait();
1638
1639 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1640 paths.sort();
1641 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1642 }
1643
1644 #[tokio::test]
1645 async fn test_select_narrowing_scope() {
1646 let origin = Origin::random().produce();
1647 let broadcast1 = Broadcast::new().produce();
1648 let broadcast2 = Broadcast::new().produce();
1649 let broadcast3 = Broadcast::new().produce();
1650
1651 let demo_producer = origin.with_root("demo").expect("should create demo root");
1653 let limited_producer = demo_producer
1654 .scope(&["worm-node".into(), "foobar".into()])
1655 .expect("should create limited producer");
1656
1657 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1659 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1660 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1661
1662 let mut worm_consumer = limited_producer
1664 .consume()
1665 .scope(&["worm-node".into()])
1666 .expect("should create worm-node consumer");
1667
1668 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1670 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1671 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1675 .consume()
1676 .scope(&["worm-node/foo".into()])
1677 .expect("should create worm-node/foo consumer");
1678
1679 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1680 foo_consumer.assert_next_wait(); }
1682
1683 #[tokio::test]
1684 async fn test_select_multiple_roots_with_empty_prefix() {
1685 let origin = Origin::random().produce();
1686 let broadcast1 = Broadcast::new().produce();
1687 let broadcast2 = Broadcast::new().produce();
1688 let broadcast3 = Broadcast::new().produce();
1689
1690 let limited_producer = origin
1692 .scope(&["app1".into(), "app2".into(), "shared".into()])
1693 .expect("should create limited producer");
1694
1695 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1697 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1698 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1699
1700 let mut consumer = limited_producer
1702 .consume()
1703 .scope(&["".into()])
1704 .expect("should create consumer with empty prefix");
1705
1706 consumer.assert_next("app1/data", &broadcast1.consume());
1708 consumer.assert_next("app2/config", &broadcast2.consume());
1709 consumer.assert_next("shared/resource", &broadcast3.consume());
1710 consumer.assert_next_wait();
1711 }
1712
1713 #[tokio::test]
1714 async fn test_publish_scope_with_empty_prefix() {
1715 let origin = Origin::random().produce();
1716 let broadcast = Broadcast::new().produce();
1717
1718 let limited_producer = origin
1720 .scope(&["services/api".into(), "services/web".into()])
1721 .expect("should create limited producer");
1722
1723 let same_producer = limited_producer
1725 .scope(&["".into()])
1726 .expect("should create producer with empty prefix");
1727
1728 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1730 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1731 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1732 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1733 }
1734
1735 #[tokio::test]
1736 async fn test_select_narrowing_to_deeper_path() {
1737 let origin = Origin::random().produce();
1738 let broadcast1 = Broadcast::new().produce();
1739 let broadcast2 = Broadcast::new().produce();
1740 let broadcast3 = Broadcast::new().produce();
1741
1742 let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1744
1745 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1747 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1748 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1749
1750 let mut team2_consumer = limited_producer
1752 .consume()
1753 .scope(&["org/team2".into()])
1754 .expect("should create team2 consumer");
1755
1756 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1757 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1761 .consume()
1762 .scope(&["org/team1/project1".into()])
1763 .expect("should create project1 consumer");
1764
1765 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1767 project1_consumer.assert_next_wait();
1768 }
1769
1770 #[tokio::test]
1771 async fn test_select_with_non_matching_prefix() {
1772 let origin = Origin::random().produce();
1773
1774 let limited_producer = origin
1776 .scope(&["allowed/path".into()])
1777 .expect("should create limited producer");
1778
1779 assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1781
1782 assert!(limited_producer.scope(&["other/path".into()]).is_none());
1784 }
1785
1786 #[tokio::test]
1789 async fn test_with_root_trailing_slash_consumer() {
1790 let origin = Origin::random().produce();
1791
1792 let prefix = "some_prefix/".to_string();
1794 let mut consumer = origin.consume().with_root(prefix).unwrap();
1795
1796 let b = origin.create_broadcast("some_prefix/test").unwrap();
1797 consumer.assert_next("test", &b.consume());
1798 }
1799
1800 #[tokio::test]
1802 async fn test_with_root_trailing_slash_producer() {
1803 let origin = Origin::random().produce();
1804
1805 let prefix = "some_prefix/".to_string();
1807 let rooted = origin.with_root(prefix).unwrap();
1808
1809 let b = rooted.create_broadcast("test").unwrap();
1810
1811 let mut consumer = rooted.consume();
1812 consumer.assert_next("test", &b.consume());
1813 }
1814
1815 #[tokio::test]
1817 async fn test_with_root_trailing_slash_unannounce() {
1818 tokio::time::pause();
1819
1820 let origin = Origin::random().produce();
1821
1822 let prefix = "some_prefix/".to_string();
1823 let mut consumer = origin.consume().with_root(prefix).unwrap();
1824
1825 let b = origin.create_broadcast("some_prefix/test").unwrap();
1826 consumer.assert_next("test", &b.consume());
1827
1828 drop(b);
1830 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1831
1832 consumer.assert_next_none("test");
1834 }
1835
1836 #[tokio::test]
1837 async fn test_select_maintains_access_with_wider_prefix() {
1838 let origin = Origin::random().produce();
1839 let broadcast1 = Broadcast::new().produce();
1840 let broadcast2 = Broadcast::new().produce();
1841
1842 let demo_producer = origin.with_root("demo").expect("should create demo root");
1844 let user_producer = demo_producer
1845 .scope(&["worm-node".into(), "foobar".into()])
1846 .expect("should create user producer");
1847
1848 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1850 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1851
1852 let mut consumer = user_producer
1854 .consume()
1855 .scope(&["".into()])
1856 .expect("scope with empty prefix should not fail when user has specific permissions");
1857
1858 let a1 = consumer.try_announced().expect("expected first announcement");
1860 let a2 = consumer.try_announced().expect("expected second announcement");
1861 consumer.assert_next_wait();
1862
1863 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1864 paths.sort();
1865 assert_eq!(paths, ["foobar", "worm-node/data"]);
1866
1867 let mut narrow_consumer = user_producer
1869 .consume()
1870 .scope(&["worm-node".into()])
1871 .expect("should be able to narrow scope to worm-node");
1872
1873 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1874 narrow_consumer.assert_next_wait(); }
1876
1877 #[tokio::test]
1878 async fn test_duplicate_prefixes_deduped() {
1879 let origin = Origin::random().produce();
1880 let broadcast = Broadcast::new().produce();
1881
1882 let producer = origin
1884 .scope(&["demo".into(), "demo".into()])
1885 .expect("should create producer");
1886
1887 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1888
1889 let mut consumer = producer.consume();
1890 consumer.assert_next("demo/stream", &broadcast.consume());
1891 consumer.assert_next_wait();
1892 }
1893
1894 #[tokio::test]
1895 async fn test_overlapping_prefixes_deduped() {
1896 let origin = Origin::random().produce();
1897 let broadcast = Broadcast::new().produce();
1898
1899 let producer = origin
1901 .scope(&["demo".into(), "demo/foo".into()])
1902 .expect("should create producer");
1903
1904 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1906
1907 let mut consumer = producer.consume();
1908 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1909 consumer.assert_next_wait();
1910 }
1911
1912 #[tokio::test]
1913 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1914 let origin = Origin::random().produce();
1915 let broadcast = Broadcast::new().produce();
1916
1917 let producer = origin
1919 .scope(&["demo".into(), "demo/foo".into()])
1920 .expect("should create producer");
1921
1922 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1923
1924 let mut consumer = producer.consume();
1925 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1927 consumer.assert_next_wait();
1928 }
1929
1930 #[tokio::test]
1931 async fn test_allowed_returns_deduped_prefixes() {
1932 let origin = Origin::random().produce();
1933
1934 let producer = origin
1935 .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1936 .expect("should create producer");
1937
1938 let allowed: Vec<_> = producer.allowed().collect();
1939 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1940 }
1941
1942 #[tokio::test]
1943 async fn test_announced_broadcast_already_announced() {
1944 let origin = Origin::random().produce();
1945 let broadcast = Broadcast::new().produce();
1946
1947 origin.publish_broadcast("test", broadcast.consume());
1948
1949 let consumer = origin.consume();
1950 let result = consumer.announced_broadcast("test").await.expect("should find it");
1951 assert!(result.is_clone(&broadcast.consume()));
1952 }
1953
1954 #[tokio::test]
1955 async fn test_announced_broadcast_delayed() {
1956 tokio::time::pause();
1957
1958 let origin = Origin::random().produce();
1959 let broadcast = Broadcast::new().produce();
1960
1961 let consumer = origin.consume();
1962
1963 let wait = tokio::spawn({
1965 let consumer = consumer.clone();
1966 async move { consumer.announced_broadcast("test").await }
1967 });
1968
1969 tokio::task::yield_now().await;
1971
1972 origin.publish_broadcast("test", broadcast.consume());
1973
1974 let result = wait.await.unwrap().expect("should find it");
1975 assert!(result.is_clone(&broadcast.consume()));
1976 }
1977
1978 #[tokio::test]
1979 async fn test_announced_broadcast_ignores_unrelated_paths() {
1980 tokio::time::pause();
1981
1982 let origin = Origin::random().produce();
1983 let other = Broadcast::new().produce();
1984 let target = Broadcast::new().produce();
1985
1986 let consumer = origin.consume();
1987
1988 let wait = tokio::spawn({
1989 let consumer = consumer.clone();
1990 async move { consumer.announced_broadcast("target").await }
1991 });
1992
1993 tokio::task::yield_now().await;
1994
1995 origin.publish_broadcast("other", other.consume());
1997 tokio::task::yield_now().await;
1998 assert!(!wait.is_finished(), "must not resolve on unrelated path");
1999
2000 origin.publish_broadcast("target", target.consume());
2001 let result = wait.await.unwrap().expect("should find target");
2002 assert!(result.is_clone(&target.consume()));
2003 }
2004
2005 #[tokio::test]
2006 async fn test_announced_broadcast_skips_nested_paths() {
2007 tokio::time::pause();
2008
2009 let origin = Origin::random().produce();
2010 let nested = Broadcast::new().produce();
2011 let exact = Broadcast::new().produce();
2012
2013 let consumer = origin.consume();
2014
2015 let wait = tokio::spawn({
2016 let consumer = consumer.clone();
2017 async move { consumer.announced_broadcast("foo").await }
2018 });
2019
2020 tokio::task::yield_now().await;
2021
2022 origin.publish_broadcast("foo/bar", nested.consume());
2024 tokio::task::yield_now().await;
2025 assert!(!wait.is_finished(), "must not resolve on a nested path");
2026
2027 origin.publish_broadcast("foo", exact.consume());
2028 let result = wait.await.unwrap().expect("should find foo exactly");
2029 assert!(result.is_clone(&exact.consume()));
2030 }
2031
2032 #[tokio::test]
2033 async fn test_announced_broadcast_disallowed() {
2034 let origin = Origin::random().produce();
2035 let limited = origin
2036 .consume()
2037 .scope(&["allowed".into()])
2038 .expect("should create limited");
2039
2040 assert!(limited.announced_broadcast("notallowed").await.is_none());
2042 }
2043
2044 #[tokio::test]
2045 async fn test_announced_broadcast_scope_too_narrow() {
2046 let origin = Origin::random().produce();
2049 let limited = origin
2050 .consume()
2051 .scope(&["foo/specific".into()])
2052 .expect("should create limited");
2053
2054 let result = limited
2056 .announced_broadcast("foo")
2057 .now_or_never()
2058 .expect("must not block");
2059 assert!(result.is_none());
2060 }
2061
2062 #[tokio::test]
2066 async fn test_coalesce_announce_then_unannounce() {
2067 tokio::time::pause();
2069
2070 let origin = Origin::random().produce();
2071 let mut consumer = origin.consume();
2072
2073 let broadcast = Broadcast::new().produce();
2074 origin.publish_broadcast("test", broadcast.consume());
2075 drop(broadcast);
2076
2077 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2078
2079 consumer.assert_next_wait();
2080 }
2081
2082 #[tokio::test]
2083 async fn test_coalesce_announce_unannounce_announce() {
2084 tokio::time::pause();
2087
2088 let origin = Origin::random().produce();
2089 let mut consumer = origin.consume();
2090
2091 let broadcast1 = Broadcast::new().produce();
2092 let broadcast2 = Broadcast::new().produce();
2093
2094 origin.publish_broadcast("test", broadcast1.consume());
2095 drop(broadcast1);
2096 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2097 origin.publish_broadcast("test", broadcast2.consume());
2098
2099 consumer.assert_next("test", &broadcast2.consume());
2100 consumer.assert_next_wait();
2101 }
2102
2103 #[tokio::test]
2104 async fn test_coalesce_unannounce_announce_preserved() {
2105 tokio::time::pause();
2108
2109 let origin = Origin::random().produce();
2110 let broadcast1 = Broadcast::new().produce();
2111 origin.publish_broadcast("test", broadcast1.consume());
2112
2113 let mut consumer = origin.consume();
2114 consumer.assert_next("test", &broadcast1.consume());
2115
2116 drop(broadcast1);
2118 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2119
2120 let broadcast2 = Broadcast::new().produce();
2121 origin.publish_broadcast("test", broadcast2.consume());
2122
2123 consumer.assert_next_none("test");
2125 consumer.assert_next("test", &broadcast2.consume());
2126 consumer.assert_next_wait();
2127 }
2128
2129 #[tokio::test]
2130 async fn test_coalesce_unannounce_announce_unannounce() {
2131 tokio::time::pause();
2134
2135 let origin = Origin::random().produce();
2136 let broadcast1 = Broadcast::new().produce();
2137 origin.publish_broadcast("test", broadcast1.consume());
2138
2139 let mut consumer = origin.consume();
2140 consumer.assert_next("test", &broadcast1.consume());
2141
2142 drop(broadcast1);
2143 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2144
2145 let broadcast2 = Broadcast::new().produce();
2146 origin.publish_broadcast("test", broadcast2.consume());
2147 drop(broadcast2);
2148 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2149
2150 consumer.assert_next_none("test");
2151 consumer.assert_next_wait();
2152 }
2153
2154 #[tokio::test]
2155 async fn test_coalesce_churn_bounded() {
2156 tokio::time::pause();
2161
2162 let origin = Origin::random().produce();
2163 let mut consumer = origin.consume();
2164
2165 for _ in 0..1000 {
2166 let broadcast = Broadcast::new().produce();
2167 origin.publish_broadcast("test", broadcast.consume());
2168 drop(broadcast);
2169 }
2170 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2171
2172 let mut collected = Vec::new();
2173 while let Some(update) = consumer.try_announced() {
2174 collected.push(update);
2175 }
2176 assert!(
2177 collected.len() <= 1,
2178 "expected at most one pending update, got {}",
2179 collected.len()
2180 );
2181 assert!(
2182 collected.iter().all(|(path, _)| path == &Path::new("test")),
2183 "unexpected path in pending updates",
2184 );
2185 }
2186}