1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 fmt,
4 sync::atomic::{AtomicU64, Ordering},
5 task::Poll,
6};
7
8use rand::Rng;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13 AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14 coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26 pub id: u64,
28}
29
30impl Origin {
31 pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35 pub fn random() -> Self {
44 let mut rng = rand::rng();
45 let id = rng.random_range(1..(1u64 << 53));
46 Self { id }
47 }
48
49 pub fn produce(self) -> OriginProducer {
51 OriginProducer::new(self)
52 }
53}
54
55impl From<u64> for Origin {
56 fn from(id: u64) -> Self {
57 Self { id }
58 }
59}
60
61impl fmt::Display for Origin {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 self.id.fmt(f)
64 }
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69 u64: Encode<V>,
70{
71 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72 self.id.encode(w, version)
73 }
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78 u64: Decode<V>,
79{
80 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81 let id = u64::decode(r, version)?;
82 if id >= 1u64 << 62 {
83 return Err(DecodeError::InvalidValue);
84 }
85 Ok(Self { id })
86 }
87}
88
89pub(crate) const MAX_HOPS: usize = 32;
95
96#[derive(Debug, Clone, Default, PartialEq, Eq)]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct OriginList(Vec<Origin>);
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106#[non_exhaustive]
107pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(f, "too many origins (max {MAX_HOPS})")
112 }
113}
114
115impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118 fn from(_: TooManyOrigins) -> Self {
119 DecodeError::BoundsExceeded
120 }
121}
122
123impl OriginList {
124 pub fn new() -> Self {
126 Self(Vec::new())
127 }
128
129 pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131 if self.0.len() >= MAX_HOPS {
132 return Err(TooManyOrigins);
133 }
134 self.0.push(origin);
135 Ok(())
136 }
137
138 pub fn contains(&self, origin: &Origin) -> bool {
140 self.0.contains(origin)
141 }
142
143 pub fn len(&self) -> usize {
145 self.0.len()
146 }
147
148 pub fn is_empty(&self) -> bool {
150 self.0.is_empty()
151 }
152
153 pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
155 self.0.iter()
156 }
157
158 pub fn as_slice(&self) -> &[Origin] {
160 &self.0
161 }
162}
163
164impl TryFrom<Vec<Origin>> for OriginList {
165 type Error = TooManyOrigins;
166
167 fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
168 if v.len() > MAX_HOPS {
169 return Err(TooManyOrigins);
170 }
171 Ok(Self(v))
172 }
173}
174
175impl<'a> IntoIterator for &'a OriginList {
176 type Item = &'a Origin;
177 type IntoIter = std::slice::Iter<'a, Origin>;
178
179 fn into_iter(self) -> Self::IntoIter {
180 self.iter()
181 }
182}
183
184impl<V: Copy> Encode<V> for OriginList
185where
186 u64: Encode<V>,
187 Origin: Encode<V>,
188{
189 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
190 (self.0.len() as u64).encode(w, version)?;
191 for origin in &self.0 {
192 origin.encode(w, version)?;
193 }
194 Ok(())
195 }
196}
197
198impl<V: Copy> Decode<V> for OriginList
199where
200 u64: Decode<V>,
201 Origin: Decode<V>,
202{
203 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
204 let count = u64::decode(r, version)? as usize;
205 if count > MAX_HOPS {
206 return Err(DecodeError::BoundsExceeded);
207 }
208 let mut list = Vec::with_capacity(count);
209 for _ in 0..count {
210 list.push(Origin::decode(r, version)?);
211 }
212 Ok(Self(list))
213 }
214}
215
216static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
217
218#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
219struct ConsumerId(u64);
220
221impl ConsumerId {
222 fn new() -> Self {
223 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
224 }
225}
226
227struct OriginBroadcast {
229 path: PathOwned,
230 active: BroadcastConsumer,
231 backup: VecDeque<BroadcastConsumer>,
232}
233
234enum PendingUpdate {
243 Announce(BroadcastConsumer),
244 Unannounce,
245 UnannounceAnnounce(BroadcastConsumer),
246}
247
248#[derive(Default)]
253struct OriginConsumerState {
254 pending: BTreeMap<PathOwned, PendingUpdate>,
255}
256
257impl OriginConsumerState {
258 fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
259 let new = match self.pending.remove(&path) {
260 None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
262 Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
264 PendingUpdate::UnannounceAnnounce(broadcast)
265 }
266 };
267 self.pending.insert(path, new);
268 }
269
270 fn apply_unannounce(&mut self, path: PathOwned) {
271 match self.pending.remove(&path) {
272 Some(PendingUpdate::Announce(_)) => {}
274 None | Some(PendingUpdate::Unannounce) => {
275 self.pending.insert(path, PendingUpdate::Unannounce);
276 }
277 Some(PendingUpdate::UnannounceAnnounce(_)) => {
280 self.pending.insert(path, PendingUpdate::Unannounce);
281 }
282 }
283 }
284
285 fn take(&mut self) -> Option<OriginAnnounce> {
287 let path = self.pending.keys().next()?.clone();
288 match self.pending.remove(&path).unwrap() {
289 PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
290 PendingUpdate::Unannounce => Some((path, None)),
291 PendingUpdate::UnannounceAnnounce(broadcast) => {
292 self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
295 Some((path, None))
296 }
297 }
298 }
299}
300
301#[derive(Clone)]
302struct OriginConsumerNotify {
303 root: PathOwned,
304 state: conducer::Producer<OriginConsumerState>,
305}
306
307impl OriginConsumerNotify {
308 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
309 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
310 self.state
311 .write()
312 .ok()
313 .expect("consumer closed")
314 .apply_announce(path, broadcast);
315 }
316
317 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
318 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
319 let mut state = self.state.write().ok().expect("consumer closed");
320 state.apply_unannounce(path.clone());
321 state.apply_announce(path, broadcast);
322 }
323
324 fn unannounce(&self, path: impl AsPath) {
325 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
326 self.state.write().ok().expect("consumer closed").apply_unannounce(path);
327 }
328}
329
330struct NotifyNode {
331 parent: Option<Lock<NotifyNode>>,
332
333 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
336}
337
338impl NotifyNode {
339 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
340 Self {
341 parent,
342 consumers: HashMap::new(),
343 }
344 }
345
346 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
347 for consumer in self.consumers.values() {
348 consumer.announce(path.as_path(), broadcast.clone());
349 }
350
351 if let Some(parent) = &self.parent {
352 parent.lock().announce(path, broadcast);
353 }
354 }
355
356 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
357 for consumer in self.consumers.values() {
358 consumer.reannounce(path.as_path(), broadcast.clone());
359 }
360
361 if let Some(parent) = &self.parent {
362 parent.lock().reannounce(path, broadcast);
363 }
364 }
365
366 fn unannounce(&mut self, path: impl AsPath) {
367 for consumer in self.consumers.values() {
368 consumer.unannounce(path.as_path());
369 }
370
371 if let Some(parent) = &self.parent {
372 parent.lock().unannounce(path);
373 }
374 }
375}
376
377struct OriginNode {
378 broadcast: Option<OriginBroadcast>,
380
381 nested: HashMap<String, Lock<OriginNode>>,
383
384 notify: Lock<NotifyNode>,
386}
387
388impl OriginNode {
389 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
390 Self {
391 broadcast: None,
392 nested: HashMap::new(),
393 notify: Lock::new(NotifyNode::new(parent)),
394 }
395 }
396
397 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
398 let (dir, rest) = path.next_part().expect("leaf called with empty path");
399
400 let next = self.entry(dir);
401 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
402 }
403
404 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
405 match self.nested.get(dir) {
406 Some(next) => next.clone(),
407 None => {
408 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
409 self.nested.insert(dir.to_string(), next.clone());
410 next
411 }
412 }
413 }
414
415 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
416 let full = full.as_path();
417 let rest = relative.as_path();
418
419 if let Some((dir, relative)) = rest.next_part() {
421 self.entry(dir).lock().publish(&full, broadcast, &relative);
423 } else if let Some(existing) = &mut self.broadcast {
424 if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
431 return;
432 }
433
434 if broadcast.hops.len() <= existing.active.hops.len() {
435 let old = existing.active.clone();
436 existing.active = broadcast.clone();
437 existing.backup.push_back(old);
438
439 self.notify.lock().reannounce(full, broadcast);
440 } else {
441 existing.backup.push_back(broadcast.clone());
443 }
444 } else {
445 self.broadcast = Some(OriginBroadcast {
447 path: full.to_owned(),
448 active: broadcast.clone(),
449 backup: VecDeque::new(),
450 });
451 self.notify.lock().announce(full, broadcast);
452 }
453 }
454
455 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
456 self.consume_initial(&mut notify);
457 self.notify.lock().consumers.insert(id, notify);
458 }
459
460 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
461 if let Some(broadcast) = &self.broadcast {
462 notify.announce(&broadcast.path, broadcast.active.clone());
463 }
464
465 for nested in self.nested.values() {
467 nested.lock().consume_initial(notify);
468 }
469 }
470
471 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
472 let rest = rest.as_path();
473
474 if let Some((dir, rest)) = rest.next_part() {
475 let node = self.nested.get(dir)?.lock();
476 node.consume_broadcast(&rest)
477 } else {
478 self.broadcast.as_ref().map(|b| b.active.clone())
479 }
480 }
481
482 fn unconsume(&mut self, id: ConsumerId) {
483 self.notify.lock().consumers.remove(&id).expect("consumer not found");
484 if self.is_empty() {
485 }
488 }
489
490 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
492 let full = full.as_path();
493 let relative = relative.as_path();
494
495 if let Some((dir, relative)) = relative.next_part() {
496 let nested = self.entry(dir);
497 let mut locked = nested.lock();
498 locked.remove(&full, broadcast, &relative);
499
500 if locked.is_empty() {
501 drop(locked);
502 self.nested.remove(dir);
503 }
504 } else {
505 let entry = match &mut self.broadcast {
506 Some(existing) => existing,
507 None => return,
508 };
509
510 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
512 if let Some(pos) = pos {
513 entry.backup.remove(pos);
514 return;
516 }
517
518 assert!(entry.active.is_clone(&broadcast));
520
521 let best = entry
524 .backup
525 .iter()
526 .enumerate()
527 .min_by_key(|(_, b)| b.hops.len())
528 .map(|(i, _)| i);
529 if let Some(idx) = best {
530 let active = entry.backup.remove(idx).expect("index in range");
531 entry.active = active;
532 self.notify.lock().reannounce(full, &entry.active);
533 } else {
534 self.broadcast = None;
536 self.notify.lock().unannounce(full);
537 }
538 }
539 }
540
541 fn is_empty(&self) -> bool {
542 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
543 }
544}
545
546#[derive(Clone)]
547struct OriginNodes {
548 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
549}
550
551impl OriginNodes {
552 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
555 let mut roots = Vec::new();
556
557 for (root, state) in &self.nodes {
558 for prefix in prefixes {
559 if root.has_prefix(prefix) {
560 roots.push((root.to_owned(), state.clone()));
562 continue;
563 }
564
565 if let Some(suffix) = prefix.strip_prefix(root) {
566 let nested = state.lock().leaf(&suffix);
568 roots.push((prefix.to_owned(), nested));
569 }
570 }
571 }
572
573 if roots.is_empty() {
574 None
575 } else {
576 Some(Self { nodes: roots })
577 }
578 }
579
580 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
581 let new_root = new_root.as_path();
582 let mut roots = Vec::new();
583
584 if new_root.is_empty() {
585 return Some(self.clone());
586 }
587
588 for (root, state) in &self.nodes {
589 if let Some(suffix) = root.strip_prefix(&new_root) {
590 roots.push((suffix.to_owned(), state.clone()));
592 } else if let Some(suffix) = new_root.strip_prefix(root) {
593 let nested = state.lock().leaf(&suffix);
596 roots.push(("".into(), nested));
597 }
598 }
599
600 if roots.is_empty() {
601 None
602 } else {
603 Some(Self { nodes: roots })
604 }
605 }
606
607 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
609 let path = path.as_path();
610
611 for (root, state) in &self.nodes {
612 if let Some(suffix) = path.strip_prefix(root) {
613 return Some((state.clone(), suffix.to_owned()));
614 }
615 }
616
617 None
618 }
619}
620
621impl Default for OriginNodes {
622 fn default() -> Self {
623 Self {
624 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
625 }
626 }
627}
628
629pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
631
632#[derive(Clone)]
634pub struct OriginProducer {
635 info: Origin,
639
640 nodes: OriginNodes,
643
644 root: PathOwned,
646}
647
648impl std::ops::Deref for OriginProducer {
649 type Target = Origin;
650
651 fn deref(&self) -> &Self::Target {
652 &self.info
653 }
654}
655
656impl OriginProducer {
657 pub fn new(info: Origin) -> Self {
660 Self {
661 info,
662 nodes: OriginNodes::default(),
663 root: PathOwned::default(),
664 }
665 }
666
667 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
672 let broadcast = Broadcast::new().produce();
673 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
674 }
675
676 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
686 let path = path.as_path();
687
688 if broadcast.hops.contains(&self.info) {
690 return false;
691 }
692
693 let (root, rest) = match self.nodes.get(&path) {
694 Some(root) => root,
695 None => return false,
696 };
697
698 let full = self.root.join(&path);
699
700 root.lock().publish(&full, &broadcast, &rest);
701 let root = root.clone();
702
703 web_async::spawn(async move {
704 broadcast.closed().await;
705 root.lock().remove(&full, broadcast, &rest);
706 });
707
708 true
709 }
710
711 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
717 let prefixes = PathPrefixes::new(prefixes);
718 Some(OriginProducer {
719 info: self.info,
720 nodes: self.nodes.select(&prefixes)?,
721 root: self.root.clone(),
722 })
723 }
724
725 pub fn consume(&self) -> OriginConsumer {
727 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
728 }
729
730 #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
735 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
736 let path = path.as_path();
737 let (root, rest) = self.nodes.get(&path)?;
738 let state = root.lock();
739 state.consume_broadcast(&rest)
740 }
741
742 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
747 let prefix = prefix.as_path();
748
749 Some(Self {
750 info: self.info,
751 root: self.root.join(&prefix).to_owned(),
752 nodes: self.nodes.root(&prefix)?,
753 })
754 }
755
756 pub fn root(&self) -> &Path<'_> {
758 &self.root
759 }
760
761 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
764 self.nodes.nodes.iter().map(|(root, _)| root)
765 }
766
767 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
769 self.root.join(path)
770 }
771}
772
773pub struct OriginConsumer {
777 id: ConsumerId,
778 info: Origin,
780 nodes: OriginNodes,
781
782 state: conducer::Producer<OriginConsumerState>,
785
786 root: PathOwned,
788}
789
790impl std::ops::Deref for OriginConsumer {
791 type Target = Origin;
792
793 fn deref(&self) -> &Self::Target {
794 &self.info
795 }
796}
797
798impl OriginConsumer {
799 fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
800 let state = conducer::Producer::<OriginConsumerState>::default();
801 let id = ConsumerId::new();
802
803 for (_, node) in &nodes.nodes {
804 let notify = OriginConsumerNotify {
805 root: root.clone(),
806 state: state.clone(),
807 };
808 node.lock().consume(id, notify);
809 }
810
811 Self {
812 id,
813 info,
814 nodes,
815 state,
816 root,
817 }
818 }
819
820 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
828 conducer::wait(|waiter| self.poll_announced(waiter)).await
829 }
830
831 pub fn poll_announced(&mut self, waiter: &conducer::Waiter) -> Poll<Option<OriginAnnounce>> {
837 match self.state.poll(waiter, |state| match state.take() {
838 Some(item) => Poll::Ready(item),
839 None => Poll::Pending,
840 }) {
841 Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
842 Poll::Ready(Err(_)) => Poll::Ready(None),
844 Poll::Pending => Poll::Pending,
845 }
846 }
847
848 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
853 self.state.write().ok()?.take()
854 }
855
856 pub fn consume(&self) -> Self {
858 self.clone()
859 }
860
861 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
869 let path = path.as_path();
870 let (root, rest) = self.nodes.get(&path)?;
871 let state = root.lock();
872 state.consume_broadcast(&rest)
873 }
874
875 pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
884 let path = path.as_path();
885
886 let mut consumer = self.scope(std::slice::from_ref(&path))?;
888
889 if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
893 return None;
894 }
895
896 loop {
897 let (announced, broadcast) = consumer.announced().await?;
898 if announced.as_path() == path {
900 if let Some(broadcast) = broadcast {
901 return Some(broadcast);
902 }
903 }
904 }
905 }
906
907 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
913 let prefixes = PathPrefixes::new(prefixes);
914 Some(OriginConsumer::new(
915 self.info,
916 self.root.clone(),
917 self.nodes.select(&prefixes)?,
918 ))
919 }
920
921 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
926 let prefix = prefix.as_path();
927
928 Some(Self::new(
929 self.info,
930 self.root.join(&prefix).to_owned(),
931 self.nodes.root(&prefix)?,
932 ))
933 }
934
935 pub fn root(&self) -> &Path<'_> {
937 &self.root
938 }
939
940 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
943 self.nodes.nodes.iter().map(|(root, _)| root)
944 }
945
946 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
948 self.root.join(path)
949 }
950}
951
952impl Drop for OriginConsumer {
953 fn drop(&mut self) {
954 for (_, root) in &self.nodes.nodes {
955 root.lock().unconsume(self.id);
956 }
957 }
958}
959
960impl Clone for OriginConsumer {
961 fn clone(&self) -> Self {
962 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
963 }
964}
965
966#[cfg(test)]
967use futures::FutureExt;
968
969#[cfg(test)]
970impl OriginConsumer {
971 pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
972 let expected = expected.as_path();
973 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
974 assert_eq!(path, expected, "wrong path");
975 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
976 }
977
978 pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
979 let expected = expected.as_path();
980 let (path, active) = self.try_announced().expect("no next");
981 assert_eq!(path, expected, "wrong path");
982 assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
983 }
984
985 pub fn assert_next_none(&mut self, expected: impl AsPath) {
986 let expected = expected.as_path();
987 let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
988 assert_eq!(path, expected, "wrong path");
989 assert!(active.is_none(), "should be unannounced");
990 }
991
992 pub fn assert_next_wait(&mut self) {
993 if let Some(res) = self.announced().now_or_never() {
994 panic!("next should block: got {:?}", res.map(|(path, _)| path));
995 }
996 }
997
998 }
1007
1008#[cfg(test)]
1009mod tests {
1010 use crate::Broadcast;
1011
1012 use super::*;
1013
1014 #[test]
1015 fn origin_list_push_fails_at_limit() {
1016 let mut list = OriginList::new();
1017 for _ in 0..MAX_HOPS {
1018 list.push(Origin::random()).unwrap();
1019 }
1020 assert_eq!(list.len(), MAX_HOPS);
1021 assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
1022 }
1023
1024 #[test]
1025 fn origin_list_try_from_vec_enforces_limit() {
1026 let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
1027 assert!(OriginList::try_from(under).is_ok());
1028
1029 let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
1030 assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
1031 }
1032
1033 #[tokio::test]
1034 async fn test_announce() {
1035 tokio::time::pause();
1036
1037 let origin = Origin::random().produce();
1038 let broadcast1 = Broadcast::new().produce();
1039 let broadcast2 = Broadcast::new().produce();
1040
1041 let mut consumer1 = origin.consume();
1042 consumer1.assert_next_wait();
1044
1045 origin.publish_broadcast("test1", broadcast1.consume());
1047
1048 consumer1.assert_next("test1", &broadcast1.consume());
1049 consumer1.assert_next_wait();
1050
1051 let mut consumer2 = origin.consume();
1054
1055 origin.publish_broadcast("test2", broadcast2.consume());
1057
1058 consumer1.assert_next("test2", &broadcast2.consume());
1059 consumer1.assert_next_wait();
1060
1061 consumer2.assert_next("test1", &broadcast1.consume());
1062 consumer2.assert_next("test2", &broadcast2.consume());
1063 consumer2.assert_next_wait();
1064
1065 drop(broadcast1);
1067
1068 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1070
1071 consumer1.assert_next_none("test1");
1073 consumer2.assert_next_none("test1");
1074 consumer1.assert_next_wait();
1075 consumer2.assert_next_wait();
1076
1077 let mut consumer3 = origin.consume();
1079 consumer3.assert_next("test2", &broadcast2.consume());
1080 consumer3.assert_next_wait();
1081
1082 drop(broadcast2);
1084
1085 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1087
1088 consumer1.assert_next_none("test2");
1089 consumer2.assert_next_none("test2");
1090 consumer3.assert_next_none("test2");
1091
1092 }
1098
1099 #[tokio::test]
1100 async fn test_duplicate() {
1101 tokio::time::pause();
1102
1103 let origin = Origin::random().produce();
1104
1105 let broadcast1 = Broadcast::new().produce();
1106 let broadcast2 = Broadcast::new().produce();
1107 let broadcast3 = Broadcast::new().produce();
1108
1109 let consumer1 = broadcast1.consume();
1110 let consumer2 = broadcast2.consume();
1111 let consumer3 = broadcast3.consume();
1112
1113 let mut consumer = origin.consume();
1114
1115 origin.publish_broadcast("test", consumer1.clone());
1116 origin.publish_broadcast("test", consumer2.clone());
1117 origin.publish_broadcast("test", consumer3.clone());
1118 assert!(consumer.get_broadcast("test").is_some());
1119
1120 consumer.assert_next("test", &consumer3);
1125 consumer.assert_next_wait();
1126
1127 drop(broadcast2);
1129
1130 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1132
1133 assert!(consumer.get_broadcast("test").is_some());
1134 consumer.assert_next_wait();
1135
1136 drop(broadcast3);
1138
1139 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1141
1142 assert!(consumer.get_broadcast("test").is_some());
1143 consumer.assert_next_none("test");
1144 consumer.assert_next("test", &consumer1);
1145
1146 drop(broadcast1);
1148
1149 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1151 assert!(consumer.get_broadcast("test").is_none());
1152
1153 consumer.assert_next_none("test");
1154 consumer.assert_next_wait();
1155 }
1156
1157 #[tokio::test]
1158 async fn test_duplicate_reverse() {
1159 tokio::time::pause();
1160
1161 let origin = Origin::random().produce();
1162 let broadcast1 = Broadcast::new().produce();
1163 let broadcast2 = Broadcast::new().produce();
1164
1165 origin.publish_broadcast("test", broadcast1.consume());
1166 origin.publish_broadcast("test", broadcast2.consume());
1167 assert!(origin.consume().get_broadcast("test").is_some());
1168
1169 drop(broadcast2);
1171
1172 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1174 assert!(origin.consume().get_broadcast("test").is_some());
1175
1176 drop(broadcast1);
1177
1178 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1180 assert!(origin.consume().get_broadcast("test").is_none());
1181 }
1182
1183 #[tokio::test]
1184 async fn test_double_publish() {
1185 tokio::time::pause();
1186
1187 let origin = Origin::random().produce();
1188 let broadcast = Broadcast::new().produce();
1189
1190 origin.publish_broadcast("test", broadcast.consume());
1192 origin.publish_broadcast("test", broadcast.consume());
1193
1194 assert!(origin.consume().get_broadcast("test").is_some());
1195
1196 drop(broadcast);
1197
1198 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1200 assert!(origin.consume().get_broadcast("test").is_none());
1201 }
1202 #[tokio::test]
1207 async fn test_many_announces() {
1208 let origin = Origin::random().produce();
1209 let broadcast = Broadcast::new().produce();
1210
1211 let mut consumer = origin.consume();
1212 for i in 0..256 {
1213 origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1214 }
1215
1216 for i in 0..256 {
1217 consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
1218 }
1219 consumer.assert_next_wait();
1220 }
1221
1222 #[tokio::test]
1223 async fn test_many_announces_try() {
1224 let origin = Origin::random().produce();
1225 let broadcast = Broadcast::new().produce();
1226
1227 let mut consumer = origin.consume();
1228 for i in 0..256 {
1229 origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1230 }
1231
1232 for i in 0..256 {
1233 consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
1234 }
1235 }
1236
1237 #[tokio::test]
1238 async fn test_with_root_basic() {
1239 let origin = Origin::random().produce();
1240 let broadcast = Broadcast::new().produce();
1241
1242 let foo_producer = origin.with_root("foo").expect("should create root");
1244 assert_eq!(foo_producer.root().as_str(), "foo");
1245
1246 let mut consumer = origin.consume();
1247
1248 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1250 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1252
1253 let mut foo_consumer = foo_producer.consume();
1255 foo_consumer.assert_next("bar/baz", &broadcast.consume());
1256 }
1257
1258 #[tokio::test]
1259 async fn test_with_root_nested() {
1260 let origin = Origin::random().produce();
1261 let broadcast = Broadcast::new().produce();
1262
1263 let foo_producer = origin.with_root("foo").expect("should create foo root");
1265 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1266 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1267
1268 let mut consumer = origin.consume();
1269
1270 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1272 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1274
1275 let mut foo_bar_consumer = foo_bar_producer.consume();
1277 foo_bar_consumer.assert_next("baz", &broadcast.consume());
1278 }
1279
1280 #[tokio::test]
1281 async fn test_publish_scope_allows() {
1282 let origin = Origin::random().produce();
1283 let broadcast = Broadcast::new().produce();
1284
1285 let limited_producer = origin
1287 .scope(&["allowed/path1".into(), "allowed/path2".into()])
1288 .expect("should create limited producer");
1289
1290 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1292 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1293 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1294
1295 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1297 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1299 }
1300
1301 #[tokio::test]
1302 async fn test_publish_scope_empty() {
1303 let origin = Origin::random().produce();
1304
1305 assert!(origin.scope(&[]).is_none());
1307 }
1308
1309 #[tokio::test]
1310 async fn test_consume_scope_filters() {
1311 let origin = Origin::random().produce();
1312 let broadcast1 = Broadcast::new().produce();
1313 let broadcast2 = Broadcast::new().produce();
1314 let broadcast3 = Broadcast::new().produce();
1315
1316 let mut consumer = origin.consume();
1317
1318 origin.publish_broadcast("allowed", broadcast1.consume());
1320 origin.publish_broadcast("allowed/nested", broadcast2.consume());
1321 origin.publish_broadcast("notallowed", broadcast3.consume());
1322
1323 let mut limited_consumer = origin
1325 .consume()
1326 .scope(&["allowed".into()])
1327 .expect("should create limited consumer");
1328
1329 limited_consumer.assert_next("allowed", &broadcast1.consume());
1331 limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1332 limited_consumer.assert_next_wait(); consumer.assert_next("allowed", &broadcast1.consume());
1336 consumer.assert_next("allowed/nested", &broadcast2.consume());
1337 consumer.assert_next("notallowed", &broadcast3.consume());
1338 }
1339
1340 #[tokio::test]
1341 async fn test_consume_scope_multiple_prefixes() {
1342 let origin = Origin::random().produce();
1343 let broadcast1 = Broadcast::new().produce();
1344 let broadcast2 = Broadcast::new().produce();
1345 let broadcast3 = Broadcast::new().produce();
1346
1347 origin.publish_broadcast("foo/test", broadcast1.consume());
1348 origin.publish_broadcast("bar/test", broadcast2.consume());
1349 origin.publish_broadcast("baz/test", broadcast3.consume());
1350
1351 let mut limited_consumer = origin
1353 .consume()
1354 .scope(&["foo".into(), "bar".into()])
1355 .expect("should create limited consumer");
1356
1357 limited_consumer.assert_next("bar/test", &broadcast2.consume());
1359 limited_consumer.assert_next("foo/test", &broadcast1.consume());
1360 limited_consumer.assert_next_wait(); }
1362
1363 #[tokio::test]
1364 async fn test_with_root_and_publish_scope() {
1365 let origin = Origin::random().produce();
1366 let broadcast = Broadcast::new().produce();
1367
1368 let foo_producer = origin.with_root("foo").expect("should create foo root");
1370
1371 let limited_producer = foo_producer
1373 .scope(&["bar".into(), "goop/pee".into()])
1374 .expect("should create limited producer");
1375
1376 let mut consumer = origin.consume();
1377
1378 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1380 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1381 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1382 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1383
1384 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1386 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1388
1389 consumer.assert_next("foo/bar", &broadcast.consume());
1391 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1392 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1393 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1394 }
1395
1396 #[tokio::test]
1397 async fn test_with_root_and_consume_scope() {
1398 let origin = Origin::random().produce();
1399 let broadcast1 = Broadcast::new().produce();
1400 let broadcast2 = Broadcast::new().produce();
1401 let broadcast3 = Broadcast::new().produce();
1402
1403 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1405 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1406 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1407
1408 let foo_producer = origin.with_root("foo").expect("should create foo root");
1410
1411 let mut limited_consumer = foo_producer
1413 .consume()
1414 .scope(&["bar".into(), "goop/pee".into()])
1415 .expect("should create limited consumer");
1416
1417 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1419 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1420 limited_consumer.assert_next_wait(); }
1422
1423 #[tokio::test]
1424 async fn test_with_root_unauthorized() {
1425 let origin = Origin::random().produce();
1426
1427 let limited_producer = origin
1429 .scope(&["allowed".into()])
1430 .expect("should create limited producer");
1431
1432 assert!(limited_producer.with_root("notallowed").is_none());
1434
1435 let allowed_root = limited_producer
1437 .with_root("allowed")
1438 .expect("should create allowed root");
1439 assert_eq!(allowed_root.root().as_str(), "allowed");
1440 }
1441
1442 #[tokio::test]
1443 async fn test_wildcard_permission() {
1444 let origin = Origin::random().produce();
1445 let broadcast = Broadcast::new().produce();
1446
1447 let root_producer = origin.clone();
1449
1450 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1452 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1453
1454 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1456 assert_eq!(foo_producer.root().as_str(), "foo");
1457 }
1458
1459 #[tokio::test]
1460 async fn test_consume_broadcast_with_permissions() {
1461 let origin = Origin::random().produce();
1462 let broadcast1 = Broadcast::new().produce();
1463 let broadcast2 = Broadcast::new().produce();
1464
1465 origin.publish_broadcast("allowed/test", broadcast1.consume());
1466 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1467
1468 let limited_consumer = origin
1470 .consume()
1471 .scope(&["allowed".into()])
1472 .expect("should create limited consumer");
1473
1474 let result = limited_consumer.get_broadcast("allowed/test");
1476 assert!(result.is_some());
1477 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1478
1479 assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1481
1482 let consumer = origin.consume();
1484 assert!(consumer.get_broadcast("allowed/test").is_some());
1485 assert!(consumer.get_broadcast("notallowed/test").is_some());
1486 }
1487
1488 #[tokio::test]
1489 async fn test_nested_paths_with_permissions() {
1490 let origin = Origin::random().produce();
1491 let broadcast = Broadcast::new().produce();
1492
1493 let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1495
1496 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1498 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1499 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1500
1501 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1503 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1504 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1505 }
1506
1507 #[tokio::test]
1508 async fn test_multiple_consumers_with_different_permissions() {
1509 let origin = Origin::random().produce();
1510 let broadcast1 = Broadcast::new().produce();
1511 let broadcast2 = Broadcast::new().produce();
1512 let broadcast3 = Broadcast::new().produce();
1513
1514 origin.publish_broadcast("foo/test", broadcast1.consume());
1516 origin.publish_broadcast("bar/test", broadcast2.consume());
1517 origin.publish_broadcast("baz/test", broadcast3.consume());
1518
1519 let mut foo_consumer = origin
1521 .consume()
1522 .scope(&["foo".into()])
1523 .expect("should create foo consumer");
1524
1525 let mut bar_consumer = origin
1526 .consume()
1527 .scope(&["bar".into()])
1528 .expect("should create bar consumer");
1529
1530 let mut foobar_consumer = origin
1531 .consume()
1532 .scope(&["foo".into(), "bar".into()])
1533 .expect("should create foobar consumer");
1534
1535 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1537 foo_consumer.assert_next_wait();
1538
1539 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1540 bar_consumer.assert_next_wait();
1541
1542 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1543 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1544 foobar_consumer.assert_next_wait();
1545 }
1546
1547 #[tokio::test]
1548 async fn test_select_with_empty_prefix() {
1549 let origin = Origin::random().produce();
1550 let broadcast1 = Broadcast::new().produce();
1551 let broadcast2 = Broadcast::new().produce();
1552
1553 let demo_producer = origin.with_root("demo").expect("should create demo root");
1555 let limited_producer = demo_producer
1556 .scope(&["worm-node".into(), "foobar".into()])
1557 .expect("should create limited producer");
1558
1559 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1561 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1562
1563 let mut consumer = limited_producer
1565 .consume()
1566 .scope(&["".into()])
1567 .expect("should create consumer with empty prefix");
1568
1569 let a1 = consumer.try_announced().expect("expected first announcement");
1571 let a2 = consumer.try_announced().expect("expected second announcement");
1572 consumer.assert_next_wait();
1573
1574 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1575 paths.sort();
1576 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1577 }
1578
1579 #[tokio::test]
1580 async fn test_select_narrowing_scope() {
1581 let origin = Origin::random().produce();
1582 let broadcast1 = Broadcast::new().produce();
1583 let broadcast2 = Broadcast::new().produce();
1584 let broadcast3 = Broadcast::new().produce();
1585
1586 let demo_producer = origin.with_root("demo").expect("should create demo root");
1588 let limited_producer = demo_producer
1589 .scope(&["worm-node".into(), "foobar".into()])
1590 .expect("should create limited producer");
1591
1592 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1594 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1595 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1596
1597 let mut worm_consumer = limited_producer
1599 .consume()
1600 .scope(&["worm-node".into()])
1601 .expect("should create worm-node consumer");
1602
1603 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1605 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1606 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1610 .consume()
1611 .scope(&["worm-node/foo".into()])
1612 .expect("should create worm-node/foo consumer");
1613
1614 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1615 foo_consumer.assert_next_wait(); }
1617
1618 #[tokio::test]
1619 async fn test_select_multiple_roots_with_empty_prefix() {
1620 let origin = Origin::random().produce();
1621 let broadcast1 = Broadcast::new().produce();
1622 let broadcast2 = Broadcast::new().produce();
1623 let broadcast3 = Broadcast::new().produce();
1624
1625 let limited_producer = origin
1627 .scope(&["app1".into(), "app2".into(), "shared".into()])
1628 .expect("should create limited producer");
1629
1630 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1632 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1633 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1634
1635 let mut consumer = limited_producer
1637 .consume()
1638 .scope(&["".into()])
1639 .expect("should create consumer with empty prefix");
1640
1641 consumer.assert_next("app1/data", &broadcast1.consume());
1643 consumer.assert_next("app2/config", &broadcast2.consume());
1644 consumer.assert_next("shared/resource", &broadcast3.consume());
1645 consumer.assert_next_wait();
1646 }
1647
1648 #[tokio::test]
1649 async fn test_publish_scope_with_empty_prefix() {
1650 let origin = Origin::random().produce();
1651 let broadcast = Broadcast::new().produce();
1652
1653 let limited_producer = origin
1655 .scope(&["services/api".into(), "services/web".into()])
1656 .expect("should create limited producer");
1657
1658 let same_producer = limited_producer
1660 .scope(&["".into()])
1661 .expect("should create producer with empty prefix");
1662
1663 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1665 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1666 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1667 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1668 }
1669
1670 #[tokio::test]
1671 async fn test_select_narrowing_to_deeper_path() {
1672 let origin = Origin::random().produce();
1673 let broadcast1 = Broadcast::new().produce();
1674 let broadcast2 = Broadcast::new().produce();
1675 let broadcast3 = Broadcast::new().produce();
1676
1677 let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1679
1680 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1682 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1683 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1684
1685 let mut team2_consumer = limited_producer
1687 .consume()
1688 .scope(&["org/team2".into()])
1689 .expect("should create team2 consumer");
1690
1691 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1692 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1696 .consume()
1697 .scope(&["org/team1/project1".into()])
1698 .expect("should create project1 consumer");
1699
1700 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1702 project1_consumer.assert_next_wait();
1703 }
1704
1705 #[tokio::test]
1706 async fn test_select_with_non_matching_prefix() {
1707 let origin = Origin::random().produce();
1708
1709 let limited_producer = origin
1711 .scope(&["allowed/path".into()])
1712 .expect("should create limited producer");
1713
1714 assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1716
1717 assert!(limited_producer.scope(&["other/path".into()]).is_none());
1719 }
1720
1721 #[tokio::test]
1724 async fn test_with_root_trailing_slash_consumer() {
1725 let origin = Origin::random().produce();
1726
1727 let prefix = "some_prefix/".to_string();
1729 let mut consumer = origin.consume().with_root(prefix).unwrap();
1730
1731 let b = origin.create_broadcast("some_prefix/test").unwrap();
1732 consumer.assert_next("test", &b.consume());
1733 }
1734
1735 #[tokio::test]
1737 async fn test_with_root_trailing_slash_producer() {
1738 let origin = Origin::random().produce();
1739
1740 let prefix = "some_prefix/".to_string();
1742 let rooted = origin.with_root(prefix).unwrap();
1743
1744 let b = rooted.create_broadcast("test").unwrap();
1745
1746 let mut consumer = rooted.consume();
1747 consumer.assert_next("test", &b.consume());
1748 }
1749
1750 #[tokio::test]
1752 async fn test_with_root_trailing_slash_unannounce() {
1753 tokio::time::pause();
1754
1755 let origin = Origin::random().produce();
1756
1757 let prefix = "some_prefix/".to_string();
1758 let mut consumer = origin.consume().with_root(prefix).unwrap();
1759
1760 let b = origin.create_broadcast("some_prefix/test").unwrap();
1761 consumer.assert_next("test", &b.consume());
1762
1763 drop(b);
1765 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1766
1767 consumer.assert_next_none("test");
1769 }
1770
1771 #[tokio::test]
1772 async fn test_select_maintains_access_with_wider_prefix() {
1773 let origin = Origin::random().produce();
1774 let broadcast1 = Broadcast::new().produce();
1775 let broadcast2 = Broadcast::new().produce();
1776
1777 let demo_producer = origin.with_root("demo").expect("should create demo root");
1779 let user_producer = demo_producer
1780 .scope(&["worm-node".into(), "foobar".into()])
1781 .expect("should create user producer");
1782
1783 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1785 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1786
1787 let mut consumer = user_producer
1789 .consume()
1790 .scope(&["".into()])
1791 .expect("scope with empty prefix should not fail when user has specific permissions");
1792
1793 let a1 = consumer.try_announced().expect("expected first announcement");
1795 let a2 = consumer.try_announced().expect("expected second announcement");
1796 consumer.assert_next_wait();
1797
1798 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1799 paths.sort();
1800 assert_eq!(paths, ["foobar", "worm-node/data"]);
1801
1802 let mut narrow_consumer = user_producer
1804 .consume()
1805 .scope(&["worm-node".into()])
1806 .expect("should be able to narrow scope to worm-node");
1807
1808 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1809 narrow_consumer.assert_next_wait(); }
1811
1812 #[tokio::test]
1813 async fn test_duplicate_prefixes_deduped() {
1814 let origin = Origin::random().produce();
1815 let broadcast = Broadcast::new().produce();
1816
1817 let producer = origin
1819 .scope(&["demo".into(), "demo".into()])
1820 .expect("should create producer");
1821
1822 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1823
1824 let mut consumer = producer.consume();
1825 consumer.assert_next("demo/stream", &broadcast.consume());
1826 consumer.assert_next_wait();
1827 }
1828
1829 #[tokio::test]
1830 async fn test_overlapping_prefixes_deduped() {
1831 let origin = Origin::random().produce();
1832 let broadcast = Broadcast::new().produce();
1833
1834 let producer = origin
1836 .scope(&["demo".into(), "demo/foo".into()])
1837 .expect("should create producer");
1838
1839 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1841
1842 let mut consumer = producer.consume();
1843 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1844 consumer.assert_next_wait();
1845 }
1846
1847 #[tokio::test]
1848 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1849 let origin = Origin::random().produce();
1850 let broadcast = Broadcast::new().produce();
1851
1852 let producer = origin
1854 .scope(&["demo".into(), "demo/foo".into()])
1855 .expect("should create producer");
1856
1857 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1858
1859 let mut consumer = producer.consume();
1860 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1862 consumer.assert_next_wait();
1863 }
1864
1865 #[tokio::test]
1866 async fn test_allowed_returns_deduped_prefixes() {
1867 let origin = Origin::random().produce();
1868
1869 let producer = origin
1870 .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1871 .expect("should create producer");
1872
1873 let allowed: Vec<_> = producer.allowed().collect();
1874 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1875 }
1876
1877 #[tokio::test]
1878 async fn test_announced_broadcast_already_announced() {
1879 let origin = Origin::random().produce();
1880 let broadcast = Broadcast::new().produce();
1881
1882 origin.publish_broadcast("test", broadcast.consume());
1883
1884 let consumer = origin.consume();
1885 let result = consumer.announced_broadcast("test").await.expect("should find it");
1886 assert!(result.is_clone(&broadcast.consume()));
1887 }
1888
1889 #[tokio::test]
1890 async fn test_announced_broadcast_delayed() {
1891 tokio::time::pause();
1892
1893 let origin = Origin::random().produce();
1894 let broadcast = Broadcast::new().produce();
1895
1896 let consumer = origin.consume();
1897
1898 let wait = tokio::spawn({
1900 let consumer = consumer.clone();
1901 async move { consumer.announced_broadcast("test").await }
1902 });
1903
1904 tokio::task::yield_now().await;
1906
1907 origin.publish_broadcast("test", broadcast.consume());
1908
1909 let result = wait.await.unwrap().expect("should find it");
1910 assert!(result.is_clone(&broadcast.consume()));
1911 }
1912
1913 #[tokio::test]
1914 async fn test_announced_broadcast_ignores_unrelated_paths() {
1915 tokio::time::pause();
1916
1917 let origin = Origin::random().produce();
1918 let other = Broadcast::new().produce();
1919 let target = Broadcast::new().produce();
1920
1921 let consumer = origin.consume();
1922
1923 let wait = tokio::spawn({
1924 let consumer = consumer.clone();
1925 async move { consumer.announced_broadcast("target").await }
1926 });
1927
1928 tokio::task::yield_now().await;
1929
1930 origin.publish_broadcast("other", other.consume());
1932 tokio::task::yield_now().await;
1933 assert!(!wait.is_finished(), "must not resolve on unrelated path");
1934
1935 origin.publish_broadcast("target", target.consume());
1936 let result = wait.await.unwrap().expect("should find target");
1937 assert!(result.is_clone(&target.consume()));
1938 }
1939
1940 #[tokio::test]
1941 async fn test_announced_broadcast_skips_nested_paths() {
1942 tokio::time::pause();
1943
1944 let origin = Origin::random().produce();
1945 let nested = Broadcast::new().produce();
1946 let exact = Broadcast::new().produce();
1947
1948 let consumer = origin.consume();
1949
1950 let wait = tokio::spawn({
1951 let consumer = consumer.clone();
1952 async move { consumer.announced_broadcast("foo").await }
1953 });
1954
1955 tokio::task::yield_now().await;
1956
1957 origin.publish_broadcast("foo/bar", nested.consume());
1959 tokio::task::yield_now().await;
1960 assert!(!wait.is_finished(), "must not resolve on a nested path");
1961
1962 origin.publish_broadcast("foo", exact.consume());
1963 let result = wait.await.unwrap().expect("should find foo exactly");
1964 assert!(result.is_clone(&exact.consume()));
1965 }
1966
1967 #[tokio::test]
1968 async fn test_announced_broadcast_disallowed() {
1969 let origin = Origin::random().produce();
1970 let limited = origin
1971 .consume()
1972 .scope(&["allowed".into()])
1973 .expect("should create limited");
1974
1975 assert!(limited.announced_broadcast("notallowed").await.is_none());
1977 }
1978
1979 #[tokio::test]
1980 async fn test_announced_broadcast_scope_too_narrow() {
1981 let origin = Origin::random().produce();
1984 let limited = origin
1985 .consume()
1986 .scope(&["foo/specific".into()])
1987 .expect("should create limited");
1988
1989 let result = limited
1991 .announced_broadcast("foo")
1992 .now_or_never()
1993 .expect("must not block");
1994 assert!(result.is_none());
1995 }
1996
1997 #[tokio::test]
2001 async fn test_coalesce_announce_then_unannounce() {
2002 tokio::time::pause();
2004
2005 let origin = Origin::random().produce();
2006 let mut consumer = origin.consume();
2007
2008 let broadcast = Broadcast::new().produce();
2009 origin.publish_broadcast("test", broadcast.consume());
2010 drop(broadcast);
2011
2012 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2013
2014 consumer.assert_next_wait();
2015 }
2016
2017 #[tokio::test]
2018 async fn test_coalesce_announce_unannounce_announce() {
2019 tokio::time::pause();
2022
2023 let origin = Origin::random().produce();
2024 let mut consumer = origin.consume();
2025
2026 let broadcast1 = Broadcast::new().produce();
2027 let broadcast2 = Broadcast::new().produce();
2028
2029 origin.publish_broadcast("test", broadcast1.consume());
2030 drop(broadcast1);
2031 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2032 origin.publish_broadcast("test", broadcast2.consume());
2033
2034 consumer.assert_next("test", &broadcast2.consume());
2035 consumer.assert_next_wait();
2036 }
2037
2038 #[tokio::test]
2039 async fn test_coalesce_unannounce_announce_preserved() {
2040 tokio::time::pause();
2043
2044 let origin = Origin::random().produce();
2045 let broadcast1 = Broadcast::new().produce();
2046 origin.publish_broadcast("test", broadcast1.consume());
2047
2048 let mut consumer = origin.consume();
2049 consumer.assert_next("test", &broadcast1.consume());
2050
2051 drop(broadcast1);
2053 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2054
2055 let broadcast2 = Broadcast::new().produce();
2056 origin.publish_broadcast("test", broadcast2.consume());
2057
2058 consumer.assert_next_none("test");
2060 consumer.assert_next("test", &broadcast2.consume());
2061 consumer.assert_next_wait();
2062 }
2063
2064 #[tokio::test]
2065 async fn test_coalesce_unannounce_announce_unannounce() {
2066 tokio::time::pause();
2069
2070 let origin = Origin::random().produce();
2071 let broadcast1 = Broadcast::new().produce();
2072 origin.publish_broadcast("test", broadcast1.consume());
2073
2074 let mut consumer = origin.consume();
2075 consumer.assert_next("test", &broadcast1.consume());
2076
2077 drop(broadcast1);
2078 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2079
2080 let broadcast2 = Broadcast::new().produce();
2081 origin.publish_broadcast("test", broadcast2.consume());
2082 drop(broadcast2);
2083 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2084
2085 consumer.assert_next_none("test");
2086 consumer.assert_next_wait();
2087 }
2088
2089 #[tokio::test]
2090 async fn test_coalesce_churn_bounded() {
2091 tokio::time::pause();
2096
2097 let origin = Origin::random().produce();
2098 let mut consumer = origin.consume();
2099
2100 for _ in 0..1000 {
2101 let broadcast = Broadcast::new().produce();
2102 origin.publish_broadcast("test", broadcast.consume());
2103 drop(broadcast);
2104 }
2105 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2106
2107 let mut collected = Vec::new();
2108 while let Some(update) = consumer.try_announced() {
2109 collected.push(update);
2110 }
2111 assert!(
2112 collected.len() <= 1,
2113 "expected at most one pending update, got {}",
2114 collected.len()
2115 );
2116 assert!(
2117 collected.iter().all(|(path, _)| path == &Path::new("test")),
2118 "unexpected path in pending updates",
2119 );
2120 }
2121}