1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 fmt,
4 sync::atomic::{AtomicU64, Ordering},
5 task::Poll,
6};
7
8use rand::RngExt;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13 AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14 coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
/// Identifier for an origin, wrapping a single `u64`.
///
/// The id is encoded and decoded as a plain `u64`; decoding rejects values of
/// `2^62` or above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Origin {
    /// Raw numeric identifier.
    pub id: u64,
}
29
30impl Origin {
31 pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35 pub fn random() -> Self {
44 let mut rng = rand::rng();
45 let id = rng.random_range(1..(1u64 << 53));
46 Self { id }
47 }
48
49 pub fn produce(self) -> OriginProducer {
51 OriginProducer::new(self)
52 }
53}
54
55impl From<u64> for Origin {
56 fn from(id: u64) -> Self {
57 Self { id }
58 }
59}
60
61impl fmt::Display for Origin {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 self.id.fmt(f)
64 }
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69 u64: Encode<V>,
70{
71 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72 self.id.encode(w, version)
73 }
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78 u64: Decode<V>,
79{
80 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81 let id = u64::decode(r, version)?;
82 if id >= 1u64 << 62 {
83 return Err(DecodeError::InvalidValue);
84 }
85 Ok(Self { id })
86 }
87}
88
/// Upper bound on the number of entries an [`OriginList`] may hold.
pub(crate) const MAX_HOPS: usize = 32;
95
/// Ordered list of [`Origin`]s.
///
/// `push`, `TryFrom<Vec<Origin>>` and `Decode` all refuse to grow the list
/// beyond [`MAX_HOPS`] entries.
// NOTE(review): the derived serde `Deserialize` does not appear to enforce the
// `MAX_HOPS` cap — confirm whether that matters for callers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct OriginList(Vec<Origin>);
103
/// Error returned when an [`OriginList`] would exceed [`MAX_HOPS`] entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(f, "too many origins (max {MAX_HOPS})")
112 }
113}
114
// Marker impl so `TooManyOrigins` can be used as a standard error type.
impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118 fn from(_: TooManyOrigins) -> Self {
119 DecodeError::BoundsExceeded
120 }
121}
122
123impl OriginList {
124 pub fn new() -> Self {
126 Self(Vec::new())
127 }
128
129 pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131 if self.0.len() >= MAX_HOPS {
132 return Err(TooManyOrigins);
133 }
134 self.0.push(origin);
135 Ok(())
136 }
137
138 pub fn replace_first(&mut self, target: Origin, replacement: Origin) -> bool {
141 for entry in &mut self.0 {
142 if *entry == target {
143 *entry = replacement;
144 return true;
145 }
146 }
147 false
148 }
149
150 pub fn contains(&self, origin: &Origin) -> bool {
152 self.0.contains(origin)
153 }
154
155 pub fn len(&self) -> usize {
157 self.0.len()
158 }
159
160 pub fn is_empty(&self) -> bool {
162 self.0.is_empty()
163 }
164
165 pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
167 self.0.iter()
168 }
169
170 pub fn as_slice(&self) -> &[Origin] {
172 &self.0
173 }
174}
175
176impl TryFrom<Vec<Origin>> for OriginList {
177 type Error = TooManyOrigins;
178
179 fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
180 if v.len() > MAX_HOPS {
181 return Err(TooManyOrigins);
182 }
183 Ok(Self(v))
184 }
185}
186
187impl<'a> IntoIterator for &'a OriginList {
188 type Item = &'a Origin;
189 type IntoIter = std::slice::Iter<'a, Origin>;
190
191 fn into_iter(self) -> Self::IntoIter {
192 self.iter()
193 }
194}
195
196impl<V: Copy> Encode<V> for OriginList
197where
198 u64: Encode<V>,
199 Origin: Encode<V>,
200{
201 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
202 (self.0.len() as u64).encode(w, version)?;
203 for origin in &self.0 {
204 origin.encode(w, version)?;
205 }
206 Ok(())
207 }
208}
209
210impl<V: Copy> Decode<V> for OriginList
211where
212 u64: Decode<V>,
213 Origin: Decode<V>,
214{
215 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
216 let count = u64::decode(r, version)? as usize;
217 if count > MAX_HOPS {
218 return Err(DecodeError::BoundsExceeded);
219 }
220 let mut list = Vec::with_capacity(count);
221 for _ in 0..count {
222 list.push(Origin::decode(r, version)?);
223 }
224 Ok(Self(list))
225 }
226}
227
/// Process-wide counter that backs [`ConsumerId::new`].
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Identifier handed to each consumer so it can be registered and later
/// unregistered from a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
    /// Takes the next value from the global counter.
    fn new() -> Self {
        let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
        ConsumerId(raw)
    }
}
238
/// Broadcasts published at a single path: the one currently served plus any
/// queued alternatives.
struct OriginBroadcast {
    /// Full path recorded when the entry was created.
    path: PathOwned,
    /// Broadcast currently handed out to consumers.
    active: BroadcastConsumer,
    /// Other broadcasts published at the same path; one of them is promoted
    /// when `active` is removed.
    backup: VecDeque<BroadcastConsumer>,
}
245
246fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) {
254 const SEED: u64 = 0x420C0DECB00B; const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
261
262 let mut hash = SEED;
263 for &byte in name.as_str().as_bytes() {
264 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
265 }
266 for hop in hops {
267 for &byte in &hop.id.to_le_bytes() {
268 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
269 }
270 }
271
272 (hops.len(), hash)
273}
274
/// Coalesced, not-yet-delivered change for a single path.
enum PendingUpdate {
    /// Deliver an announcement of this broadcast.
    Announce(BroadcastConsumer),
    /// Deliver an unannouncement.
    Unannounce,
    /// Deliver an unannouncement first, then an announcement of this broadcast.
    UnannounceAnnounce(BroadcastConsumer),
}
288
/// Per-consumer queue of pending announcements.
#[derive(Default)]
struct OriginConsumerState {
    /// At most one pending update per path; the `BTreeMap` keeps paths sorted,
    /// which is the order `take` delivers them in.
    pending: BTreeMap<PathOwned, PendingUpdate>,
}
297
298impl OriginConsumerState {
299 fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
300 let new = match self.pending.remove(&path) {
301 None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
303 Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
305 PendingUpdate::UnannounceAnnounce(broadcast)
306 }
307 };
308 self.pending.insert(path, new);
309 }
310
311 fn apply_unannounce(&mut self, path: PathOwned) {
312 match self.pending.remove(&path) {
313 Some(PendingUpdate::Announce(_)) => {}
315 None | Some(PendingUpdate::Unannounce) => {
316 self.pending.insert(path, PendingUpdate::Unannounce);
317 }
318 Some(PendingUpdate::UnannounceAnnounce(_)) => {
321 self.pending.insert(path, PendingUpdate::Unannounce);
322 }
323 }
324 }
325
326 fn take(&mut self) -> Option<OriginAnnounce> {
328 let path = self.pending.keys().next()?.clone();
329 match self.pending.remove(&path).unwrap() {
330 PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
331 PendingUpdate::Unannounce => Some((path, None)),
332 PendingUpdate::UnannounceAnnounce(broadcast) => {
333 self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
336 Some((path, None))
337 }
338 }
339 }
340}
341
/// Handle a node uses to push updates into one consumer's pending state.
#[derive(Clone)]
struct OriginConsumerNotify {
    /// Prefix stripped from every path before it is queued for the consumer.
    root: PathOwned,
    /// Shared handle to the consumer's pending-update state.
    state: kio::Producer<OriginConsumerState>,
}
347
348impl OriginConsumerNotify {
349 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
350 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
351 self.state
352 .write()
353 .ok()
354 .expect("consumer closed")
355 .apply_announce(path, broadcast);
356 }
357
358 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
359 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
360 let mut state = self.state.write().ok().expect("consumer closed");
361 state.apply_unannounce(path.clone());
362 state.apply_announce(path, broadcast);
363 }
364
365 fn unannounce(&self, path: impl AsPath) {
366 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
367 self.state.write().ok().expect("consumer closed").apply_unannounce(path);
368 }
369}
370
/// Fan-out point for announcements at one level of the path tree.
struct NotifyNode {
    /// Notifier of the enclosing node; every update is forwarded to it as well.
    parent: Option<Lock<NotifyNode>>,

