1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 fmt,
4 sync::atomic::{AtomicU64, Ordering},
5 task::{Poll, ready},
6};
7
8use rand::RngExt;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13 AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14 coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
/// Identifies a single origin by a 64-bit numeric id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Origin {
	/// Raw identifier. The wire decoder in this file rejects values of `2^62` or above.
	pub id: u64,
}
29
30impl Origin {
31 pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35 pub fn random() -> Self {
44 let mut rng = rand::rng();
45 let id = rng.random_range(1..(1u64 << 53));
46 Self { id }
47 }
48
49 pub fn produce(self) -> OriginProducer {
51 OriginProducer::new(self)
52 }
53}
54
55impl From<u64> for Origin {
56 fn from(id: u64) -> Self {
57 Self { id }
58 }
59}
60
61impl fmt::Display for Origin {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 self.id.fmt(f)
64 }
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69 u64: Encode<V>,
70{
71 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72 self.id.encode(w, version)
73 }
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78 u64: Decode<V>,
79{
80 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81 let id = u64::decode(r, version)?;
82 if id >= 1u64 << 62 {
83 return Err(DecodeError::InvalidValue);
84 }
85 Ok(Self { id })
86 }
87}
88
/// Upper bound on the number of entries an [`OriginList`] may hold.
pub(crate) const MAX_HOPS: usize = 32;
95
/// Ordered list of [`Origin`]s.
///
/// `push`, `TryFrom<Vec<Origin>>` and the wire decoder all cap the length at [`MAX_HOPS`].
// NOTE(review): the derived serde `Deserialize` does not appear to enforce that cap — confirm.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct OriginList(Vec<Origin>);
103
/// Error returned when an [`OriginList`] would exceed [`MAX_HOPS`] entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(f, "too many origins (max {MAX_HOPS})")
112 }
113}
114
// Marker impl so `TooManyOrigins` can be used wherever a standard error type is expected.
impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118 fn from(_: TooManyOrigins) -> Self {
119 DecodeError::BoundsExceeded
120 }
121}
122
123impl OriginList {
124 pub fn new() -> Self {
126 Self(Vec::new())
127 }
128
129 pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131 if self.0.len() >= MAX_HOPS {
132 return Err(TooManyOrigins);
133 }
134 self.0.push(origin);
135 Ok(())
136 }
137
138 pub fn replace_first(&mut self, target: Origin, replacement: Origin) -> bool {
141 for entry in &mut self.0 {
142 if *entry == target {
143 *entry = replacement;
144 return true;
145 }
146 }
147 false
148 }
149
150 pub fn contains(&self, origin: &Origin) -> bool {
152 self.0.contains(origin)
153 }
154
155 pub fn len(&self) -> usize {
157 self.0.len()
158 }
159
160 pub fn is_empty(&self) -> bool {
162 self.0.is_empty()
163 }
164
165 pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
167 self.0.iter()
168 }
169
170 pub fn as_slice(&self) -> &[Origin] {
172 &self.0
173 }
174}
175
176impl TryFrom<Vec<Origin>> for OriginList {
177 type Error = TooManyOrigins;
178
179 fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
180 if v.len() > MAX_HOPS {
181 return Err(TooManyOrigins);
182 }
183 Ok(Self(v))
184 }
185}
186
187impl<'a> IntoIterator for &'a OriginList {
188 type Item = &'a Origin;
189 type IntoIter = std::slice::Iter<'a, Origin>;
190
191 fn into_iter(self) -> Self::IntoIter {
192 self.iter()
193 }
194}
195
196impl<V: Copy> Encode<V> for OriginList
197where
198 u64: Encode<V>,
199 Origin: Encode<V>,
200{
201 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
202 (self.0.len() as u64).encode(w, version)?;
203 for origin in &self.0 {
204 origin.encode(w, version)?;
205 }
206 Ok(())
207 }
208}
209
210impl<V: Copy> Decode<V> for OriginList
211where
212 u64: Decode<V>,
213 Origin: Decode<V>,
214{
215 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
216 let count = u64::decode(r, version)? as usize;
217 if count > MAX_HOPS {
218 return Err(DecodeError::BoundsExceeded);
219 }
220 let mut list = Vec::with_capacity(count);
221 for _ in 0..count {
222 list.push(Origin::decode(r, version)?);
223 }
224 Ok(Self(list))
225 }
226}
227
/// Process-wide counter from which consumer ids are handed out.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Identifier used to key a consumer's registration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
	/// Takes the next value from the global counter.
	fn new() -> Self {
		let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
		Self(raw)
	}
}
238
/// The broadcasts published at one node of the origin tree.
struct OriginBroadcast {
	/// Full path that was passed to `publish` when the entry was created.
	path: PathOwned,
	/// Broadcast currently exposed to consumers for this path.
	active: BroadcastConsumer,
	/// Other broadcasts published at the same path; one is promoted when `active` is removed.
	backup: VecDeque<BroadcastConsumer>,
}
245
246fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) {
254 const SEED: u64 = 0x420C0DECB00B; const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
261
262 let mut hash = SEED;
263 for &byte in name.as_str().as_bytes() {
264 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
265 }
266 for hop in hops {
267 for &byte in &hop.id.to_le_bytes() {
268 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
269 }
270 }
271
272 (hops.len(), hash)
273}
274
/// The not-yet-delivered change(s) queued for a single path.
enum PendingUpdate {
	/// Deliver an announcement carrying this broadcast.
	Announce(BroadcastConsumer),
	/// Deliver an unannouncement.
	Unannounce,
	/// Deliver an unannouncement first, then an announcement carrying this broadcast.
	UnannounceAnnounce(BroadcastConsumer),
}
288
/// Per-consumer queue of undelivered updates.
#[derive(Default)]
struct OriginConsumerState {
	/// At most one pending entry per path; the `BTreeMap` means updates are drained in path order.
	pending: BTreeMap<PathOwned, PendingUpdate>,
}
297
298impl OriginConsumerState {
299 fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
300 let new = match self.pending.remove(&path) {
301 None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
303 Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
305 PendingUpdate::UnannounceAnnounce(broadcast)
306 }
307 };
308 self.pending.insert(path, new);
309 }
310
311 fn apply_unannounce(&mut self, path: PathOwned) {
312 match self.pending.remove(&path) {
313 Some(PendingUpdate::Announce(_)) => {}
315 None | Some(PendingUpdate::Unannounce) => {
316 self.pending.insert(path, PendingUpdate::Unannounce);
317 }
318 Some(PendingUpdate::UnannounceAnnounce(_)) => {
321 self.pending.insert(path, PendingUpdate::Unannounce);
322 }
323 }
324 }
325
326 fn take(&mut self) -> Option<OriginAnnounce> {
328 let path = self.pending.keys().next()?.clone();
329 match self.pending.remove(&path).unwrap() {
330 PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
331 PendingUpdate::Unannounce => Some((path, None)),
332 PendingUpdate::UnannounceAnnounce(broadcast) => {
333 self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
336 Some((path, None))
337 }
338 }
339 }
340}
341
/// Handle through which tree nodes push updates into one consumer's pending state.
#[derive(Clone)]
struct OriginConsumerNotify {
	/// Prefix stripped from every path before it is queued.
	root: PathOwned,
	/// Shared pending-update state of the consumer.
	state: kio::Producer<OriginConsumerState>,
}
347
348impl OriginConsumerNotify {
349 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
350 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
351 self.state
352 .write()
353 .ok()
354 .expect("consumer closed")
355 .apply_announce(path, broadcast);
356 }
357
358 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
359 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
360 let mut state = self.state.write().ok().expect("consumer closed");
361 state.apply_unannounce(path.clone());
362 state.apply_announce(path, broadcast);
363 }
364
365 fn unannounce(&self, path: impl AsPath) {
366 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
367 self.state.write().ok().expect("consumer closed").apply_unannounce(path);
368 }
369}
370
/// Fan-out point for updates at one level of the origin tree.
struct NotifyNode {
	/// Notify node one level up; every update is forwarded to it after the local consumers.
	parent: Option<Lock<NotifyNode>>,

	/// Consumers registered at this level, keyed by their id.
	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
}
378
379impl NotifyNode {
380 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
381 Self {
382 parent,
383 consumers: HashMap::new(),
384 }
385 }
386
387 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
388 for consumer in self.consumers.values() {
389 consumer.announce(path.as_path(), broadcast.clone());
390 }
391
392 if let Some(parent) = &self.parent {
393 parent.lock().announce(path, broadcast);
394 }
395 }
396
397 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
398 for consumer in self.consumers.values() {
399 consumer.reannounce(path.as_path(), broadcast.clone());
400 }
401
402 if let Some(parent) = &self.parent {
403 parent.lock().reannounce(path, broadcast);
404 }
405 }
406
407 fn unannounce(&mut self, path: impl AsPath) {
408 for consumer in self.consumers.values() {
409 consumer.unannounce(path.as_path());
410 }
411
412 if let Some(parent) = &self.parent {
413 parent.lock().unannounce(path);
414 }
415 }
416}
417
/// One node of the origin tree.
struct OriginNode {
	/// Broadcast(s) published exactly at this node's path, if any.
	broadcast: Option<OriginBroadcast>,

	/// Child nodes, keyed by the next path part.
	nested: HashMap<String, Lock<OriginNode>>,

	/// Consumers registered at this node; its parent is the enclosing node's notify.
	notify: Lock<NotifyNode>,
}
428
impl OriginNode {
	/// Creates an empty node whose notifications are forwarded to `parent`, if any.
	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
		Self {
			broadcast: None,
			nested: HashMap::new(),
			notify: Lock::new(NotifyNode::new(parent)),
		}
	}

	/// Returns the node located at `path` below this one, creating missing nodes on the way.
	///
	/// # Panics
	/// Panics if `path` is empty.
	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
		let (dir, rest) = path.next_part().expect("leaf called with empty path");

		let next = self.entry(dir);
		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
	}

	/// Returns the direct child named `dir`, inserting a new empty child when absent.
	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
		match self.nested.get(dir) {
			Some(next) => next.clone(),
			None => {
				// New children forward their notifications to this node's notify.
				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
				self.nested.insert(dir.to_string(), next.clone());
				next
			}
		}
	}

	/// Publishes `broadcast` at `relative` (a path relative to this node).
	///
	/// `full` is carried unchanged through the recursion; it is the path stored in the
	/// entry and passed to notifications.
	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
		let full = full.as_path();
		let rest = relative.as_path();

		if let Some((dir, relative)) = rest.next_part() {
			// Not at the target yet: descend into (or create) the child for the next part.
			self.entry(dir).lock().publish(&full, broadcast, &relative);
		} else if let Some(existing) = &mut self.broadcast {
			// The same broadcast is already registered here: nothing to do.
			if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
				return;
			}

			// A strictly smaller route key (fewer hops, then hash) takes over as active.
			if route_key(&full, &broadcast.hops) < route_key(&full, &existing.active.hops) {
				let old = existing.active.clone();
				existing.active = broadcast.clone();
				existing.backup.push_back(old);

				self.notify.lock().reannounce(full, broadcast);
			} else {
				// Otherwise keep it as a backup; consumers are not notified.
				existing.backup.push_back(broadcast.clone());
			}
		} else {
			// First broadcast at this path.
			self.broadcast = Some(OriginBroadcast {
				path: full.to_owned(),
				active: broadcast.clone(),
				backup: VecDeque::new(),
			});
			self.notify.lock().announce(full, broadcast);
		}
	}

	/// Registers a consumer at this node after replaying the broadcasts already present
	/// in this subtree.
	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
		self.consume_initial(&mut notify);
		self.notify.lock().consumers.insert(id, notify);
	}

	/// Announces the active broadcast of this node and, recursively, of every nested node.
	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
		if let Some(broadcast) = &self.broadcast {
			notify.announce(&broadcast.path, broadcast.active.clone());
		}

		for nested in self.nested.values() {
			nested.lock().consume_initial(notify);
		}
	}

	/// Looks up the active broadcast at `rest` (relative to this node) without creating nodes.
	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
		let rest = rest.as_path();

		if let Some((dir, rest)) = rest.next_part() {
			let node = self.nested.get(dir)?.lock();
			node.consume_broadcast(&rest)
		} else {
			self.broadcast.as_ref().map(|b| b.active.clone())
		}
	}

	/// Unregisters the consumer `id` from this node.
	///
	/// # Panics
	/// Panics if `id` is not registered here.
	fn unconsume(&mut self, id: ConsumerId) {
		self.notify.lock().consumers.remove(&id).expect("consumer not found");
		// NOTE(review): this branch is empty, so an emptied node is not pruned here — confirm intent.
		if self.is_empty() {
		}
	}

	/// Removes `broadcast` from the entry at `relative`, promoting a backup or
	/// unannouncing the path when the active broadcast goes away.
	///
	/// # Panics
	/// Panics if an entry exists at the path but `broadcast` is neither a backup nor
	/// the active broadcast.
	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
		let full = full.as_path();
		let relative = relative.as_path();

		if let Some((dir, relative)) = relative.next_part() {
			let nested = self.entry(dir);
			let mut locked = nested.lock();
			locked.remove(&full, broadcast, &relative);

			// Prune the child once it has no broadcast, no children and no consumers.
			if locked.is_empty() {
				drop(locked);
				self.nested.remove(dir);
			}
		} else {
			let entry = match &mut self.broadcast {
				Some(existing) => existing,
				None => return,
			};

			// Removing a backup does not change what consumers see.
			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
			if let Some(pos) = pos {
				entry.backup.remove(pos);
				return;
			}

			assert!(entry.active.is_clone(&broadcast));

			// Pick the backup with the smallest route key as the replacement.
			let best = entry
				.backup
				.iter()
				.enumerate()
				.min_by_key(|(_, b)| route_key(&full, &b.hops))
				.map(|(i, _)| i);
			if let Some(idx) = best {
				let active = entry.backup.remove(idx).expect("index in range");
				entry.active = active;
				self.notify.lock().reannounce(full, &entry.active);
			} else {
				// No backup left: the path disappears.
				self.broadcast = None;
				self.notify.lock().unannounce(full);
			}
		}
	}

	/// True when the node holds no broadcast, no children and no registered consumers.
	fn is_empty(&self) -> bool {
		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
	}
}
588
/// The set of tree nodes a producer or consumer may access.
#[derive(Clone)]
struct OriginNodes {
	/// Each entry pairs a path prefix with the tree node found at that prefix.
	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
}
593
594impl OriginNodes {
595 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
598 let mut roots = Vec::new();
599
600 for (root, state) in &self.nodes {
601 for prefix in prefixes {
602 if root.has_prefix(prefix) {
603 roots.push((root.to_owned(), state.clone()));
605 continue;
606 }
607
608 if let Some(suffix) = prefix.strip_prefix(root) {
609 let nested = state.lock().leaf(&suffix);
611 roots.push((prefix.to_owned(), nested));
612 }
613 }
614 }
615
616 if roots.is_empty() {
617 None
618 } else {
619 Some(Self { nodes: roots })
620 }
621 }
622
623 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
624 let new_root = new_root.as_path();
625 let mut roots = Vec::new();
626
627 if new_root.is_empty() {
628 return Some(self.clone());
629 }
630
631 for (root, state) in &self.nodes {
632 if let Some(suffix) = root.strip_prefix(&new_root) {
633 roots.push((suffix.to_owned(), state.clone()));
635 } else if let Some(suffix) = new_root.strip_prefix(root) {
636 let nested = state.lock().leaf(&suffix);
639 roots.push(("".into(), nested));
640 }
641 }
642
643 if roots.is_empty() {
644 None
645 } else {
646 Some(Self { nodes: roots })
647 }
648 }
649
650 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
652 let path = path.as_path();
653
654 for (root, state) in &self.nodes {
655 if let Some(suffix) = path.strip_prefix(root) {
656 return Some((state.clone(), suffix.to_owned()));
657 }
658 }
659
660 None
661 }
662}
663
664impl Default for OriginNodes {
665 fn default() -> Self {
666 Self {
667 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
668 }
669 }
670}
671
/// A single update delivered to a consumer: the path, plus `Some(broadcast)` for an
/// announcement or `None` for an unannouncement.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
674
/// Publishing handle onto the origin tree.
#[derive(Clone)]
pub struct OriginProducer {
	/// Identity of this origin; broadcasts whose hops already contain it are refused.
	info: Origin,