    /// Consumers registered directly at this level, keyed by their id.
    consumers: HashMap<ConsumerId, OriginConsumerNotify>,
}
378
379impl NotifyNode {
380 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
381 Self {
382 parent,
383 consumers: HashMap::new(),
384 }
385 }
386
387 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
388 for consumer in self.consumers.values() {
389 consumer.announce(path.as_path(), broadcast.clone());
390 }
391
392 if let Some(parent) = &self.parent {
393 parent.lock().announce(path, broadcast);
394 }
395 }
396
397 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
398 for consumer in self.consumers.values() {
399 consumer.reannounce(path.as_path(), broadcast.clone());
400 }
401
402 if let Some(parent) = &self.parent {
403 parent.lock().reannounce(path, broadcast);
404 }
405 }
406
407 fn unannounce(&mut self, path: impl AsPath) {
408 for consumer in self.consumers.values() {
409 consumer.unannounce(path.as_path());
410 }
411
412 if let Some(parent) = &self.parent {
413 parent.lock().unannounce(path);
414 }
415 }
416}
417
/// One node of the path tree; each path component maps to a nested node.
struct OriginNode {
    /// Broadcast entry published exactly at this node's path, if any.
    broadcast: Option<OriginBroadcast>,

    /// Child nodes keyed by the next path component.
    nested: HashMap<String, Lock<OriginNode>>,

    /// Notifier for consumers registered at this node; children use it as
    /// their parent notifier.
    notify: Lock<NotifyNode>,
}
428
429impl OriginNode {
430 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
431 Self {
432 broadcast: None,
433 nested: HashMap::new(),
434 notify: Lock::new(NotifyNode::new(parent)),
435 }
436 }
437
438 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
439 let (dir, rest) = path.next_part().expect("leaf called with empty path");
440
441 let next = self.entry(dir);
442 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
443 }
444
445 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
446 match self.nested.get(dir) {
447 Some(next) => next.clone(),
448 None => {
449 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
450 self.nested.insert(dir.to_string(), next.clone());
451 next
452 }
453 }
454 }
455
456 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
457 let full = full.as_path();
458 let rest = relative.as_path();
459
460 if let Some((dir, relative)) = rest.next_part() {
462 self.entry(dir).lock().publish(&full, broadcast, &relative);
464 } else if let Some(existing) = &mut self.broadcast {
465 if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
473 return;
474 }
475
476 if route_key(&full, &broadcast.hops) < route_key(&full, &existing.active.hops) {
477 let old = existing.active.clone();
478 existing.active = broadcast.clone();
479 existing.backup.push_back(old);
480
481 self.notify.lock().reannounce(full, broadcast);
482 } else {
483 existing.backup.push_back(broadcast.clone());
486 }
487 } else {
488 self.broadcast = Some(OriginBroadcast {
490 path: full.to_owned(),
491 active: broadcast.clone(),
492 backup: VecDeque::new(),
493 });
494 self.notify.lock().announce(full, broadcast);
495 }
496 }
497
498 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
499 self.consume_initial(&mut notify);
500 self.notify.lock().consumers.insert(id, notify);
501 }
502
503 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
504 if let Some(broadcast) = &self.broadcast {
505 notify.announce(&broadcast.path, broadcast.active.clone());
506 }
507
508 for nested in self.nested.values() {
510 nested.lock().consume_initial(notify);
511 }
512 }
513
514 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
515 let rest = rest.as_path();
516
517 if let Some((dir, rest)) = rest.next_part() {
518 let node = self.nested.get(dir)?.lock();
519 node.consume_broadcast(&rest)
520 } else {
521 self.broadcast.as_ref().map(|b| b.active.clone())
522 }
523 }
524
525 fn unconsume(&mut self, id: ConsumerId) {
526 self.notify.lock().consumers.remove(&id).expect("consumer not found");
527 if self.is_empty() {
528 }
531 }
532
533 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
535 let full = full.as_path();
536 let relative = relative.as_path();
537
538 if let Some((dir, relative)) = relative.next_part() {
539 let nested = self.entry(dir);
540 let mut locked = nested.lock();
541 locked.remove(&full, broadcast, &relative);
542
543 if locked.is_empty() {
544 drop(locked);
545 self.nested.remove(dir);
546 }
547 } else {
548 let entry = match &mut self.broadcast {
549 Some(existing) => existing,
550 None => return,
551 };
552
553 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
555 if let Some(pos) = pos {
556 entry.backup.remove(pos);
557 return;
559 }
560
561 assert!(entry.active.is_clone(&broadcast));
563
564 let best = entry
567 .backup
568 .iter()
569 .enumerate()
570 .min_by_key(|(_, b)| route_key(&full, &b.hops))
571 .map(|(i, _)| i);
572 if let Some(idx) = best {
573 let active = entry.backup.remove(idx).expect("index in range");
574 entry.active = active;
575 self.notify.lock().reannounce(full, &entry.active);
576 } else {
577 self.broadcast = None;
579 self.notify.lock().unannounce(full);
580 }
581 }
582 }
583
584 fn is_empty(&self) -> bool {
585 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
586 }
587}
588
/// Set of tree nodes a producer or consumer may access.
#[derive(Clone)]
struct OriginNodes {
    /// `(root path, node)` pairs; lookups return the first pair whose root is
    /// a prefix of the requested path.
    nodes: Vec<(PathOwned, Lock<OriginNode>)>,
}
593
594impl OriginNodes {
595 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
598 let mut roots = Vec::new();
599
600 for (root, state) in &self.nodes {
601 for prefix in prefixes {
602 if root.has_prefix(prefix) {
603 roots.push((root.to_owned(), state.clone()));
605 continue;
606 }
607
608 if let Some(suffix) = prefix.strip_prefix(root) {
609 let nested = state.lock().leaf(&suffix);
611 roots.push((prefix.to_owned(), nested));
612 }
613 }
614 }
615
616 if roots.is_empty() {
617 None
618 } else {
619 Some(Self { nodes: roots })
620 }
621 }
622
623 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
624 let new_root = new_root.as_path();
625 let mut roots = Vec::new();
626
627 if new_root.is_empty() {
628 return Some(self.clone());
629 }
630
631 for (root, state) in &self.nodes {
632 if let Some(suffix) = root.strip_prefix(&new_root) {
633 roots.push((suffix.to_owned(), state.clone()));
635 } else if let Some(suffix) = new_root.strip_prefix(root) {
636 let nested = state.lock().leaf(&suffix);
639 roots.push(("".into(), nested));
640 }
641 }
642
643 if roots.is_empty() {
644 None
645 } else {
646 Some(Self { nodes: roots })
647 }
648 }
649
650 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
652 let path = path.as_path();
653
654 for (root, state) in &self.nodes {
655 if let Some(suffix) = path.strip_prefix(root) {
656 return Some((state.clone(), suffix.to_owned()));
657 }
658 }
659
660 None
661 }
662}
663
664impl Default for OriginNodes {
665 fn default() -> Self {
666 Self {
667 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
668 }
669 }
670}
671
/// A single update delivered to a consumer: the path, plus `Some(broadcast)`
/// for an announce or `None` for an unannounce.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
674
/// Publishing side of an origin: registers broadcasts into the shared tree.
#[derive(Clone)]
pub struct OriginProducer {
    /// Identity of this origin; also exposed through `Deref`.
    info: Origin,

    /// Nodes this producer is allowed to publish under.
    nodes: OriginNodes,

    /// Prefix joined in front of published paths to form the full path.
    root: PathOwned,
}
690
691impl std::ops::Deref for OriginProducer {
692 type Target = Origin;
693
694 fn deref(&self) -> &Self::Target {
695 &self.info
696 }
697}
698
699impl OriginProducer {
700 pub fn new(info: Origin) -> Self {
703 Self {
704 info,
705 nodes: OriginNodes::default(),
706 root: PathOwned::default(),
707 }
708 }
709
710 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
715 let broadcast = Broadcast::new().produce();
716 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
717 }
718
719 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
731 let path = path.as_path();
732
733 if broadcast.hops.contains(&self.info) {
735 return false;
736 }
737
738 let (root, rest) = match self.nodes.get(&path) {
739 Some(root) => root,
740 None => return false,
741 };
742
743 let full = self.root.join(&path);
744
745 root.lock().publish(&full, &broadcast, &rest);
746 let root = root.clone();
747
748 web_async::spawn(async move {
749 broadcast.closed().await;
750 root.lock().remove(&full, broadcast, &rest);
751 });
752
753 true
754 }
755
756 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
762 let prefixes = PathPrefixes::new(prefixes);
763 Some(OriginProducer {
764 info: self.info,
765 nodes: self.nodes.select(&prefixes)?,
766 root: self.root.clone(),
767 })
768 }
769
770 pub fn consume(&self) -> OriginConsumer {
772 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
773 }
774
775 #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
780 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
781 let path = path.as_path();
782 let (root, rest) = self.nodes.get(&path)?;
783 let state = root.lock();
784 state.consume_broadcast(&rest)
785 }
786
787 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
792 let prefix = prefix.as_path();
793
794 Some(Self {
795 info: self.info,
796 root: self.root.join(&prefix).to_owned(),
797 nodes: self.nodes.root(&prefix)?,
798 })
799 }
800
801 pub fn root(&self) -> &Path<'_> {
803 &self.root
804 }
805
806 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
809 self.nodes.nodes.iter().map(|(root, _)| root)
810 }
811
812 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
814 self.root.join(path)
815 }
816}
817
/// Consuming side of an origin: receives announce/unannounce updates and looks
/// up broadcasts.
pub struct OriginConsumer {
    /// Id under which this consumer is registered on each of its nodes.
    id: ConsumerId,
    /// Identity of the origin; also exposed through `Deref`.
    info: Origin,
    /// Nodes this consumer is registered on.
    nodes: OriginNodes,

    /// Pending updates queued for this consumer by the nodes.
    state: kio::Producer<OriginConsumerState>,