	/// Tree nodes this producer is allowed to publish under.
	nodes: OriginNodes,

	/// Prefix joined in front of a published path to form its full path.
	root: PathOwned,
}
690
691impl std::ops::Deref for OriginProducer {
692 type Target = Origin;
693
694 fn deref(&self) -> &Self::Target {
695 &self.info
696 }
697}
698
impl OriginProducer {
	/// Creates a producer for `info` with access to a fresh, empty tree and an empty root.
	pub fn new(info: Origin) -> Self {
		Self {
			info,
			nodes: OriginNodes::default(),
			root: PathOwned::default(),
		}
	}

	/// Creates a new broadcast and publishes it at `path`.
	///
	/// Returns `None` when [`Self::publish_broadcast`] refuses the path.
	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
		let broadcast = Broadcast::new().produce();
		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
	}

	/// Publishes `broadcast` at `path` and schedules its removal for when it closes.
	///
	/// Returns `false` without publishing when the broadcast's hops already contain
	/// this origin, or when `path` is not covered by any accessible node.
	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
		let path = path.as_path();

		// Refuse broadcasts that have already passed through this origin.
		if broadcast.hops.contains(&self.info) {
			return false;
		}

		let (root, rest) = match self.nodes.get(&path) {
			Some(root) => root,
			None => return false,
		};

		let full = self.root.join(&path);

		root.lock().publish(&full, &broadcast, &rest);
		let root = root.clone();

		// Background task: wait for the broadcast to close, then remove it from the tree.
		web_async::spawn(async move {
			broadcast.closed().await;
			root.lock().remove(&full, broadcast, &rest);
		});

		true
	}

	/// Returns a producer restricted to `prefixes`, or `None` when none of them is accessible.
	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
		let prefixes = PathPrefixes::new(prefixes);
		Some(OriginProducer {
			info: self.info,
			nodes: self.nodes.select(&prefixes)?,
			root: self.root.clone(),
		})
	}

	/// Creates a consumer over the same nodes and root as this producer.
	pub fn consume(&self) -> OriginConsumer {
		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
	}

	/// Returns the active broadcast at `path`, if one is published and accessible.
	#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
		let path = path.as_path();
		let (root, rest) = self.nodes.get(&path)?;
		let state = root.lock();
		state.consume_broadcast(&rest)
	}

	/// Returns a producer whose root is extended by `prefix`, or `None` when no
	/// accessible node is reachable under it.
	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
		let prefix = prefix.as_path();

		Some(Self {
			info: self.info,
			root: self.root.join(&prefix).to_owned(),
			nodes: self.nodes.root(&prefix)?,
		})
	}

	/// The root prefix of this producer.
	pub fn root(&self) -> &Path<'_> {
		&self.root
	}

	/// Iterates over the path prefixes this producer has access to.
	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
		self.nodes.nodes.iter().map(|(root, _)| root)
	}

	/// Joins `path` onto this producer's root.
	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
		self.root.join(path)
	}
}
817
/// Receiving handle that observes announcements from the origin tree.
pub struct OriginConsumer {
	/// Key under which this consumer is registered with each of its nodes.
	id: ConsumerId,
	/// Identity of the origin this consumer was created from.
	info: Origin,
	/// Tree nodes this consumer is registered with.
	nodes: OriginNodes,

	/// Pending updates, shared with the notify handles registered on the nodes.
	state: kio::Producer<OriginConsumerState>,

	/// Prefix stripped from paths before they are queued for this consumer.
	root: PathOwned,
}
834
835impl std::ops::Deref for OriginConsumer {
836 type Target = Origin;
837
838 fn deref(&self) -> &Self::Target {
839 &self.info
840 }
841}
842
impl OriginConsumer {
	/// Creates a consumer and registers it, under a fresh id, with every node in `nodes`.
	///
	/// Registration replays the broadcasts already present under each node.
	fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
		let state = kio::Producer::<OriginConsumerState>::default();
		let id = ConsumerId::new();

		for (_, node) in &nodes.nodes {
			let notify = OriginConsumerNotify {
				root: root.clone(),
				state: state.clone(),
			};
			node.lock().consume(id, notify);
		}