    /// Prefix stripped from paths before they are reported to this consumer.
    root: PathOwned,
}
834
835impl std::ops::Deref for OriginConsumer {
836 type Target = Origin;
837
838 fn deref(&self) -> &Self::Target {
839 &self.info
840 }
841}
842
843impl OriginConsumer {
844 fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
845 let state = kio::Producer::<OriginConsumerState>::default();
846 let id = ConsumerId::new();
847
848 for (_, node) in &nodes.nodes {
849 let notify = OriginConsumerNotify {
850 root: root.clone(),
851 state: state.clone(),
852 };
853 node.lock().consume(id, notify);
854 }
855
856 Self {
857 id,
858 info,
859 nodes,
860 state,
861 root,
862 }
863 }
864
865 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
873 kio::wait(|waiter| self.poll_announced(waiter)).await
874 }
875
876 pub fn poll_announced(&mut self, waiter: &kio::Waiter) -> Poll<Option<OriginAnnounce>> {
882 match self.state.poll(waiter, |state| match state.take() {
883 Some(item) => Poll::Ready(item),
884 None => Poll::Pending,
885 }) {
886 Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
887 Poll::Ready(Err(_)) => Poll::Ready(None),
889 Poll::Pending => Poll::Pending,
890 }
891 }
892
893 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
898 self.state.write().ok()?.take()
899 }
900
901 pub fn consume(&self) -> Self {
903 self.clone()
904 }
905
906 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
914 let path = path.as_path();
915 let (root, rest) = self.nodes.get(&path)?;
916 let state = root.lock();
917 state.consume_broadcast(&rest)
918 }
919
920 pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
929 let path = path.as_path();
930
931 let mut consumer = self.scope(std::slice::from_ref(&path))?;
933
934 if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
938 return None;
939 }
940
941 loop {
942 let (announced, broadcast) = consumer.announced().await?;
943 if announced.as_path() == path {
945 if let Some(broadcast) = broadcast {
946 return Some(broadcast);
947 }
948 }
949 }
950 }
951
952 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
958 let prefixes = PathPrefixes::new(prefixes);
959 Some(OriginConsumer::new(
960 self.info,
961 self.root.clone(),
962 self.nodes.select(&prefixes)?,
963 ))
964 }
965
966 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
971 let prefix = prefix.as_path();
972
973 Some(Self::new(
974 self.info,
975 self.root.join(&prefix).to_owned(),
976 self.nodes.root(&prefix)?,
977 ))
978 }
979
980 pub fn root(&self) -> &Path<'_> {
982 &self.root
983 }
984
985 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
988 self.nodes.nodes.iter().map(|(root, _)| root)
989 }
990
991 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
993 self.root.join(path)
994 }
995}
996
997impl Drop for OriginConsumer {
998 fn drop(&mut self) {
999 for (_, root) in &self.nodes.nodes {
1000 root.lock().unconsume(self.id);
1001 }
1002 }
1003}
1004
1005impl Clone for OriginConsumer {
1006 fn clone(&self) -> Self {
1007 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
1008 }
1009}
1010
1011#[cfg(test)]
1012use futures::FutureExt;
1013
#[cfg(test)]
impl OriginConsumer {
    /// Asserts that the next update is immediately available and announces
    /// `broadcast` at `expected`.
    pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let ready = self.announced().now_or_never().expect("next blocked");
        let (path, active) = ready.expect("no next");
        assert_eq!(path, expected, "wrong path");

        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Same as [`Self::assert_next`], but uses the non-waiting
    /// `try_announced`.
    pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let (path, active) = self.try_announced().expect("no next");
        assert_eq!(path, expected, "wrong path");

        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Asserts that the next update is immediately available and unannounces
    /// `expected`.
    pub fn assert_next_none(&mut self, expected: impl AsPath) {
        let expected = expected.as_path();
        let ready = self.announced().now_or_never().expect("next blocked");
        let (path, active) = ready.expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.is_none(), "should be unannounced");
    }

    /// Asserts that no update is available right now.
    pub fn assert_next_wait(&mut self) {
        match self.announced().now_or_never() {
            None => {}
            Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
        }
    }
}
1052
1053#[cfg(test)]
1054mod tests {
1055 use crate::Broadcast;
1056
1057 use super::*;
1058
    /// `push` accepts exactly `MAX_HOPS` entries and rejects the next one.
    #[test]
    fn origin_list_push_fails_at_limit() {
        let mut list = OriginList::new();
        for _ in 0..MAX_HOPS {
            list.push(Origin::random()).unwrap();
        }
        assert_eq!(list.len(), MAX_HOPS);
        // The list is full, so one more push must fail.
        assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
    }
1068
    /// `replace_first` only touches the first matching entry and reports
    /// whether a match was found.
    #[test]
    fn origin_list_replace_first() {
        let mut list = OriginList::new();
        for _ in 0..3 {
            list.push(Origin::UNKNOWN).unwrap();
        }

        // Only the first of the three identical entries is replaced.
        assert!(list.replace_first(Origin::UNKNOWN, Origin::from(7)));
        assert_eq!(list.as_slice(), &[Origin::from(7), Origin::UNKNOWN, Origin::UNKNOWN]);

        // A target that is not present leaves the list untouched.
        assert!(!list.replace_first(Origin::from(99), Origin::from(8)));
        assert_eq!(list.len(), 3);
    }
1084
    /// `TryFrom<Vec<Origin>>` accepts `MAX_HOPS` entries and rejects
    /// `MAX_HOPS + 1`.
    #[test]
    fn origin_list_try_from_vec_enforces_limit() {
        // Exactly at the limit is still allowed.
        let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
        assert!(OriginList::try_from(under).is_ok());

        let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
        assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
    }
1093
    /// Announce and unannounce updates reach every consumer, including
    /// consumers created after a broadcast was published.
    #[tokio::test]
    async fn test_announce() {
        tokio::time::pause();

        let origin = Origin::random().produce();
        let broadcast1 = Broadcast::new().produce();
        let broadcast2 = Broadcast::new().produce();

        // Nothing has been published yet.
        let mut consumer1 = origin.consume();
        consumer1.assert_next_wait();

        origin.publish_broadcast("test1", broadcast1.consume());

        consumer1.assert_next("test1", &broadcast1.consume());
        consumer1.assert_next_wait();

        // Created after "test1" was published; it should still see it below.
        let mut consumer2 = origin.consume();

        origin.publish_broadcast("test2", broadcast2.consume());

        consumer1.assert_next("test2", &broadcast2.consume());
        consumer1.assert_next_wait();

        consumer2.assert_next("test1", &broadcast1.consume());
        consumer2.assert_next("test2", &broadcast2.consume());
        consumer2.assert_next_wait();

        // Dropping the producer should lead to an unannounce.
        drop(broadcast1);

        // Give the spawned cleanup task a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test1");
        consumer2.assert_next_none("test1");
        consumer1.assert_next_wait();
        consumer2.assert_next_wait();

        // A consumer created now only sees what is still published.
        let mut consumer3 = origin.consume();
        consumer3.assert_next("test2", &broadcast2.consume());
        consumer3.assert_next_wait();

        drop(broadcast2);

        // Give the spawned cleanup task a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        consumer1.assert_next_none("test2");
        consumer2.assert_next_none("test2");
        consumer3.assert_next_none("test2");

    }
1159
    /// Several broadcasts published at the same path: the first is served,
    /// the rest act as backups that take over when the active one goes away.
    #[tokio::test]
    async fn test_duplicate() {
        tokio::time::pause();

        let origin = Origin::random().produce();

        let broadcast1 = Broadcast::new().produce();
        let broadcast2 = Broadcast::new().produce();
        let broadcast3 = Broadcast::new().produce();

        let consumer1 = broadcast1.consume();
        let consumer2 = broadcast2.consume();
        let consumer3 = broadcast3.consume();

        let mut consumer = origin.consume();

        origin.publish_broadcast("test", consumer1.clone());
        origin.publish_broadcast("test", consumer2.clone());
        origin.publish_broadcast("test", consumer3.clone());
        assert!(consumer.get_broadcast("test").is_some());

        // Only the first publish is announced; the test expects the later
        // ones to be queued silently (presumably because their route keys tie
        // with the first — confirm against `Broadcast::new()`'s hop list).
        consumer.assert_next("test", &consumer1);
        consumer.assert_next_wait();

        // Removing a backup must not produce any update.
        drop(broadcast2);

        // Give the spawned cleanup task a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        assert!(consumer.get_broadcast("test").is_some());
        consumer.assert_next_wait();

        // Removing the active broadcast promotes the remaining backup.
        drop(broadcast1);

        // Give the spawned cleanup task a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

        // The switch is delivered as an unannounce followed by an announce.
        assert!(consumer.get_broadcast("test").is_some());
        consumer.assert_next_none("test");
        consumer.assert_next("test", &consumer3);

        // Removing the last broadcast unannounces the path for good.
        drop(broadcast3);

        // Give the spawned cleanup task a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(consumer.get_broadcast("test").is_none());

        consumer.assert_next_none("test");
        consumer.assert_next_wait();
    }
1215
    /// Same path published twice; the later publish is dropped first, then
    /// the earlier one. The path stays available until both are gone.
    #[tokio::test]
    async fn test_duplicate_reverse() {
        tokio::time::pause();

        let origin = Origin::random().produce();
        let broadcast1 = Broadcast::new().produce();
        let broadcast2 = Broadcast::new().produce();

        origin.publish_broadcast("test", broadcast1.consume());
        origin.publish_broadcast("test", broadcast2.consume());
        assert!(origin.consume().get_broadcast("test").is_some());

        // Drop the second publish first.
        drop(broadcast2);

        // Give the spawned cleanup task a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume().get_broadcast("test").is_some());

        drop(broadcast1);

        // Give the spawned cleanup task a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume().get_broadcast("test").is_none());
    }
1241
    /// The active broadcast for a path must not depend on publish order:
    /// equal-length hop lists are tie-broken deterministically, and a shorter
    /// hop list always wins.
    #[tokio::test]
    async fn test_deterministic_tiebreak() {
        tokio::time::pause();

        /// Builds a broadcast whose hop list consists of the given origin ids.
        fn route(ids: &[u64]) -> BroadcastProducer {
            let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::<Vec<_>>()).unwrap();
            Broadcast { hops }.produce()
        }

        /// Publishes two routes at the same path (in the given order) on a
        /// fresh origin and returns the hop list of the one that ends up
        /// active.
        fn winner(first: &[u64], second: &[u64]) -> OriginList {
            let origin = Origin::random().produce();
            let a = route(first);
            let b = route(second);
            origin.publish_broadcast("test", a.consume());
            origin.publish_broadcast("test", b.consume());
            let hops = origin.consume().get_broadcast("test").unwrap().hops.clone();
            drop((a, b));
            hops
        }

        // Same hop count in both routes: the winner is decided by the hash.
        let forward = winner(&[10, 20], &[30, 40]);
        let reverse = winner(&[30, 40], &[10, 20]);
        assert_eq!(forward, reverse, "tie-break must not depend on publish order");

        // The single-hop route wins regardless of publish order.
        assert_eq!(winner(&[10, 20], &[30]).len(), 1);
        assert_eq!(winner(&[30], &[10, 20]).len(), 1);
    }