		Self {
			id,
			info,
			nodes,
			state,
			root,
		}
	}

	/// Waits for the next update; async wrapper around [`Self::poll_announced`].
	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
		kio::wait(|waiter| self.poll_announced(waiter)).await
	}

	/// Polls for the next update.
	///
	/// Stays pending while no update is queued, and resolves to `None` when polling
	/// the state reports an error.
	// NOTE(review): the error presumably means the state was closed — confirm against `kio`.
	pub fn poll_announced(&mut self, waiter: &kio::Waiter) -> Poll<Option<OriginAnnounce>> {
		let mut state = match ready!(self.state.poll(waiter, |state| {
			if state.pending.is_empty() {
				Poll::Pending
			} else {
				Poll::Ready(())
			}
		})) {
			Ok(state) => state,
			Err(_) => return Poll::Ready(None),
		};
		Poll::Ready(Some(state.take().expect("predicate guaranteed an update")))
	}

	/// Returns the next queued update without waiting, or `None` when there is none
	/// (or the state cannot be written).
	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
		self.state.write().ok()?.take()
	}

	/// Creates an independent consumer over the same nodes; equivalent to `clone()`.
	pub fn consume(&self) -> Self {
		self.clone()
	}

	/// Returns the active broadcast at `path`, if one is published and accessible.
	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
		let path = path.as_path();
		let (root, rest) = self.nodes.get(&path)?;
		let state = root.lock();
		state.consume_broadcast(&rest)
	}

	/// Waits until a broadcast is announced at exactly `path` and returns it.
	///
	/// Returns `None` when `path` is not accessible or the update stream ends first.
	pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
		let path = path.as_path();

		// Watch only the requested path through a dedicated scoped consumer.
		let mut consumer = self.scope(std::slice::from_ref(&path))?;

		if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
			return None;
		}

		loop {
			let (announced, broadcast) = consumer.announced().await?;
			// Skip updates for other paths, and unannouncements of this one.
			if announced.as_path() == path {
				if let Some(broadcast) = broadcast {
					return Some(broadcast);
				}
			}
		}
	}

	/// Returns a new consumer restricted to `prefixes`, or `None` when none is accessible.
	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
		let prefixes = PathPrefixes::new(prefixes);
		Some(OriginConsumer::new(
			self.info,
			self.root.clone(),
			self.nodes.select(&prefixes)?,
		))
	}

	/// Returns a new consumer whose root is extended by `prefix`, or `None` when no
	/// accessible node is reachable under it.
	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
		let prefix = prefix.as_path();

		Some(Self::new(
			self.info,
			self.root.join(&prefix).to_owned(),
			self.nodes.root(&prefix)?,
		))
	}

	/// The root prefix of this consumer.
	pub fn root(&self) -> &Path<'_> {
		&self.root
	}

	/// Iterates over the path prefixes this consumer has access to.
	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
		self.nodes.nodes.iter().map(|(root, _)| root)
	}

	/// Joins `path` onto this consumer's root.
	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
		self.root.join(path)
	}
}
999
1000impl Drop for OriginConsumer {
1001 fn drop(&mut self) {
1002 for (_, root) in &self.nodes.nodes {
1003 root.lock().unconsume(self.id);
1004 }
1005 }
1006}
1007
1008impl Clone for OriginConsumer {
1009 fn clone(&self) -> Self {
1010 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
1011 }
1012}
1013
1014#[cfg(test)]
1015use futures::FutureExt;
1016
// Test-only assertion helpers for consuming announcements.
#[cfg(test)]
impl OriginConsumer {
	/// Asserts that the next update is immediately available and announces `broadcast` at `expected`.
	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
	}

	/// Same as `assert_next`, but uses the non-waiting `try_announced`.
	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let (path, active) = self.try_announced().expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
	}

	/// Asserts that the next update is immediately available and unannounces `expected`.
	pub fn assert_next_none(&mut self, expected: impl AsPath) {
		let expected = expected.as_path();
		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.is_none(), "should be unannounced");
	}

	/// Asserts that no update is available right now.
	pub fn assert_next_wait(&mut self) {
		if let Some(res) = self.announced().now_or_never() {
			panic!("next should block: got {:?}", res.map(|(path, _)| path));
		}
	}

}
1055
1056#[cfg(test)]
1057mod tests {
1058 use crate::Broadcast;
1059
1060 use super::*;
1061
	/// `push` accepts exactly `MAX_HOPS` entries and rejects the next one.
	#[test]
	fn origin_list_push_fails_at_limit() {
		let mut list = OriginList::new();
		for _ in 0..MAX_HOPS {
			list.push(Origin::random()).unwrap();
		}
		assert_eq!(list.len(), MAX_HOPS);
		assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
	}
1071
	/// `replace_first` rewrites only the first match and reports whether it found one.
	#[test]
	fn origin_list_replace_first() {
		let mut list = OriginList::new();
		for _ in 0..3 {
			list.push(Origin::UNKNOWN).unwrap();
		}

		// Only the first of the three identical entries is replaced.
		assert!(list.replace_first(Origin::UNKNOWN, Origin::from(7)));
		assert_eq!(list.as_slice(), &[Origin::from(7), Origin::UNKNOWN, Origin::UNKNOWN]);

		// A target that is absent leaves the list untouched.
		assert!(!list.replace_first(Origin::from(99), Origin::from(8)));
		assert_eq!(list.len(), 3);
	}
1087
	/// `TryFrom<Vec<Origin>>` accepts `MAX_HOPS` entries and rejects `MAX_HOPS + 1`.
	#[test]
	fn origin_list_try_from_vec_enforces_limit() {
		let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
		assert!(OriginList::try_from(under).is_ok());

		let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
		assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
	}
1096
	/// Consumers see announcements made after they were created, a replay of what was
	/// already published, and an unannouncement once a broadcast is dropped.
	#[tokio::test]
	async fn test_announce() {
		tokio::time::pause();

		let origin = Origin::random().produce();
		let broadcast1 = Broadcast::new().produce();
		let broadcast2 = Broadcast::new().produce();

		// Nothing has been published yet.
		let mut consumer1 = origin.consume();
		consumer1.assert_next_wait();

		origin.publish_broadcast("test1", broadcast1.consume());

		consumer1.assert_next("test1", &broadcast1.consume());
		consumer1.assert_next_wait();

		// A consumer created later gets the existing broadcast replayed.
		let mut consumer2 = origin.consume();

		origin.publish_broadcast("test2", broadcast2.consume());

		consumer1.assert_next("test2", &broadcast2.consume());
		consumer1.assert_next_wait();

		consumer2.assert_next("test1", &broadcast1.consume());
		consumer2.assert_next("test2", &broadcast2.consume());
		consumer2.assert_next_wait();

		drop(broadcast1);

		// Give the spawned cleanup task a chance to run.
		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		consumer1.assert_next_none("test1");
		consumer2.assert_next_none("test1");
		consumer1.assert_next_wait();
		consumer2.assert_next_wait();

		// A brand-new consumer only sees what is still published.
		let mut consumer3 = origin.consume();
		consumer3.assert_next("test2", &broadcast2.consume());
		consumer3.assert_next_wait();

		drop(broadcast2);

		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		consumer1.assert_next_none("test2");
		consumer2.assert_next_none("test2");
		consumer3.assert_next_none("test2");

	}
1162
	/// Several broadcasts at one path: the first stays active, dropping a backup is
	/// silent, and dropping the active one re-announces a remaining backup.
	#[tokio::test]
	async fn test_duplicate() {
		tokio::time::pause();

		let origin = Origin::random().produce();

		let broadcast1 = Broadcast::new().produce();
		let broadcast2 = Broadcast::new().produce();
		let broadcast3 = Broadcast::new().produce();

		let consumer1 = broadcast1.consume();
		let consumer2 = broadcast2.consume();
		let consumer3 = broadcast3.consume();

		let mut consumer = origin.consume();

		origin.publish_broadcast("test", consumer1.clone());
		origin.publish_broadcast("test", consumer2.clone());
		origin.publish_broadcast("test", consumer3.clone());
		assert!(consumer.get_broadcast("test").is_some());

		// Only the first publish is announced; the others are queued as backups.
		consumer.assert_next("test", &consumer1);
		consumer.assert_next_wait();

		drop(broadcast2);

		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		// Removing a backup produces no update.
		assert!(consumer.get_broadcast("test").is_some());
		consumer.assert_next_wait();

		drop(broadcast1);

		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

		// Removing the active broadcast promotes the remaining backup.
		assert!(consumer.get_broadcast("test").is_some());
		consumer.assert_next_none("test");
		consumer.assert_next("test", &consumer3);

		drop(broadcast3);

		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(consumer.get_broadcast("test").is_none());

		consumer.assert_next_none("test");
		consumer.assert_next_wait();
	}
1218
	/// Dropping the backup first and the active broadcast second leaves the path
	/// available until both are gone.
	#[tokio::test]
	async fn test_duplicate_reverse() {
		tokio::time::pause();

		let origin = Origin::random().produce();
		let broadcast1 = Broadcast::new().produce();
		let broadcast2 = Broadcast::new().produce();

		origin.publish_broadcast("test", broadcast1.consume());
		origin.publish_broadcast("test", broadcast2.consume());
		assert!(origin.consume().get_broadcast("test").is_some());

		drop(broadcast2);

		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(origin.consume().get_broadcast("test").is_some());

		drop(broadcast1);

		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(origin.consume().get_broadcast("test").is_none());
	}
1244
	/// The active route does not depend on publish order, and a shorter hop list wins.
	#[tokio::test]
	async fn test_deterministic_tiebreak() {
		tokio::time::pause();

		/// Builds a broadcast whose hop list is made of the given ids.
		fn route(ids: &[u64]) -> BroadcastProducer {
			let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::<Vec<_>>()).unwrap();
			Broadcast { hops }.produce()
		}

		/// Publishes both routes at the same path and returns the hops of the active one.
		fn winner(first: &[u64], second: &[u64]) -> OriginList {
			let origin = Origin::random().produce();
			let a = route(first);
			let b = route(second);
			origin.publish_broadcast("test", a.consume());
			origin.publish_broadcast("test", b.consume());
			let hops = origin.consume().get_broadcast("test").unwrap().hops.clone();
			drop((a, b));
			hops
		}

		// Equal hop counts: the hash decides, independent of order.
		let forward = winner(&[10, 20], &[30, 40]);
		let reverse = winner(&[30, 40], &[10, 20]);
		assert_eq!(forward, reverse, "tie-break must not depend on publish order");

		// Different hop counts: the single-hop route wins either way.
		assert_eq!(winner(&[10, 20], &[30]).len(), 1);
		assert_eq!(winner(&[30], &[10, 20]).len(), 1);
	}