1275
    /// Publishing the same broadcast twice at one path behaves like a single
    /// publish: one drop removes it completely.
    #[tokio::test]
    async fn test_double_publish() {
        tokio::time::pause();

        let origin = Origin::random().produce();
        let broadcast = Broadcast::new().produce();

        // The second publish is a duplicate of the first.
        origin.publish_broadcast("test", broadcast.consume());
        origin.publish_broadcast("test", broadcast.consume());

        assert!(origin.consume().get_broadcast("test").is_some());

        drop(broadcast);

        // Give the spawned cleanup tasks a chance to run.
        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert!(origin.consume().get_broadcast("test").is_none());
    }
    /// A consumer that has not polled yet still receives all 256 pending
    /// announcements, in path order.
    #[tokio::test]
    async fn test_many_announces() {
        let origin = Origin::random().produce();
        let broadcast = Broadcast::new().produce();

        let mut consumer = origin.consume();
        // Zero-padded names so that sorted path order equals numeric order.
        for i in 0..256 {
            origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
        }

        for i in 0..256 {
            consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
        }
        consumer.assert_next_wait();
    }
1314
    /// Same as `test_many_announces`, but drains the queue through the
    /// non-waiting `try_announced` path.
    #[tokio::test]
    async fn test_many_announces_try() {
        let origin = Origin::random().produce();
        let broadcast = Broadcast::new().produce();

        let mut consumer = origin.consume();
        // Zero-padded names so that sorted path order equals numeric order.
        for i in 0..256 {
            origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
        }

        for i in 0..256 {
            consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
        }
    }
1329
    /// A producer rooted at "foo" publishes relative paths; the unrooted
    /// consumer sees the full path, the rooted consumer the relative one.
    #[tokio::test]
    async fn test_with_root_basic() {
        let origin = Origin::random().produce();
        let broadcast = Broadcast::new().produce();

        let foo_producer = origin.with_root("foo").expect("should create root");
        assert_eq!(foo_producer.root().as_str(), "foo");

        let mut consumer = origin.consume();

        // Published relative to "foo", observed as "foo/bar/baz" from the top.
        assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
        consumer.assert_next("foo/bar/baz", &broadcast.consume());

        // A consumer derived from the rooted producer sees the relative path.
        let mut foo_consumer = foo_producer.consume();
        foo_consumer.assert_next("bar/baz", &broadcast.consume());
    }
1350
    /// Chained `with_root` calls accumulate: "foo" then "bar" gives the root
    /// "foo/bar".
    #[tokio::test]
    async fn test_with_root_nested() {
        let origin = Origin::random().produce();
        let broadcast = Broadcast::new().produce();

        let foo_producer = origin.with_root("foo").expect("should create foo root");
        let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
        assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");

        let mut consumer = origin.consume();

        // Published relative to "foo/bar", observed with the full path.
        assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
        consumer.assert_next("foo/bar/baz", &broadcast.consume());

        // A consumer derived from the nested producer sees the relative path.
        let mut foo_bar_consumer = foo_bar_producer.consume();
        foo_bar_consumer.assert_next("baz", &broadcast.consume());
    }
1372
    /// A scoped producer may publish at or below its allowed prefixes and is
    /// refused everywhere else, including at a parent of an allowed prefix.
    #[tokio::test]
    async fn test_publish_scope_allows() {
        let origin = Origin::random().produce();
        let broadcast = Broadcast::new().produce();

        let limited_producer = origin
            .scope(&["allowed/path1".into(), "allowed/path2".into()])
            .expect("should create limited producer");

        // At or below an allowed prefix.
        assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
        assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));

        // Outside every allowed prefix; "allowed" is only a parent of them.
        assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume()));
        assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
    }
1393
    /// Scoping a producer to an empty prefix list yields no producer at all.
    #[tokio::test]
    async fn test_publish_scope_empty() {
        let origin = Origin::random().produce();

        assert!(origin.scope(&[]).is_none());
    }
1401
    /// A scoped consumer only sees broadcasts under its prefix, while an
    /// unscoped consumer sees everything.
    #[tokio::test]
    async fn test_consume_scope_filters() {
        let origin = Origin::random().produce();
        let broadcast1 = Broadcast::new().produce();
        let broadcast2 = Broadcast::new().produce();
        let broadcast3 = Broadcast::new().produce();

        let mut consumer = origin.consume();

        origin.publish_broadcast("allowed", broadcast1.consume());
        origin.publish_broadcast("allowed/nested", broadcast2.consume());
        origin.publish_broadcast("notallowed", broadcast3.consume());

        let mut limited_consumer = origin
            .consume()
            .scope(&["allowed".into()])
            .expect("should create limited consumer");

        // Only the two paths under "allowed" are visible; "notallowed" is not.
        limited_consumer.assert_next("allowed", &broadcast1.consume());
        limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
        limited_consumer.assert_next_wait();

        // The unscoped consumer sees all three.
        consumer.assert_next("allowed", &broadcast1.consume());
        consumer.assert_next("allowed/nested", &broadcast2.consume());
        consumer.assert_next("notallowed", &broadcast3.consume());
    }