1278
	/// Publishing the same broadcast twice at one path is harmless, and the path is
	/// gone once the broadcast is dropped.
	#[tokio::test]
	async fn test_double_publish() {
		tokio::time::pause();

		let origin = Origin::random().produce();
		let broadcast = Broadcast::new().produce();

		origin.publish_broadcast("test", broadcast.consume());
		origin.publish_broadcast("test", broadcast.consume());

		assert!(origin.consume().get_broadcast("test").is_some());

		drop(broadcast);

		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
		assert!(origin.consume().get_broadcast("test").is_none());
	}
	/// 256 announcements are all delivered through `announced`, in path order.
	#[tokio::test]
	async fn test_many_announces() {
		let origin = Origin::random().produce();
		let broadcast = Broadcast::new().produce();

		let mut consumer = origin.consume();
		// Zero-padded names keep numeric order and path order identical.
		for i in 0..256 {
			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
		}

		for i in 0..256 {
			consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
		}
		consumer.assert_next_wait();
	}
1317
	/// Same as `test_many_announces`, but drained with the non-waiting `try_announced`.
	#[tokio::test]
	async fn test_many_announces_try() {
		let origin = Origin::random().produce();
		let broadcast = Broadcast::new().produce();

		let mut consumer = origin.consume();
		for i in 0..256 {
			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
		}

		for i in 0..256 {
			consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
		}
	}
1332
	/// A producer rooted at "foo" publishes under that prefix; a consumer with the
	/// same root sees paths with the prefix stripped.
	#[tokio::test]
	async fn test_with_root_basic() {
		let origin = Origin::random().produce();
		let broadcast = Broadcast::new().produce();

		let foo_producer = origin.with_root("foo").expect("should create root");
		assert_eq!(foo_producer.root().as_str(), "foo");

		let mut consumer = origin.consume();

		// The unrooted consumer sees the full path.
		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
		consumer.assert_next("foo/bar/baz", &broadcast.consume());

		// The rooted consumer sees the path relative to "foo".
		let mut foo_consumer = foo_producer.consume();
		foo_consumer.assert_next("bar/baz", &broadcast.consume());
	}
1353
	/// Chained `with_root` calls compose: "foo" then "bar" yields the root "foo/bar".
	#[tokio::test]
	async fn test_with_root_nested() {
		let origin = Origin::random().produce();
		let broadcast = Broadcast::new().produce();

		let foo_producer = origin.with_root("foo").expect("should create foo root");
		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");

		let mut consumer = origin.consume();

		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
		consumer.assert_next("foo/bar/baz", &broadcast.consume());

		let mut foo_bar_consumer = foo_bar_producer.consume();
		foo_bar_consumer.assert_next("baz", &broadcast.consume());
	}
1375
	/// A scoped producer may publish at or below its allowed prefixes and nowhere else.
	#[tokio::test]
	async fn test_publish_scope_allows() {
		let origin = Origin::random().produce();
		let broadcast = Broadcast::new().produce();

		let limited_producer = origin
			.scope(&["allowed/path1".into(), "allowed/path2".into()])
			.expect("should create limited producer");

		// Allowed: the prefixes themselves and anything nested under them.
		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));

		// Refused: unrelated paths and the parent of the allowed prefixes.
		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
	}
1396
	/// Scoping a producer to an empty prefix list yields no producer at all.
	#[tokio::test]
	async fn test_publish_scope_empty() {
		let origin = Origin::random().produce();

		assert!(origin.scope(&[]).is_none());
	}
1404
	/// A scoped consumer only sees broadcasts under its prefix, while the unscoped
	/// consumer still sees everything.
	#[tokio::test]
	async fn test_consume_scope_filters() {
		let origin = Origin::random().produce();
		let broadcast1 = Broadcast::new().produce();
		let broadcast2 = Broadcast::new().produce();
		let broadcast3 = Broadcast::new().produce();

		let mut consumer = origin.consume();

		origin.publish_broadcast("allowed", broadcast1.consume());
		origin.publish_broadcast("allowed/nested", broadcast2.consume());
		origin.publish_broadcast("notallowed", broadcast3.consume());

		let mut limited_consumer = origin
			.consume()
			.scope(&["allowed".into()])
			.expect("should create limited consumer");

		// The scoped consumer gets the two "allowed" paths, then nothing; the
		// unscoped consumer gets all three.
		limited_consumer.assert_next("allowed", &broadcast1.consume());
		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
		limited_consumer.assert_next_wait(); consumer.assert_next("allowed", &broadcast1.consume());
		consumer.assert_next("allowed/nested", &broadcast2.consume());
		consumer.assert_next("notallowed", &broadcast3.consume());
	}
1435
1436 #[tokio::test]
1437 async fn test_consume_scope_multiple_prefixes() {
1438 let origin = Origin::random().produce();
1439 let broadcast1 = Broadcast::new().produce();
1440 let broadcast2 = Broadcast::new().produce();
1441 let broadcast3 = Broadcast::new().produce();
1442
1443 origin.publish_broadcast("foo/test", broadcast1.consume());
1444 origin.publish_broadcast("bar/test", broadcast2.consume());
1445 origin.publish_broadcast("baz/test", broadcast3.consume());
1446
1447 let mut limited_consumer = origin
1449 .consume()
1450 .scope(&["foo".into(), "bar".into()])
1451 .expect("should create limited consumer");
1452
1453 limited_consumer.assert_next("bar/test", &broadcast2.consume());
1455 limited_consumer.assert_next("foo/test", &broadcast1.consume());
1456 limited_consumer.assert_next_wait(); }
1458
1459 #[tokio::test]
1460 async fn test_with_root_and_publish_scope() {
1461 let origin = Origin::random().produce();
1462 let broadcast = Broadcast::new().produce();
1463
1464 let foo_producer = origin.with_root("foo").expect("should create foo root");
1466
1467 let limited_producer = foo_producer
1469 .scope(&["bar".into(), "goop/pee".into()])
1470 .expect("should create limited producer");
1471
1472 let mut consumer = origin.consume();
1473
1474 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1476 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1477 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1478 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1479
1480 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1482 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1484
1485 consumer.assert_next("foo/bar", &broadcast.consume());
1487 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1488 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1489 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1490 }
1491
1492 #[tokio::test]
1493 async fn test_with_root_and_consume_scope() {
1494 let origin = Origin::random().produce();
1495 let broadcast1 = Broadcast::new().produce();
1496 let broadcast2 = Broadcast::new().produce();
1497 let broadcast3 = Broadcast::new().produce();
1498
1499 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1501 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1502 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1503
1504 let foo_producer = origin.with_root("foo").expect("should create foo root");
1506
1507 let mut limited_consumer = foo_producer
1509 .consume()
1510 .scope(&["bar".into(), "goop/pee".into()])
1511 .expect("should create limited consumer");
1512
1513 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1515 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1516 limited_consumer.assert_next_wait(); }
1518
1519 #[tokio::test]
1520 async fn test_with_root_unauthorized() {
1521 let origin = Origin::random().produce();
1522
1523 let limited_producer = origin
1525 .scope(&["allowed".into()])
1526 .expect("should create limited producer");
1527
1528 assert!(limited_producer.with_root("notallowed").is_none());
1530
1531 let allowed_root = limited_producer
1533 .with_root("allowed")
1534 .expect("should create allowed root");
1535 assert_eq!(allowed_root.root().as_str(), "allowed");
1536 }
1537
1538 #[tokio::test]
1539 async fn test_wildcard_permission() {
1540 let origin = Origin::random().produce();
1541 let broadcast = Broadcast::new().produce();
1542
1543 let root_producer = origin.clone();
1545
1546 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1548 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1549
1550 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1552 assert_eq!(foo_producer.root().as_str(), "foo");
1553 }
1554
1555 #[tokio::test]
1556 async fn test_consume_broadcast_with_permissions() {
1557 let origin = Origin::random().produce();
1558 let broadcast1 = Broadcast::new().produce();
1559 let broadcast2 = Broadcast::new().produce();
1560
1561 origin.publish_broadcast("allowed/test", broadcast1.consume());
1562 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1563
1564 let limited_consumer = origin
1566 .consume()
1567 .scope(&["allowed".into()])
1568 .expect("should create limited consumer");
1569
1570 let result = limited_consumer.get_broadcast("allowed/test");
1572 assert!(result.is_some());
1573 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1574
1575 assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1577
1578 let consumer = origin.consume();
1580 assert!(consumer.get_broadcast("allowed/test").is_some());
1581 assert!(consumer.get_broadcast("notallowed/test").is_some());
1582 }
1583
1584 #[tokio::test]
1585 async fn test_nested_paths_with_permissions() {
1586 let origin = Origin::random().produce();
1587 let broadcast = Broadcast::new().produce();
1588
1589 let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1591
1592 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1594 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1595 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1596
1597 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1599 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1600 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1601 }
1602
1603 #[tokio::test]
1604 async fn test_multiple_consumers_with_different_permissions() {
1605 let origin = Origin::random().produce();
1606 let broadcast1 = Broadcast::new().produce();
1607 let broadcast2 = Broadcast::new().produce();
1608 let broadcast3 = Broadcast::new().produce();
1609
1610 origin.publish_broadcast("foo/test", broadcast1.consume());
1612 origin.publish_broadcast("bar/test", broadcast2.consume());
1613 origin.publish_broadcast("baz/test", broadcast3.consume());
1614
1615 let mut foo_consumer = origin
1617 .consume()
1618 .scope(&["foo".into()])
1619 .expect("should create foo consumer");
1620
1621 let mut bar_consumer = origin
1622 .consume()
1623 .scope(&["bar".into()])
1624 .expect("should create bar consumer");
1625
1626 let mut foobar_consumer = origin
1627 .consume()
1628 .scope(&["foo".into(), "bar".into()])
1629 .expect("should create foobar consumer");
1630
1631 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1633 foo_consumer.assert_next_wait();
1634
1635 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1636 bar_consumer.assert_next_wait();
1637
1638 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1639 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1640 foobar_consumer.assert_next_wait();
1641 }
1642
1643 #[tokio::test]
1644 async fn test_select_with_empty_prefix() {
1645 let origin = Origin::random().produce();
1646 let broadcast1 = Broadcast::new().produce();
1647 let broadcast2 = Broadcast::new().produce();
1648
1649 let demo_producer = origin.with_root("demo").expect("should create demo root");
1651 let limited_producer = demo_producer
1652 .scope(&["worm-node".into(), "foobar".into()])
1653 .expect("should create limited producer");
1654
1655 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1657 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1658
1659 let mut consumer = limited_producer
1661 .consume()
1662 .scope(&["".into()])
1663 .expect("should create consumer with empty prefix");
1664
1665 let a1 = consumer.try_announced().expect("expected first announcement");
1667 let a2 = consumer.try_announced().expect("expected second announcement");
1668 consumer.assert_next_wait();
1669
1670 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1671 paths.sort();
1672 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1673 }
1674
1675 #[tokio::test]
1676 async fn test_select_narrowing_scope() {
1677 let origin = Origin::random().produce();
1678 let broadcast1 = Broadcast::new().produce();
1679 let broadcast2 = Broadcast::new().produce();
1680 let broadcast3 = Broadcast::new().produce();
1681
1682 let demo_producer = origin.with_root("demo").expect("should create demo root");
1684 let limited_producer = demo_producer
1685 .scope(&["worm-node".into(), "foobar".into()])
1686 .expect("should create limited producer");
1687
1688 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1690 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1691 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1692
1693 let mut worm_consumer = limited_producer
1695 .consume()
1696 .scope(&["worm-node".into()])
1697 .expect("should create worm-node consumer");
1698
1699 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1701 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1702 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1706 .consume()
1707 .scope(&["worm-node/foo".into()])
1708 .expect("should create worm-node/foo consumer");
1709
1710 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1711 foo_consumer.assert_next_wait(); }
1713
1714 #[tokio::test]
1715 async fn test_select_multiple_roots_with_empty_prefix() {
1716 let origin = Origin::random().produce();
1717 let broadcast1 = Broadcast::new().produce();
1718 let broadcast2 = Broadcast::new().produce();
1719 let broadcast3 = Broadcast::new().produce();
1720
1721 let limited_producer = origin
1723 .scope(&["app1".into(), "app2".into(), "shared".into()])
1724 .expect("should create limited producer");
1725
1726 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1728 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1729 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1730
1731 let mut consumer = limited_producer
1733 .consume()
1734 .scope(&["".into()])
1735 .expect("should create consumer with empty prefix");
1736
1737 consumer.assert_next("app1/data", &broadcast1.consume());
1739 consumer.assert_next("app2/config", &broadcast2.consume());
1740 consumer.assert_next("shared/resource", &broadcast3.consume());
1741 consumer.assert_next_wait();
1742 }
1743
1744 #[tokio::test]
1745 async fn test_publish_scope_with_empty_prefix() {
1746 let origin = Origin::random().produce();
1747 let broadcast = Broadcast::new().produce();
1748
1749 let limited_producer = origin
1751 .scope(&["services/api".into(), "services/web".into()])
1752 .expect("should create limited producer");
1753
1754 let same_producer = limited_producer
1756 .scope(&["".into()])
1757 .expect("should create producer with empty prefix");
1758
1759 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1761 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1762 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1763 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1764 }
1765
1766 #[tokio::test]
1767 async fn test_select_narrowing_to_deeper_path() {
1768 let origin = Origin::random().produce();
1769 let broadcast1 = Broadcast::new().produce();
1770 let broadcast2 = Broadcast::new().produce();
1771 let broadcast3 = Broadcast::new().produce();
1772
1773 let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1775
1776 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1778 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1779 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1780
1781 let mut team2_consumer = limited_producer
1783 .consume()
1784 .scope(&["org/team2".into()])
1785 .expect("should create team2 consumer");
1786
1787 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1788 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1792 .consume()
1793 .scope(&["org/team1/project1".into()])
1794 .expect("should create project1 consumer");
1795
1796 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1798 project1_consumer.assert_next_wait();
1799 }
1800
1801 #[tokio::test]
1802 async fn test_select_with_non_matching_prefix() {
1803 let origin = Origin::random().produce();
1804
1805 let limited_producer = origin
1807 .scope(&["allowed/path".into()])
1808 .expect("should create limited producer");
1809
1810 assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1812
1813 assert!(limited_producer.scope(&["other/path".into()]).is_none());
1815 }
1816
1817 #[tokio::test]
1820 async fn test_with_root_trailing_slash_consumer() {
1821 let origin = Origin::random().produce();
1822
1823 let prefix = "some_prefix/".to_string();
1825 let mut consumer = origin.consume().with_root(prefix).unwrap();
1826
1827 let b = origin.create_broadcast("some_prefix/test").unwrap();
1828 consumer.assert_next("test", &b.consume());
1829 }
1830
1831 #[tokio::test]
1833 async fn test_with_root_trailing_slash_producer() {
1834 let origin = Origin::random().produce();
1835
1836 let prefix = "some_prefix/".to_string();
1838 let rooted = origin.with_root(prefix).unwrap();
1839
1840 let b = rooted.create_broadcast("test").unwrap();
1841
1842 let mut consumer = rooted.consume();
1843 consumer.assert_next("test", &b.consume());
1844 }
1845
1846 #[tokio::test]
1848 async fn test_with_root_trailing_slash_unannounce() {
1849 tokio::time::pause();
1850
1851 let origin = Origin::random().produce();
1852
1853 let prefix = "some_prefix/".to_string();
1854 let mut consumer = origin.consume().with_root(prefix).unwrap();
1855
1856 let b = origin.create_broadcast("some_prefix/test").unwrap();
1857 consumer.assert_next("test", &b.consume());
1858
1859 drop(b);
1861 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1862
1863 consumer.assert_next_none("test");
1865 }
1866
1867 #[tokio::test]
1868 async fn test_select_maintains_access_with_wider_prefix() {
1869 let origin = Origin::random().produce();
1870 let broadcast1 = Broadcast::new().produce();
1871 let broadcast2 = Broadcast::new().produce();
1872
1873 let demo_producer = origin.with_root("demo").expect("should create demo root");
1875 let user_producer = demo_producer
1876 .scope(&["worm-node".into(), "foobar".into()])
1877 .expect("should create user producer");
1878
1879 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1881 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1882
1883 let mut consumer = user_producer
1885 .consume()
1886 .scope(&["".into()])
1887 .expect("scope with empty prefix should not fail when user has specific permissions");
1888
1889 let a1 = consumer.try_announced().expect("expected first announcement");
1891 let a2 = consumer.try_announced().expect("expected second announcement");
1892 consumer.assert_next_wait();
1893
1894 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1895 paths.sort();
1896 assert_eq!(paths, ["foobar", "worm-node/data"]);
1897
1898 let mut narrow_consumer = user_producer
1900 .consume()
1901 .scope(&["worm-node".into()])
1902 .expect("should be able to narrow scope to worm-node");
1903
1904 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1905 narrow_consumer.assert_next_wait(); }
1907
1908 #[tokio::test]
1909 async fn test_duplicate_prefixes_deduped() {
1910 let origin = Origin::random().produce();
1911 let broadcast = Broadcast::new().produce();
1912
1913 let producer = origin
1915 .scope(&["demo".into(), "demo".into()])
1916 .expect("should create producer");
1917
1918 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1919
1920 let mut consumer = producer.consume();
1921 consumer.assert_next("demo/stream", &broadcast.consume());
1922 consumer.assert_next_wait();
1923 }
1924
1925 #[tokio::test]
1926 async fn test_overlapping_prefixes_deduped() {
1927 let origin = Origin::random().produce();
1928 let broadcast = Broadcast::new().produce();
1929
1930 let producer = origin
1932 .scope(&["demo".into(), "demo/foo".into()])
1933 .expect("should create producer");
1934
1935 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1937
1938 let mut consumer = producer.consume();
1939 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1940 consumer.assert_next_wait();
1941 }
1942
1943 #[tokio::test]
1944 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1945 let origin = Origin::random().produce();
1946 let broadcast = Broadcast::new().produce();
1947
1948 let producer = origin
1950 .scope(&["demo".into(), "demo/foo".into()])
1951 .expect("should create producer");
1952
1953 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1954
1955 let mut consumer = producer.consume();
1956 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1958 consumer.assert_next_wait();
1959 }
1960
1961 #[tokio::test]
1962 async fn test_allowed_returns_deduped_prefixes() {
1963 let origin = Origin::random().produce();
1964
1965 let producer = origin
1966 .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1967 .expect("should create producer");
1968
1969 let allowed: Vec<_> = producer.allowed().collect();
1970 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1971 }
1972
1973 #[tokio::test]
1974 async fn test_announced_broadcast_already_announced() {
1975 let origin = Origin::random().produce();
1976 let broadcast = Broadcast::new().produce();
1977
1978 origin.publish_broadcast("test", broadcast.consume());
1979
1980 let consumer = origin.consume();
1981 let result = consumer.announced_broadcast("test").await.expect("should find it");
1982 assert!(result.is_clone(&broadcast.consume()));
1983 }
1984
1985 #[tokio::test]
1986 async fn test_announced_broadcast_delayed() {
1987 tokio::time::pause();
1988
1989 let origin = Origin::random().produce();
1990 let broadcast = Broadcast::new().produce();
1991
1992 let consumer = origin.consume();
1993
1994 let wait = tokio::spawn({
1996 let consumer = consumer.clone();
1997 async move { consumer.announced_broadcast("test").await }
1998 });
1999
2000 tokio::task::yield_now().await;
2002
2003 origin.publish_broadcast("test", broadcast.consume());
2004
2005 let result = wait.await.unwrap().expect("should find it");
2006 assert!(result.is_clone(&broadcast.consume()));
2007 }
2008
2009 #[tokio::test]
2010 async fn test_announced_broadcast_ignores_unrelated_paths() {
2011 tokio::time::pause();
2012
2013 let origin = Origin::random().produce();
2014 let other = Broadcast::new().produce();
2015 let target = Broadcast::new().produce();
2016
2017 let consumer = origin.consume();
2018
2019 let wait = tokio::spawn({
2020 let consumer = consumer.clone();
2021 async move { consumer.announced_broadcast("target").await }
2022 });
2023
2024 tokio::task::yield_now().await;
2025
2026 origin.publish_broadcast("other", other.consume());
2028 tokio::task::yield_now().await;
2029 assert!(!wait.is_finished(), "must not resolve on unrelated path");
2030
2031 origin.publish_broadcast("target", target.consume());
2032 let result = wait.await.unwrap().expect("should find target");
2033 assert!(result.is_clone(&target.consume()));
2034 }
2035
2036 #[tokio::test]
2037 async fn test_announced_broadcast_skips_nested_paths() {
2038 tokio::time::pause();
2039
2040 let origin = Origin::random().produce();
2041 let nested = Broadcast::new().produce();
2042 let exact = Broadcast::new().produce();
2043
2044 let consumer = origin.consume();
2045
2046 let wait = tokio::spawn({
2047 let consumer = consumer.clone();
2048 async move { consumer.announced_broadcast("foo").await }
2049 });
2050
2051 tokio::task::yield_now().await;
2052
2053 origin.publish_broadcast("foo/bar", nested.consume());
2055 tokio::task::yield_now().await;
2056 assert!(!wait.is_finished(), "must not resolve on a nested path");
2057
2058 origin.publish_broadcast("foo", exact.consume());
2059 let result = wait.await.unwrap().expect("should find foo exactly");
2060 assert!(result.is_clone(&exact.consume()));
2061 }
2062
2063 #[tokio::test]
2064 async fn test_announced_broadcast_disallowed() {
2065 let origin = Origin::random().produce();
2066 let limited = origin
2067 .consume()
2068 .scope(&["allowed".into()])
2069 .expect("should create limited");
2070
2071 assert!(limited.announced_broadcast("notallowed").await.is_none());
2073 }
2074
2075 #[tokio::test]
2076 async fn test_announced_broadcast_scope_too_narrow() {
2077 let origin = Origin::random().produce();
2080 let limited = origin
2081 .consume()
2082 .scope(&["foo/specific".into()])
2083 .expect("should create limited");
2084
2085 let result = limited
2087 .announced_broadcast("foo")
2088 .now_or_never()
2089 .expect("must not block");
2090 assert!(result.is_none());
2091 }
2092
2093 #[tokio::test]
2097 async fn test_coalesce_announce_then_unannounce() {
2098 tokio::time::pause();
2100
2101 let origin = Origin::random().produce();
2102 let mut consumer = origin.consume();
2103
2104 let broadcast = Broadcast::new().produce();
2105 origin.publish_broadcast("test", broadcast.consume());
2106 drop(broadcast);
2107
2108 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2109
2110 consumer.assert_next_wait();
2111 }
2112
2113 #[tokio::test]
2114 async fn test_coalesce_announce_unannounce_announce() {
2115 tokio::time::pause();
2118
2119 let origin = Origin::random().produce();
2120 let mut consumer = origin.consume();
2121
2122 let broadcast1 = Broadcast::new().produce();
2123 let broadcast2 = Broadcast::new().produce();
2124
2125 origin.publish_broadcast("test", broadcast1.consume());
2126 drop(broadcast1);
2127 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2128 origin.publish_broadcast("test", broadcast2.consume());
2129
2130 consumer.assert_next("test", &broadcast2.consume());
2131 consumer.assert_next_wait();
2132 }
2133
2134 #[tokio::test]
2135 async fn test_coalesce_unannounce_announce_preserved() {
2136 tokio::time::pause();
2139
2140 let origin = Origin::random().produce();
2141 let broadcast1 = Broadcast::new().produce();
2142 origin.publish_broadcast("test", broadcast1.consume());
2143
2144 let mut consumer = origin.consume();
2145 consumer.assert_next("test", &broadcast1.consume());
2146
2147 drop(broadcast1);
2149 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2150
2151 let broadcast2 = Broadcast::new().produce();
2152 origin.publish_broadcast("test", broadcast2.consume());
2153
2154 consumer.assert_next_none("test");
2156 consumer.assert_next("test", &broadcast2.consume());
2157 consumer.assert_next_wait();
2158 }
2159
2160 #[tokio::test]
2161 async fn test_coalesce_unannounce_announce_unannounce() {
2162 tokio::time::pause();
2165
2166 let origin = Origin::random().produce();
2167 let broadcast1 = Broadcast::new().produce();
2168 origin.publish_broadcast("test", broadcast1.consume());
2169
2170 let mut consumer = origin.consume();
2171 consumer.assert_next("test", &broadcast1.consume());
2172
2173 drop(broadcast1);
2174 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2175
2176 let broadcast2 = Broadcast::new().produce();
2177 origin.publish_broadcast("test", broadcast2.consume());
2178 drop(broadcast2);
2179 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2180
2181 consumer.assert_next_none("test");
2182 consumer.assert_next_wait();
2183 }
2184
2185 #[tokio::test]
2186 async fn test_coalesce_churn_bounded() {
2187 tokio::time::pause();
2192
2193 let origin = Origin::random().produce();
2194 let mut consumer = origin.consume();
2195
2196 for _ in 0..1000 {
2197 let broadcast = Broadcast::new().produce();
2198 origin.publish_broadcast("test", broadcast.consume());
2199 drop(broadcast);
2200 }
2201 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2202
2203 let mut collected = Vec::new();
2204 while let Some(update) = consumer.try_announced() {
2205 collected.push(update);
2206 }
2207 assert!(
2208 collected.len() <= 1,
2209 "expected at most one pending update, got {}",
2210 collected.len()
2211 );
2212 assert!(
2213 collected.iter().all(|(path, _)| path == &Path::new("test")),
2214 "unexpected path in pending updates",
2215 );
2216 }
2217}