1432
1433 #[tokio::test]
1434 async fn test_consume_scope_multiple_prefixes() {
1435 let origin = Origin::random().produce();
1436 let broadcast1 = Broadcast::new().produce();
1437 let broadcast2 = Broadcast::new().produce();
1438 let broadcast3 = Broadcast::new().produce();
1439
1440 origin.publish_broadcast("foo/test", broadcast1.consume());
1441 origin.publish_broadcast("bar/test", broadcast2.consume());
1442 origin.publish_broadcast("baz/test", broadcast3.consume());
1443
1444 let mut limited_consumer = origin
1446 .consume()
1447 .scope(&["foo".into(), "bar".into()])
1448 .expect("should create limited consumer");
1449
1450 limited_consumer.assert_next("bar/test", &broadcast2.consume());
1452 limited_consumer.assert_next("foo/test", &broadcast1.consume());
1453 limited_consumer.assert_next_wait(); }
1455
1456 #[tokio::test]
1457 async fn test_with_root_and_publish_scope() {
1458 let origin = Origin::random().produce();
1459 let broadcast = Broadcast::new().produce();
1460
1461 let foo_producer = origin.with_root("foo").expect("should create foo root");
1463
1464 let limited_producer = foo_producer
1466 .scope(&["bar".into(), "goop/pee".into()])
1467 .expect("should create limited producer");
1468
1469 let mut consumer = origin.consume();
1470
1471 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1473 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1474 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1475 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1476
1477 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1479 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1481
1482 consumer.assert_next("foo/bar", &broadcast.consume());
1484 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1485 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1486 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1487 }
1488
1489 #[tokio::test]
1490 async fn test_with_root_and_consume_scope() {
1491 let origin = Origin::random().produce();
1492 let broadcast1 = Broadcast::new().produce();
1493 let broadcast2 = Broadcast::new().produce();
1494 let broadcast3 = Broadcast::new().produce();
1495
1496 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1498 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1499 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1500
1501 let foo_producer = origin.with_root("foo").expect("should create foo root");
1503
1504 let mut limited_consumer = foo_producer
1506 .consume()
1507 .scope(&["bar".into(), "goop/pee".into()])
1508 .expect("should create limited consumer");
1509
1510 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1512 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1513 limited_consumer.assert_next_wait(); }
1515
1516 #[tokio::test]
1517 async fn test_with_root_unauthorized() {
1518 let origin = Origin::random().produce();
1519
1520 let limited_producer = origin
1522 .scope(&["allowed".into()])
1523 .expect("should create limited producer");
1524
1525 assert!(limited_producer.with_root("notallowed").is_none());
1527
1528 let allowed_root = limited_producer
1530 .with_root("allowed")
1531 .expect("should create allowed root");
1532 assert_eq!(allowed_root.root().as_str(), "allowed");
1533 }
1534
1535 #[tokio::test]
1536 async fn test_wildcard_permission() {
1537 let origin = Origin::random().produce();
1538 let broadcast = Broadcast::new().produce();
1539
1540 let root_producer = origin.clone();
1542
1543 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1545 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1546
1547 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1549 assert_eq!(foo_producer.root().as_str(), "foo");
1550 }
1551
1552 #[tokio::test]
1553 async fn test_consume_broadcast_with_permissions() {
1554 let origin = Origin::random().produce();
1555 let broadcast1 = Broadcast::new().produce();
1556 let broadcast2 = Broadcast::new().produce();
1557
1558 origin.publish_broadcast("allowed/test", broadcast1.consume());
1559 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1560
1561 let limited_consumer = origin
1563 .consume()
1564 .scope(&["allowed".into()])
1565 .expect("should create limited consumer");
1566
1567 let result = limited_consumer.get_broadcast("allowed/test");
1569 assert!(result.is_some());
1570 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1571
1572 assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1574
1575 let consumer = origin.consume();
1577 assert!(consumer.get_broadcast("allowed/test").is_some());
1578 assert!(consumer.get_broadcast("notallowed/test").is_some());
1579 }
1580
1581 #[tokio::test]
1582 async fn test_nested_paths_with_permissions() {
1583 let origin = Origin::random().produce();
1584 let broadcast = Broadcast::new().produce();
1585
1586 let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1588
1589 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1591 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1592 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1593
1594 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1596 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1597 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1598 }
1599
1600 #[tokio::test]
1601 async fn test_multiple_consumers_with_different_permissions() {
1602 let origin = Origin::random().produce();
1603 let broadcast1 = Broadcast::new().produce();
1604 let broadcast2 = Broadcast::new().produce();
1605 let broadcast3 = Broadcast::new().produce();
1606
1607 origin.publish_broadcast("foo/test", broadcast1.consume());
1609 origin.publish_broadcast("bar/test", broadcast2.consume());
1610 origin.publish_broadcast("baz/test", broadcast3.consume());
1611
1612 let mut foo_consumer = origin
1614 .consume()
1615 .scope(&["foo".into()])
1616 .expect("should create foo consumer");
1617
1618 let mut bar_consumer = origin
1619 .consume()
1620 .scope(&["bar".into()])
1621 .expect("should create bar consumer");
1622
1623 let mut foobar_consumer = origin
1624 .consume()
1625 .scope(&["foo".into(), "bar".into()])
1626 .expect("should create foobar consumer");
1627
1628 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1630 foo_consumer.assert_next_wait();
1631
1632 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1633 bar_consumer.assert_next_wait();
1634
1635 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1636 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1637 foobar_consumer.assert_next_wait();
1638 }
1639
1640 #[tokio::test]
1641 async fn test_select_with_empty_prefix() {
1642 let origin = Origin::random().produce();
1643 let broadcast1 = Broadcast::new().produce();
1644 let broadcast2 = Broadcast::new().produce();
1645
1646 let demo_producer = origin.with_root("demo").expect("should create demo root");
1648 let limited_producer = demo_producer
1649 .scope(&["worm-node".into(), "foobar".into()])
1650 .expect("should create limited producer");
1651
1652 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1654 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1655
1656 let mut consumer = limited_producer
1658 .consume()
1659 .scope(&["".into()])
1660 .expect("should create consumer with empty prefix");
1661
1662 let a1 = consumer.try_announced().expect("expected first announcement");
1664 let a2 = consumer.try_announced().expect("expected second announcement");
1665 consumer.assert_next_wait();
1666
1667 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1668 paths.sort();
1669 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1670 }
1671
1672 #[tokio::test]
1673 async fn test_select_narrowing_scope() {
1674 let origin = Origin::random().produce();
1675 let broadcast1 = Broadcast::new().produce();
1676 let broadcast2 = Broadcast::new().produce();
1677 let broadcast3 = Broadcast::new().produce();
1678
1679 let demo_producer = origin.with_root("demo").expect("should create demo root");
1681 let limited_producer = demo_producer
1682 .scope(&["worm-node".into(), "foobar".into()])
1683 .expect("should create limited producer");
1684
1685 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1687 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1688 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1689
1690 let mut worm_consumer = limited_producer
1692 .consume()
1693 .scope(&["worm-node".into()])
1694 .expect("should create worm-node consumer");
1695
1696 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1698 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1699 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1703 .consume()
1704 .scope(&["worm-node/foo".into()])
1705 .expect("should create worm-node/foo consumer");
1706
1707 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1708 foo_consumer.assert_next_wait(); }
1710
1711 #[tokio::test]
1712 async fn test_select_multiple_roots_with_empty_prefix() {
1713 let origin = Origin::random().produce();
1714 let broadcast1 = Broadcast::new().produce();
1715 let broadcast2 = Broadcast::new().produce();
1716 let broadcast3 = Broadcast::new().produce();
1717
1718 let limited_producer = origin
1720 .scope(&["app1".into(), "app2".into(), "shared".into()])
1721 .expect("should create limited producer");
1722
1723 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1725 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1726 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1727
1728 let mut consumer = limited_producer
1730 .consume()
1731 .scope(&["".into()])
1732 .expect("should create consumer with empty prefix");
1733
1734 consumer.assert_next("app1/data", &broadcast1.consume());
1736 consumer.assert_next("app2/config", &broadcast2.consume());
1737 consumer.assert_next("shared/resource", &broadcast3.consume());
1738 consumer.assert_next_wait();
1739 }
1740
1741 #[tokio::test]
1742 async fn test_publish_scope_with_empty_prefix() {
1743 let origin = Origin::random().produce();
1744 let broadcast = Broadcast::new().produce();
1745
1746 let limited_producer = origin
1748 .scope(&["services/api".into(), "services/web".into()])
1749 .expect("should create limited producer");
1750
1751 let same_producer = limited_producer
1753 .scope(&["".into()])
1754 .expect("should create producer with empty prefix");
1755
1756 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1758 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1759 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1760 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1761 }
1762
1763 #[tokio::test]
1764 async fn test_select_narrowing_to_deeper_path() {
1765 let origin = Origin::random().produce();
1766 let broadcast1 = Broadcast::new().produce();
1767 let broadcast2 = Broadcast::new().produce();
1768 let broadcast3 = Broadcast::new().produce();
1769
1770 let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1772
1773 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1775 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1776 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1777
1778 let mut team2_consumer = limited_producer
1780 .consume()
1781 .scope(&["org/team2".into()])
1782 .expect("should create team2 consumer");
1783
1784 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1785 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1789 .consume()
1790 .scope(&["org/team1/project1".into()])
1791 .expect("should create project1 consumer");
1792
1793 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1795 project1_consumer.assert_next_wait();
1796 }
1797
1798 #[tokio::test]
1799 async fn test_select_with_non_matching_prefix() {
1800 let origin = Origin::random().produce();
1801
1802 let limited_producer = origin
1804 .scope(&["allowed/path".into()])
1805 .expect("should create limited producer");
1806
1807 assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1809
1810 assert!(limited_producer.scope(&["other/path".into()]).is_none());
1812 }
1813
1814 #[tokio::test]
1817 async fn test_with_root_trailing_slash_consumer() {
1818 let origin = Origin::random().produce();
1819
1820 let prefix = "some_prefix/".to_string();
1822 let mut consumer = origin.consume().with_root(prefix).unwrap();
1823
1824 let b = origin.create_broadcast("some_prefix/test").unwrap();
1825 consumer.assert_next("test", &b.consume());
1826 }
1827
1828 #[tokio::test]
1830 async fn test_with_root_trailing_slash_producer() {
1831 let origin = Origin::random().produce();
1832
1833 let prefix = "some_prefix/".to_string();
1835 let rooted = origin.with_root(prefix).unwrap();
1836
1837 let b = rooted.create_broadcast("test").unwrap();
1838
1839 let mut consumer = rooted.consume();
1840 consumer.assert_next("test", &b.consume());
1841 }
1842
#[tokio::test]
async fn test_with_root_trailing_slash_unannounce() {
    // Pause tokio's clock so the sleep below does not wait in real time.
    tokio::time::pause();

    let origin = Origin::random().produce();

    // The consumer root is given with a trailing slash.
    let prefix = "some_prefix/".to_string();
    let mut consumer = origin.consume().with_root(prefix).unwrap();

    // Created on the unrooted origin; the rooted consumer sees it as "test".
    let b = origin.create_broadcast("some_prefix/test").unwrap();
    consumer.assert_next("test", &b.consume());

    // Drop the producer, then sleep briefly so other tasks get a chance to run.
    // NOTE(review): presumably the unannounce is produced by background
    // cleanup that needs to be polled — confirm against `create_broadcast`.
    drop(b);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

    // The follow-up for the dropped broadcast must also use the
    // root-relative path "test" (`assert_next_none` is defined elsewhere;
    // presumably it expects an unannounce for that path).
    consumer.assert_next_none("test");
}
1863
1864 #[tokio::test]
1865 async fn test_select_maintains_access_with_wider_prefix() {
1866 let origin = Origin::random().produce();
1867 let broadcast1 = Broadcast::new().produce();
1868 let broadcast2 = Broadcast::new().produce();
1869
1870 let demo_producer = origin.with_root("demo").expect("should create demo root");
1872 let user_producer = demo_producer
1873 .scope(&["worm-node".into(), "foobar".into()])
1874 .expect("should create user producer");
1875
1876 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1878 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1879
1880 let mut consumer = user_producer
1882 .consume()
1883 .scope(&["".into()])
1884 .expect("scope with empty prefix should not fail when user has specific permissions");
1885
1886 let a1 = consumer.try_announced().expect("expected first announcement");
1888 let a2 = consumer.try_announced().expect("expected second announcement");
1889 consumer.assert_next_wait();
1890
1891 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1892 paths.sort();
1893 assert_eq!(paths, ["foobar", "worm-node/data"]);
1894
1895 let mut narrow_consumer = user_producer
1897 .consume()
1898 .scope(&["worm-node".into()])
1899 .expect("should be able to narrow scope to worm-node");
1900
1901 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1902 narrow_consumer.assert_next_wait(); }
1904
1905 #[tokio::test]
1906 async fn test_duplicate_prefixes_deduped() {
1907 let origin = Origin::random().produce();
1908 let broadcast = Broadcast::new().produce();
1909
1910 let producer = origin
1912 .scope(&["demo".into(), "demo".into()])
1913 .expect("should create producer");
1914
1915 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1916
1917 let mut consumer = producer.consume();
1918 consumer.assert_next("demo/stream", &broadcast.consume());
1919 consumer.assert_next_wait();
1920 }
1921
1922 #[tokio::test]
1923 async fn test_overlapping_prefixes_deduped() {
1924 let origin = Origin::random().produce();
1925 let broadcast = Broadcast::new().produce();
1926
1927 let producer = origin
1929 .scope(&["demo".into(), "demo/foo".into()])
1930 .expect("should create producer");
1931
1932 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1934
1935 let mut consumer = producer.consume();
1936 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1937 consumer.assert_next_wait();
1938 }
1939
1940 #[tokio::test]
1941 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1942 let origin = Origin::random().produce();
1943 let broadcast = Broadcast::new().produce();
1944
1945 let producer = origin
1947 .scope(&["demo".into(), "demo/foo".into()])
1948 .expect("should create producer");
1949
1950 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1951
1952 let mut consumer = producer.consume();
1953 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1955 consumer.assert_next_wait();
1956 }
1957
1958 #[tokio::test]
1959 async fn test_allowed_returns_deduped_prefixes() {
1960 let origin = Origin::random().produce();
1961
1962 let producer = origin
1963 .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1964 .expect("should create producer");
1965
1966 let allowed: Vec<_> = producer.allowed().collect();
1967 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1968 }
1969
1970 #[tokio::test]
1971 async fn test_announced_broadcast_already_announced() {
1972 let origin = Origin::random().produce();
1973 let broadcast = Broadcast::new().produce();
1974
1975 origin.publish_broadcast("test", broadcast.consume());
1976
1977 let consumer = origin.consume();
1978 let result = consumer.announced_broadcast("test").await.expect("should find it");
1979 assert!(result.is_clone(&broadcast.consume()));
1980 }
1981
#[tokio::test]
async fn test_announced_broadcast_delayed() {
    // Run with tokio's clock paused.
    tokio::time::pause();

    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();

    let consumer = origin.consume();

    // Wait for "test" on a separate task, using a clone of the consumer.
    let wait = tokio::spawn({
        let consumer = consumer.clone();
        async move { consumer.announced_broadcast("test").await }
    });

    // Yield so the spawned task gets a chance to start waiting before the
    // broadcast is published.
    tokio::task::yield_now().await;

    // Publish only after the waiter has been given a chance to run.
    origin.publish_broadcast("test", broadcast.consume());

    // The waiter must resolve to the broadcast that was just published.
    let result = wait.await.unwrap().expect("should find it");
    assert!(result.is_clone(&broadcast.consume()));
}
2005
#[tokio::test]
async fn test_announced_broadcast_ignores_unrelated_paths() {
    // Run with tokio's clock paused.
    tokio::time::pause();

    let origin = Origin::random().produce();
    let other = Broadcast::new().produce();
    let target = Broadcast::new().produce();

    let consumer = origin.consume();

    // Wait for "target" on a separate task, using a clone of the consumer.
    let wait = tokio::spawn({
        let consumer = consumer.clone();
        async move { consumer.announced_broadcast("target").await }
    });

    // Yield so the spawned task gets a chance to start waiting.
    tokio::task::yield_now().await;

    // Publishing a different path must not complete the waiter; yield again
    // so it has a chance to observe the announcement before the check.
    origin.publish_broadcast("other", other.consume());
    tokio::task::yield_now().await;
    assert!(!wait.is_finished(), "must not resolve on unrelated path");

    // Publishing the requested path resolves the waiter with that broadcast.
    origin.publish_broadcast("target", target.consume());
    let result = wait.await.unwrap().expect("should find target");
    assert!(result.is_clone(&target.consume()));
}
2032
#[tokio::test]
async fn test_announced_broadcast_skips_nested_paths() {
    // Run with tokio's clock paused.
    tokio::time::pause();

    let origin = Origin::random().produce();
    let nested = Broadcast::new().produce();
    let exact = Broadcast::new().produce();

    let consumer = origin.consume();

    // Wait for exactly "foo" on a separate task, using a clone of the consumer.
    let wait = tokio::spawn({
        let consumer = consumer.clone();
        async move { consumer.announced_broadcast("foo").await }
    });

    // Yield so the spawned task gets a chance to start waiting.
    tokio::task::yield_now().await;

    // A broadcast nested under the requested path ("foo/bar") must not
    // complete the waiter; yield again so it can observe the announcement.
    origin.publish_broadcast("foo/bar", nested.consume());
    tokio::task::yield_now().await;
    assert!(!wait.is_finished(), "must not resolve on a nested path");

    // Only the exact path resolves the waiter, and with the exact broadcast.
    origin.publish_broadcast("foo", exact.consume());
    let result = wait.await.unwrap().expect("should find foo exactly");
    assert!(result.is_clone(&exact.consume()));
}
2059
2060 #[tokio::test]
2061 async fn test_announced_broadcast_disallowed() {
2062 let origin = Origin::random().produce();
2063 let limited = origin
2064 .consume()
2065 .scope(&["allowed".into()])
2066 .expect("should create limited");
2067
2068 assert!(limited.announced_broadcast("notallowed").await.is_none());
2070 }
2071
2072 #[tokio::test]
2073 async fn test_announced_broadcast_scope_too_narrow() {
2074 let origin = Origin::random().produce();
2077 let limited = origin
2078 .consume()
2079 .scope(&["foo/specific".into()])
2080 .expect("should create limited");
2081
2082 let result = limited
2084 .announced_broadcast("foo")
2085 .now_or_never()
2086 .expect("must not block");
2087 assert!(result.is_none());
2088 }
2089
#[tokio::test]
async fn test_coalesce_announce_then_unannounce() {
    // Pause tokio's clock so the sleep below does not wait in real time.
    tokio::time::pause();

    let origin = Origin::random().produce();
    // The consumer exists before the publish but is not polled until the end.
    let mut consumer = origin.consume();

    // Publish and immediately drop the producer.
    let broadcast = Broadcast::new().produce();
    origin.publish_broadcast("test", broadcast.consume());
    drop(broadcast);

    // Sleep briefly so other tasks get a chance to run.
    // NOTE(review): presumably this lets the drop-triggered unannounce be
    // processed — confirm against the publish implementation.
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

    // The consumer observed neither event, so nothing should be pending
    // (`assert_next_wait` is defined elsewhere; presumably it asserts that
    // no update is ready).
    consumer.assert_next_wait();
}
2109
#[tokio::test]
async fn test_coalesce_announce_unannounce_announce() {
    // Pause tokio's clock so the sleep below does not wait in real time.
    tokio::time::pause();

    let origin = Origin::random().produce();
    // The consumer exists from the start but is not polled until the end.
    let mut consumer = origin.consume();

    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();

    // Announce the first broadcast, drop it, sleep briefly so other tasks
    // can run, then announce a second broadcast on the same path.
    origin.publish_broadcast("test", broadcast1.consume());
    drop(broadcast1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    origin.publish_broadcast("test", broadcast2.consume());

    // Only the latest state is expected: one announcement carrying the
    // second broadcast and nothing after it.
    consumer.assert_next("test", &broadcast2.consume());
    consumer.assert_next_wait();
}
2130
#[tokio::test]
async fn test_coalesce_unannounce_announce_preserved() {
    // Pause tokio's clock so the sleep below does not wait in real time.
    tokio::time::pause();

    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    origin.publish_broadcast("test", broadcast1.consume());

    // Unlike the tests that never poll, this consumer has already observed
    // the first announcement.
    let mut consumer = origin.consume();
    consumer.assert_next("test", &broadcast1.consume());

    // Drop the first broadcast and sleep briefly so other tasks can run.
    drop(broadcast1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

    // Re-announce the same path with a different broadcast.
    let broadcast2 = Broadcast::new().produce();
    origin.publish_broadcast("test", broadcast2.consume());

    // Both steps must be delivered, in order: first the `assert_next_none`
    // step for "test" (presumably the unannounce of the first broadcast),
    // then the announcement of the second broadcast, then nothing more.
    consumer.assert_next_none("test");
    consumer.assert_next("test", &broadcast2.consume());
    consumer.assert_next_wait();
}
2156
#[tokio::test]
async fn test_coalesce_unannounce_announce_unannounce() {
    // Pause tokio's clock so the sleeps below do not wait in real time.
    tokio::time::pause();

    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    origin.publish_broadcast("test", broadcast1.consume());

    // The consumer has already observed the first announcement.
    let mut consumer = origin.consume();
    consumer.assert_next("test", &broadcast1.consume());

    // Drop the first broadcast and sleep briefly so other tasks can run.
    drop(broadcast1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

    // Announce a second broadcast on the same path and drop it as well,
    // all without polling the consumer in between.
    let broadcast2 = Broadcast::new().produce();
    origin.publish_broadcast("test", broadcast2.consume());
    drop(broadcast2);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

    // Only a single `assert_next_none` step for "test" is expected
    // (presumably one unannounce); the second broadcast's announce and
    // drop must leave nothing further pending.
    consumer.assert_next_none("test");
    consumer.assert_next_wait();
}
2181
2182 #[tokio::test]
2183 async fn test_coalesce_churn_bounded() {
2184 tokio::time::pause();
2189
2190 let origin = Origin::random().produce();
2191 let mut consumer = origin.consume();
2192
2193 for _ in 0..1000 {
2194 let broadcast = Broadcast::new().produce();
2195 origin.publish_broadcast("test", broadcast.consume());
2196 drop(broadcast);
2197 }
2198 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2199
2200 let mut collected = Vec::new();
2201 while let Some(update) = consumer.try_announced() {
2202 collected.push(update);
2203 }
2204 assert!(
2205 collected.len() <= 1,
2206 "expected at most one pending update, got {}",
2207 collected.len()
2208 );
2209 assert!(
2210 collected.iter().all(|(path, _)| path == &Path::new("test")),
2211 "unexpected path in pending updates",
2212 );
2213 }
2214}