1use std::{
2 collections::{HashMap, VecDeque},
3 fmt,
4 sync::atomic::{AtomicU64, Ordering},
5};
6
7use rand::Rng;
8use tokio::sync::mpsc;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13 AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14 coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26 pub id: u64,
28}
29
30impl Origin {
31 pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35 pub fn random() -> Self {
44 let mut rng = rand::rng();
45 let id = rng.random_range(1..(1u64 << 53));
46 Self { id }
47 }
48
49 pub fn produce(self) -> OriginProducer {
51 OriginProducer::new(self)
52 }
53}
54
55impl From<u64> for Origin {
56 fn from(id: u64) -> Self {
57 Self { id }
58 }
59}
60
61impl fmt::Display for Origin {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 self.id.fmt(f)
64 }
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69 u64: Encode<V>,
70{
71 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72 self.id.encode(w, version)
73 }
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78 u64: Decode<V>,
79{
80 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81 let id = u64::decode(r, version)?;
82 if id >= 1u64 << 62 {
83 return Err(DecodeError::InvalidValue);
84 }
85 Ok(Self { id })
86 }
87}
88
89pub(crate) const MAX_HOPS: usize = 32;
95
96#[derive(Debug, Clone, Default, PartialEq, Eq)]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct OriginList(Vec<Origin>);
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106#[non_exhaustive]
107pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(f, "too many origins (max {MAX_HOPS})")
112 }
113}
114
115impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118 fn from(_: TooManyOrigins) -> Self {
119 DecodeError::BoundsExceeded
120 }
121}
122
123impl OriginList {
124 pub fn new() -> Self {
126 Self(Vec::new())
127 }
128
129 pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131 if self.0.len() >= MAX_HOPS {
132 return Err(TooManyOrigins);
133 }
134 self.0.push(origin);
135 Ok(())
136 }
137
138 pub fn contains(&self, origin: &Origin) -> bool {
140 self.0.contains(origin)
141 }
142
143 pub fn len(&self) -> usize {
145 self.0.len()
146 }
147
148 pub fn is_empty(&self) -> bool {
150 self.0.is_empty()
151 }
152
153 pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
155 self.0.iter()
156 }
157
158 pub fn as_slice(&self) -> &[Origin] {
160 &self.0
161 }
162}
163
164impl TryFrom<Vec<Origin>> for OriginList {
165 type Error = TooManyOrigins;
166
167 fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
168 if v.len() > MAX_HOPS {
169 return Err(TooManyOrigins);
170 }
171 Ok(Self(v))
172 }
173}
174
175impl<'a> IntoIterator for &'a OriginList {
176 type Item = &'a Origin;
177 type IntoIter = std::slice::Iter<'a, Origin>;
178
179 fn into_iter(self) -> Self::IntoIter {
180 self.iter()
181 }
182}
183
184impl<V: Copy> Encode<V> for OriginList
185where
186 u64: Encode<V>,
187 Origin: Encode<V>,
188{
189 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
190 (self.0.len() as u64).encode(w, version)?;
191 for origin in &self.0 {
192 origin.encode(w, version)?;
193 }
194 Ok(())
195 }
196}
197
198impl<V: Copy> Decode<V> for OriginList
199where
200 u64: Decode<V>,
201 Origin: Decode<V>,
202{
203 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
204 let count = u64::decode(r, version)? as usize;
205 if count > MAX_HOPS {
206 return Err(DecodeError::BoundsExceeded);
207 }
208 let mut list = Vec::with_capacity(count);
209 for _ in 0..count {
210 list.push(Origin::decode(r, version)?);
211 }
212 Ok(Self(list))
213 }
214}
215
/// Process-wide counter handing out the next [`ConsumerId`].
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Opaque identifier used as the key for a registered consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
    /// Allocates the next id from the global counter.
    fn new() -> Self {
        // `fetch_add` returns the value *before* the increment.
        let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
        Self(raw)
    }
}
226
227struct OriginBroadcast {
229 path: PathOwned,
230 active: BroadcastConsumer,
231 backup: VecDeque<BroadcastConsumer>,
232}
233
234#[derive(Clone)]
235struct OriginConsumerNotify {
236 root: PathOwned,
237 tx: mpsc::UnboundedSender<OriginAnnounce>,
238}
239
240impl OriginConsumerNotify {
241 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
242 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
243 self.tx.send((path, Some(broadcast))).expect("consumer closed");
244 }
245
246 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
247 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
248 self.tx.send((path.clone(), None)).expect("consumer closed");
249 self.tx.send((path, Some(broadcast))).expect("consumer closed");
250 }
251
252 fn unannounce(&self, path: impl AsPath) {
253 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
254 self.tx.send((path, None)).expect("consumer closed");
255 }
256}
257
258struct NotifyNode {
259 parent: Option<Lock<NotifyNode>>,
260
261 consumers: HashMap<ConsumerId, OriginConsumerNotify>,
264}
265
266impl NotifyNode {
267 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
268 Self {
269 parent,
270 consumers: HashMap::new(),
271 }
272 }
273
274 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
275 for consumer in self.consumers.values() {
276 consumer.announce(path.as_path(), broadcast.clone());
277 }
278
279 if let Some(parent) = &self.parent {
280 parent.lock().announce(path, broadcast);
281 }
282 }
283
284 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
285 for consumer in self.consumers.values() {
286 consumer.reannounce(path.as_path(), broadcast.clone());
287 }
288
289 if let Some(parent) = &self.parent {
290 parent.lock().reannounce(path, broadcast);
291 }
292 }
293
294 fn unannounce(&mut self, path: impl AsPath) {
295 for consumer in self.consumers.values() {
296 consumer.unannounce(path.as_path());
297 }
298
299 if let Some(parent) = &self.parent {
300 parent.lock().unannounce(path);
301 }
302 }
303}
304
305struct OriginNode {
306 broadcast: Option<OriginBroadcast>,
308
309 nested: HashMap<String, Lock<OriginNode>>,
311
312 notify: Lock<NotifyNode>,
314}
315
316impl OriginNode {
317 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
318 Self {
319 broadcast: None,
320 nested: HashMap::new(),
321 notify: Lock::new(NotifyNode::new(parent)),
322 }
323 }
324
325 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
326 let (dir, rest) = path.next_part().expect("leaf called with empty path");
327
328 let next = self.entry(dir);
329 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
330 }
331
332 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
333 match self.nested.get(dir) {
334 Some(next) => next.clone(),
335 None => {
336 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
337 self.nested.insert(dir.to_string(), next.clone());
338 next
339 }
340 }
341 }
342
343 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
344 let full = full.as_path();
345 let rest = relative.as_path();
346
347 if let Some((dir, relative)) = rest.next_part() {
349 self.entry(dir).lock().publish(&full, broadcast, &relative);
351 } else if let Some(existing) = &mut self.broadcast {
352 if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
359 return;
360 }
361
362 if broadcast.hops.len() <= existing.active.hops.len() {
363 let old = existing.active.clone();
364 existing.active = broadcast.clone();
365 existing.backup.push_back(old);
366
367 self.notify.lock().reannounce(full, broadcast);
368 } else {
369 existing.backup.push_back(broadcast.clone());
371 }
372 } else {
373 self.broadcast = Some(OriginBroadcast {
375 path: full.to_owned(),
376 active: broadcast.clone(),
377 backup: VecDeque::new(),
378 });
379 self.notify.lock().announce(full, broadcast);
380 }
381 }
382
383 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
384 self.consume_initial(&mut notify);
385 self.notify.lock().consumers.insert(id, notify);
386 }
387
388 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
389 if let Some(broadcast) = &self.broadcast {
390 notify.announce(&broadcast.path, broadcast.active.clone());
391 }
392
393 for nested in self.nested.values() {
395 nested.lock().consume_initial(notify);
396 }
397 }
398
399 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
400 let rest = rest.as_path();
401
402 if let Some((dir, rest)) = rest.next_part() {
403 let node = self.nested.get(dir)?.lock();
404 node.consume_broadcast(&rest)
405 } else {
406 self.broadcast.as_ref().map(|b| b.active.clone())
407 }
408 }
409
410 fn unconsume(&mut self, id: ConsumerId) {
411 self.notify.lock().consumers.remove(&id).expect("consumer not found");
412 if self.is_empty() {
413 }
416 }
417
418 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
420 let full = full.as_path();
421 let relative = relative.as_path();
422
423 if let Some((dir, relative)) = relative.next_part() {
424 let nested = self.entry(dir);
425 let mut locked = nested.lock();
426 locked.remove(&full, broadcast, &relative);
427
428 if locked.is_empty() {
429 drop(locked);
430 self.nested.remove(dir);
431 }
432 } else {
433 let entry = match &mut self.broadcast {
434 Some(existing) => existing,
435 None => return,
436 };
437
438 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
440 if let Some(pos) = pos {
441 entry.backup.remove(pos);
442 return;
444 }
445
446 assert!(entry.active.is_clone(&broadcast));
448
449 let best = entry
452 .backup
453 .iter()
454 .enumerate()
455 .min_by_key(|(_, b)| b.hops.len())
456 .map(|(i, _)| i);
457 if let Some(idx) = best {
458 let active = entry.backup.remove(idx).expect("index in range");
459 entry.active = active;
460 self.notify.lock().reannounce(full, &entry.active);
461 } else {
462 self.broadcast = None;
464 self.notify.lock().unannounce(full);
465 }
466 }
467 }
468
469 fn is_empty(&self) -> bool {
470 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
471 }
472}
473
474#[derive(Clone)]
475struct OriginNodes {
476 nodes: Vec<(PathOwned, Lock<OriginNode>)>,
477}
478
479impl OriginNodes {
480 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
483 let mut roots = Vec::new();
484
485 for (root, state) in &self.nodes {
486 for prefix in prefixes {
487 if root.has_prefix(prefix) {
488 roots.push((root.to_owned(), state.clone()));
490 continue;
491 }
492
493 if let Some(suffix) = prefix.strip_prefix(root) {
494 let nested = state.lock().leaf(&suffix);
496 roots.push((prefix.to_owned(), nested));
497 }
498 }
499 }
500
501 if roots.is_empty() {
502 None
503 } else {
504 Some(Self { nodes: roots })
505 }
506 }
507
508 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
509 let new_root = new_root.as_path();
510 let mut roots = Vec::new();
511
512 if new_root.is_empty() {
513 return Some(self.clone());
514 }
515
516 for (root, state) in &self.nodes {
517 if let Some(suffix) = root.strip_prefix(&new_root) {
518 roots.push((suffix.to_owned(), state.clone()));
520 } else if let Some(suffix) = new_root.strip_prefix(root) {
521 let nested = state.lock().leaf(&suffix);
524 roots.push(("".into(), nested));
525 }
526 }
527
528 if roots.is_empty() {
529 None
530 } else {
531 Some(Self { nodes: roots })
532 }
533 }
534
535 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
537 let path = path.as_path();
538
539 for (root, state) in &self.nodes {
540 if let Some(suffix) = path.strip_prefix(root) {
541 return Some((state.clone(), suffix.to_owned()));
542 }
543 }
544
545 None
546 }
547}
548
549impl Default for OriginNodes {
550 fn default() -> Self {
551 Self {
552 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
553 }
554 }
555}
556
557pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
559
560#[derive(Clone)]
562pub struct OriginProducer {
563 info: Origin,
567
568 nodes: OriginNodes,
571
572 root: PathOwned,
574}
575
576impl std::ops::Deref for OriginProducer {
577 type Target = Origin;
578
579 fn deref(&self) -> &Self::Target {
580 &self.info
581 }
582}
583
584impl OriginProducer {
585 pub fn new(info: Origin) -> Self {
588 Self {
589 info,
590 nodes: OriginNodes::default(),
591 root: PathOwned::default(),
592 }
593 }
594
595 pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
600 let broadcast = Broadcast::new().produce();
601 self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
602 }
603
604 pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
614 let path = path.as_path();
615
616 if broadcast.hops.contains(&self.info) {
618 return false;
619 }
620
621 let (root, rest) = match self.nodes.get(&path) {
622 Some(root) => root,
623 None => return false,
624 };
625
626 let full = self.root.join(&path);
627
628 root.lock().publish(&full, &broadcast, &rest);
629 let root = root.clone();
630
631 web_async::spawn(async move {
632 broadcast.closed().await;
633 root.lock().remove(&full, broadcast, &rest);
634 });
635
636 true
637 }
638
639 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
645 let prefixes = PathPrefixes::new(prefixes);
646 Some(OriginProducer {
647 info: self.info,
648 nodes: self.nodes.select(&prefixes)?,
649 root: self.root.clone(),
650 })
651 }
652
653 pub fn consume(&self) -> OriginConsumer {
655 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
656 }
657
658 #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
663 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
664 let path = path.as_path();
665 let (root, rest) = self.nodes.get(&path)?;
666 let state = root.lock();
667 state.consume_broadcast(&rest)
668 }
669
670 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
675 let prefix = prefix.as_path();
676
677 Some(Self {
678 info: self.info,
679 root: self.root.join(&prefix).to_owned(),
680 nodes: self.nodes.root(&prefix)?,
681 })
682 }
683
684 pub fn root(&self) -> &Path<'_> {
686 &self.root
687 }
688
689 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
692 self.nodes.nodes.iter().map(|(root, _)| root)
693 }
694
695 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
697 self.root.join(path)
698 }
699}
700
701pub struct OriginConsumer {
705 id: ConsumerId,
706 info: Origin,
708 nodes: OriginNodes,
709 updates: mpsc::UnboundedReceiver<OriginAnnounce>,
710
711 root: PathOwned,
713}
714
715impl std::ops::Deref for OriginConsumer {
716 type Target = Origin;
717
718 fn deref(&self) -> &Self::Target {
719 &self.info
720 }
721}
722
723impl OriginConsumer {
724 fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
725 let (tx, rx) = mpsc::unbounded_channel();
726
727 let id = ConsumerId::new();
728
729 for (_, state) in &nodes.nodes {
730 let notify = OriginConsumerNotify {
731 root: root.clone(),
732 tx: tx.clone(),
733 };
734 state.lock().consume(id, notify);
735 }
736
737 Self {
738 id,
739 info,
740 nodes,
741 updates: rx,
742 root,
743 }
744 }
745
746 pub async fn announced(&mut self) -> Option<OriginAnnounce> {
754 self.updates.recv().await
755 }
756
757 pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
762 self.updates.try_recv().ok()
763 }
764
765 pub fn consume(&self) -> Self {
767 self.clone()
768 }
769
770 pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
778 let path = path.as_path();
779 let (root, rest) = self.nodes.get(&path)?;
780 let state = root.lock();
781 state.consume_broadcast(&rest)
782 }
783
784 pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
793 let path = path.as_path();
794
795 let mut consumer = self.scope(std::slice::from_ref(&path))?;
797
798 if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
802 return None;
803 }
804
805 loop {
806 let (announced, broadcast) = consumer.announced().await?;
807 if announced.as_path() == path {
809 if let Some(broadcast) = broadcast {
810 return Some(broadcast);
811 }
812 }
813 }
814 }
815
816 pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
822 let prefixes = PathPrefixes::new(prefixes);
823 Some(OriginConsumer::new(
824 self.info,
825 self.root.clone(),
826 self.nodes.select(&prefixes)?,
827 ))
828 }
829
830 pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
835 let prefix = prefix.as_path();
836
837 Some(Self::new(
838 self.info,
839 self.root.join(&prefix).to_owned(),
840 self.nodes.root(&prefix)?,
841 ))
842 }
843
844 pub fn root(&self) -> &Path<'_> {
846 &self.root
847 }
848
849 pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
852 self.nodes.nodes.iter().map(|(root, _)| root)
853 }
854
855 pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
857 self.root.join(path)
858 }
859}
860
861impl Drop for OriginConsumer {
862 fn drop(&mut self) {
863 for (_, root) in &self.nodes.nodes {
864 root.lock().unconsume(self.id);
865 }
866 }
867}
868
869impl Clone for OriginConsumer {
870 fn clone(&self) -> Self {
871 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
872 }
873}
874
875#[cfg(test)]
876use futures::FutureExt;
877
/// Assertion helpers used by the unit tests.
#[cfg(test)]
impl OriginConsumer {
    /// Asserts that an announce of `broadcast` at `expected` is ready right now.
    pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let polled = self.announced().now_or_never();
        let (path, active) = polled.expect("next blocked").expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
    }

    /// Same as [`Self::assert_next`], but uses the non-async `try_announced`.
    pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let queued = self.try_announced();
        let (path, active) = queued.expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
    }

    /// Asserts that an unannounce of `expected` is ready right now.
    pub fn assert_next_none(&mut self, expected: impl AsPath) {
        let expected = expected.as_path();
        let polled = self.announced().now_or_never();
        let (path, active) = polled.expect("next blocked").expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.is_none(), "should be unannounced");
    }

    /// Asserts that no event is ready, i.e. `announced()` would have to wait.
    pub fn assert_next_wait(&mut self) {
        let polled = self.announced().now_or_never();
        if let Some(res) = polled {
            panic!("next should block: got {:?}", res.map(|(path, _)| path));
        }
    }
}
916
917#[cfg(test)]
918mod tests {
919 use crate::Broadcast;
920
921 use super::*;
922
923 #[test]
924 fn origin_list_push_fails_at_limit() {
925 let mut list = OriginList::new();
926 for _ in 0..MAX_HOPS {
927 list.push(Origin::random()).unwrap();
928 }
929 assert_eq!(list.len(), MAX_HOPS);
930 assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
931 }
932
933 #[test]
934 fn origin_list_try_from_vec_enforces_limit() {
935 let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
936 assert!(OriginList::try_from(under).is_ok());
937
938 let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
939 assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
940 }
941
942 #[tokio::test]
943 async fn test_announce() {
944 tokio::time::pause();
945
946 let origin = Origin::random().produce();
947 let broadcast1 = Broadcast::new().produce();
948 let broadcast2 = Broadcast::new().produce();
949
950 let mut consumer1 = origin.consume();
951 consumer1.assert_next_wait();
953
954 origin.publish_broadcast("test1", broadcast1.consume());
956
957 consumer1.assert_next("test1", &broadcast1.consume());
958 consumer1.assert_next_wait();
959
960 let mut consumer2 = origin.consume();
963
964 origin.publish_broadcast("test2", broadcast2.consume());
966
967 consumer1.assert_next("test2", &broadcast2.consume());
968 consumer1.assert_next_wait();
969
970 consumer2.assert_next("test1", &broadcast1.consume());
971 consumer2.assert_next("test2", &broadcast2.consume());
972 consumer2.assert_next_wait();
973
974 drop(broadcast1);
976
977 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
979
980 consumer1.assert_next_none("test1");
982 consumer2.assert_next_none("test1");
983 consumer1.assert_next_wait();
984 consumer2.assert_next_wait();
985
986 let mut consumer3 = origin.consume();
988 consumer3.assert_next("test2", &broadcast2.consume());
989 consumer3.assert_next_wait();
990
991 drop(broadcast2);
993
994 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
996
997 consumer1.assert_next_none("test2");
998 consumer2.assert_next_none("test2");
999 consumer3.assert_next_none("test2");
1000
1001 }
1007
1008 #[tokio::test]
1009 async fn test_duplicate() {
1010 tokio::time::pause();
1011
1012 let origin = Origin::random().produce();
1013
1014 let broadcast1 = Broadcast::new().produce();
1015 let broadcast2 = Broadcast::new().produce();
1016 let broadcast3 = Broadcast::new().produce();
1017
1018 let consumer1 = broadcast1.consume();
1019 let consumer2 = broadcast2.consume();
1020 let consumer3 = broadcast3.consume();
1021
1022 let mut consumer = origin.consume();
1023
1024 origin.publish_broadcast("test", consumer1.clone());
1025 origin.publish_broadcast("test", consumer2.clone());
1026 origin.publish_broadcast("test", consumer3.clone());
1027 assert!(consumer.get_broadcast("test").is_some());
1028
1029 consumer.assert_next("test", &consumer1);
1031 consumer.assert_next_none("test");
1032 consumer.assert_next("test", &consumer2);
1033 consumer.assert_next_none("test");
1034 consumer.assert_next("test", &consumer3);
1035
1036 drop(broadcast2);
1038
1039 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1041
1042 assert!(consumer.get_broadcast("test").is_some());
1043 consumer.assert_next_wait();
1044
1045 drop(broadcast3);
1047
1048 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1050
1051 assert!(consumer.get_broadcast("test").is_some());
1052 consumer.assert_next_none("test");
1053 consumer.assert_next("test", &consumer1);
1054
1055 drop(broadcast1);
1057
1058 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1060 assert!(consumer.get_broadcast("test").is_none());
1061
1062 consumer.assert_next_none("test");
1063 consumer.assert_next_wait();
1064 }
1065
1066 #[tokio::test]
1067 async fn test_duplicate_reverse() {
1068 tokio::time::pause();
1069
1070 let origin = Origin::random().produce();
1071 let broadcast1 = Broadcast::new().produce();
1072 let broadcast2 = Broadcast::new().produce();
1073
1074 origin.publish_broadcast("test", broadcast1.consume());
1075 origin.publish_broadcast("test", broadcast2.consume());
1076 assert!(origin.consume().get_broadcast("test").is_some());
1077
1078 drop(broadcast2);
1080
1081 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1083 assert!(origin.consume().get_broadcast("test").is_some());
1084
1085 drop(broadcast1);
1086
1087 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1089 assert!(origin.consume().get_broadcast("test").is_none());
1090 }
1091
1092 #[tokio::test]
1093 async fn test_double_publish() {
1094 tokio::time::pause();
1095
1096 let origin = Origin::random().produce();
1097 let broadcast = Broadcast::new().produce();
1098
1099 origin.publish_broadcast("test", broadcast.consume());
1101 origin.publish_broadcast("test", broadcast.consume());
1102
1103 assert!(origin.consume().get_broadcast("test").is_some());
1104
1105 drop(broadcast);
1106
1107 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1109 assert!(origin.consume().get_broadcast("test").is_none());
1110 }
1111 #[tokio::test]
1113 #[should_panic]
1114 async fn test_128() {
1115 let origin = Origin::random().produce();
1116 let broadcast = Broadcast::new().produce();
1117
1118 let mut consumer = origin.consume();
1119 for i in 0..256 {
1120 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1121 }
1122
1123 for i in 0..256 {
1124 consumer.assert_next(format!("test{i}"), &broadcast.consume());
1125 }
1126 }
1127
1128 #[tokio::test]
1129 async fn test_128_fix() {
1130 let origin = Origin::random().produce();
1131 let broadcast = Broadcast::new().produce();
1132
1133 let mut consumer = origin.consume();
1134 for i in 0..256 {
1135 origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1136 }
1137
1138 for i in 0..256 {
1139 consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
1141 }
1142 }
1143
1144 #[tokio::test]
1145 async fn test_with_root_basic() {
1146 let origin = Origin::random().produce();
1147 let broadcast = Broadcast::new().produce();
1148
1149 let foo_producer = origin.with_root("foo").expect("should create root");
1151 assert_eq!(foo_producer.root().as_str(), "foo");
1152
1153 let mut consumer = origin.consume();
1154
1155 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1157 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1159
1160 let mut foo_consumer = foo_producer.consume();
1162 foo_consumer.assert_next("bar/baz", &broadcast.consume());
1163 }
1164
1165 #[tokio::test]
1166 async fn test_with_root_nested() {
1167 let origin = Origin::random().produce();
1168 let broadcast = Broadcast::new().produce();
1169
1170 let foo_producer = origin.with_root("foo").expect("should create foo root");
1172 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1173 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1174
1175 let mut consumer = origin.consume();
1176
1177 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1179 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1181
1182 let mut foo_bar_consumer = foo_bar_producer.consume();
1184 foo_bar_consumer.assert_next("baz", &broadcast.consume());
1185 }
1186
1187 #[tokio::test]
1188 async fn test_publish_scope_allows() {
1189 let origin = Origin::random().produce();
1190 let broadcast = Broadcast::new().produce();
1191
1192 let limited_producer = origin
1194 .scope(&["allowed/path1".into(), "allowed/path2".into()])
1195 .expect("should create limited producer");
1196
1197 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1199 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1200 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1201
1202 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1204 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1206 }
1207
1208 #[tokio::test]
1209 async fn test_publish_scope_empty() {
1210 let origin = Origin::random().produce();
1211
1212 assert!(origin.scope(&[]).is_none());
1214 }
1215
1216 #[tokio::test]
1217 async fn test_consume_scope_filters() {
1218 let origin = Origin::random().produce();
1219 let broadcast1 = Broadcast::new().produce();
1220 let broadcast2 = Broadcast::new().produce();
1221 let broadcast3 = Broadcast::new().produce();
1222
1223 let mut consumer = origin.consume();
1224
1225 origin.publish_broadcast("allowed", broadcast1.consume());
1227 origin.publish_broadcast("allowed/nested", broadcast2.consume());
1228 origin.publish_broadcast("notallowed", broadcast3.consume());
1229
1230 let mut limited_consumer = origin
1232 .consume()
1233 .scope(&["allowed".into()])
1234 .expect("should create limited consumer");
1235
1236 limited_consumer.assert_next("allowed", &broadcast1.consume());
1238 limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1239 limited_consumer.assert_next_wait(); consumer.assert_next("allowed", &broadcast1.consume());
1243 consumer.assert_next("allowed/nested", &broadcast2.consume());
1244 consumer.assert_next("notallowed", &broadcast3.consume());
1245 }
1246
1247 #[tokio::test]
1248 async fn test_consume_scope_multiple_prefixes() {
1249 let origin = Origin::random().produce();
1250 let broadcast1 = Broadcast::new().produce();
1251 let broadcast2 = Broadcast::new().produce();
1252 let broadcast3 = Broadcast::new().produce();
1253
1254 origin.publish_broadcast("foo/test", broadcast1.consume());
1255 origin.publish_broadcast("bar/test", broadcast2.consume());
1256 origin.publish_broadcast("baz/test", broadcast3.consume());
1257
1258 let mut limited_consumer = origin
1260 .consume()
1261 .scope(&["foo".into(), "bar".into()])
1262 .expect("should create limited consumer");
1263
1264 limited_consumer.assert_next("bar/test", &broadcast2.consume());
1266 limited_consumer.assert_next("foo/test", &broadcast1.consume());
1267 limited_consumer.assert_next_wait(); }
1269
1270 #[tokio::test]
1271 async fn test_with_root_and_publish_scope() {
1272 let origin = Origin::random().produce();
1273 let broadcast = Broadcast::new().produce();
1274
1275 let foo_producer = origin.with_root("foo").expect("should create foo root");
1277
1278 let limited_producer = foo_producer
1280 .scope(&["bar".into(), "goop/pee".into()])
1281 .expect("should create limited producer");
1282
1283 let mut consumer = origin.consume();
1284
1285 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1287 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1288 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1289 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1290
1291 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1293 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1295
1296 consumer.assert_next("foo/bar", &broadcast.consume());
1298 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1299 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1300 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1301 }
1302
1303 #[tokio::test]
1304 async fn test_with_root_and_consume_scope() {
1305 let origin = Origin::random().produce();
1306 let broadcast1 = Broadcast::new().produce();
1307 let broadcast2 = Broadcast::new().produce();
1308 let broadcast3 = Broadcast::new().produce();
1309
1310 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1312 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1313 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1314
1315 let foo_producer = origin.with_root("foo").expect("should create foo root");
1317
1318 let mut limited_consumer = foo_producer
1320 .consume()
1321 .scope(&["bar".into(), "goop/pee".into()])
1322 .expect("should create limited consumer");
1323
1324 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1326 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1327 limited_consumer.assert_next_wait(); }
1329
1330 #[tokio::test]
1331 async fn test_with_root_unauthorized() {
1332 let origin = Origin::random().produce();
1333
1334 let limited_producer = origin
1336 .scope(&["allowed".into()])
1337 .expect("should create limited producer");
1338
1339 assert!(limited_producer.with_root("notallowed").is_none());
1341
1342 let allowed_root = limited_producer
1344 .with_root("allowed")
1345 .expect("should create allowed root");
1346 assert_eq!(allowed_root.root().as_str(), "allowed");
1347 }
1348
1349 #[tokio::test]
1350 async fn test_wildcard_permission() {
1351 let origin = Origin::random().produce();
1352 let broadcast = Broadcast::new().produce();
1353
1354 let root_producer = origin.clone();
1356
1357 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1359 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1360
1361 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1363 assert_eq!(foo_producer.root().as_str(), "foo");
1364 }
1365
1366 #[tokio::test]
1367 async fn test_consume_broadcast_with_permissions() {
1368 let origin = Origin::random().produce();
1369 let broadcast1 = Broadcast::new().produce();
1370 let broadcast2 = Broadcast::new().produce();
1371
1372 origin.publish_broadcast("allowed/test", broadcast1.consume());
1373 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1374
1375 let limited_consumer = origin
1377 .consume()
1378 .scope(&["allowed".into()])
1379 .expect("should create limited consumer");
1380
1381 let result = limited_consumer.get_broadcast("allowed/test");
1383 assert!(result.is_some());
1384 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1385
1386 assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1388
1389 let consumer = origin.consume();
1391 assert!(consumer.get_broadcast("allowed/test").is_some());
1392 assert!(consumer.get_broadcast("notallowed/test").is_some());
1393 }
1394
1395 #[tokio::test]
1396 async fn test_nested_paths_with_permissions() {
1397 let origin = Origin::random().produce();
1398 let broadcast = Broadcast::new().produce();
1399
1400 let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1402
1403 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1405 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1406 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1407
1408 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1410 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1411 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1412 }
1413
1414 #[tokio::test]
1415 async fn test_multiple_consumers_with_different_permissions() {
1416 let origin = Origin::random().produce();
1417 let broadcast1 = Broadcast::new().produce();
1418 let broadcast2 = Broadcast::new().produce();
1419 let broadcast3 = Broadcast::new().produce();
1420
1421 origin.publish_broadcast("foo/test", broadcast1.consume());
1423 origin.publish_broadcast("bar/test", broadcast2.consume());
1424 origin.publish_broadcast("baz/test", broadcast3.consume());
1425
1426 let mut foo_consumer = origin
1428 .consume()
1429 .scope(&["foo".into()])
1430 .expect("should create foo consumer");
1431
1432 let mut bar_consumer = origin
1433 .consume()
1434 .scope(&["bar".into()])
1435 .expect("should create bar consumer");
1436
1437 let mut foobar_consumer = origin
1438 .consume()
1439 .scope(&["foo".into(), "bar".into()])
1440 .expect("should create foobar consumer");
1441
1442 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1444 foo_consumer.assert_next_wait();
1445
1446 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1447 bar_consumer.assert_next_wait();
1448
1449 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1450 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1451 foobar_consumer.assert_next_wait();
1452 }
1453
1454 #[tokio::test]
1455 async fn test_select_with_empty_prefix() {
1456 let origin = Origin::random().produce();
1457 let broadcast1 = Broadcast::new().produce();
1458 let broadcast2 = Broadcast::new().produce();
1459
1460 let demo_producer = origin.with_root("demo").expect("should create demo root");
1462 let limited_producer = demo_producer
1463 .scope(&["worm-node".into(), "foobar".into()])
1464 .expect("should create limited producer");
1465
1466 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1468 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1469
1470 let mut consumer = limited_producer
1472 .consume()
1473 .scope(&["".into()])
1474 .expect("should create consumer with empty prefix");
1475
1476 let a1 = consumer.try_announced().expect("expected first announcement");
1478 let a2 = consumer.try_announced().expect("expected second announcement");
1479 consumer.assert_next_wait();
1480
1481 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1482 paths.sort();
1483 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1484 }
1485
1486 #[tokio::test]
1487 async fn test_select_narrowing_scope() {
1488 let origin = Origin::random().produce();
1489 let broadcast1 = Broadcast::new().produce();
1490 let broadcast2 = Broadcast::new().produce();
1491 let broadcast3 = Broadcast::new().produce();
1492
1493 let demo_producer = origin.with_root("demo").expect("should create demo root");
1495 let limited_producer = demo_producer
1496 .scope(&["worm-node".into(), "foobar".into()])
1497 .expect("should create limited producer");
1498
1499 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1501 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1502 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1503
1504 let mut worm_consumer = limited_producer
1506 .consume()
1507 .scope(&["worm-node".into()])
1508 .expect("should create worm-node consumer");
1509
1510 worm_consumer.assert_next("worm-node", &broadcast1.consume());
1512 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1513 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
1517 .consume()
1518 .scope(&["worm-node/foo".into()])
1519 .expect("should create worm-node/foo consumer");
1520
1521 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1522 foo_consumer.assert_next_wait(); }
1524
1525 #[tokio::test]
1526 async fn test_select_multiple_roots_with_empty_prefix() {
1527 let origin = Origin::random().produce();
1528 let broadcast1 = Broadcast::new().produce();
1529 let broadcast2 = Broadcast::new().produce();
1530 let broadcast3 = Broadcast::new().produce();
1531
1532 let limited_producer = origin
1534 .scope(&["app1".into(), "app2".into(), "shared".into()])
1535 .expect("should create limited producer");
1536
1537 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1539 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1540 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1541
1542 let mut consumer = limited_producer
1544 .consume()
1545 .scope(&["".into()])
1546 .expect("should create consumer with empty prefix");
1547
1548 consumer.assert_next("app1/data", &broadcast1.consume());
1550 consumer.assert_next("app2/config", &broadcast2.consume());
1551 consumer.assert_next("shared/resource", &broadcast3.consume());
1552 consumer.assert_next_wait();
1553 }
1554
1555 #[tokio::test]
1556 async fn test_publish_scope_with_empty_prefix() {
1557 let origin = Origin::random().produce();
1558 let broadcast = Broadcast::new().produce();
1559
1560 let limited_producer = origin
1562 .scope(&["services/api".into(), "services/web".into()])
1563 .expect("should create limited producer");
1564
1565 let same_producer = limited_producer
1567 .scope(&["".into()])
1568 .expect("should create producer with empty prefix");
1569
1570 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1572 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1573 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1574 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1575 }
1576
1577 #[tokio::test]
1578 async fn test_select_narrowing_to_deeper_path() {
1579 let origin = Origin::random().produce();
1580 let broadcast1 = Broadcast::new().produce();
1581 let broadcast2 = Broadcast::new().produce();
1582 let broadcast3 = Broadcast::new().produce();
1583
1584 let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1586
1587 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1589 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1590 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1591
1592 let mut team2_consumer = limited_producer
1594 .consume()
1595 .scope(&["org/team2".into()])
1596 .expect("should create team2 consumer");
1597
1598 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1599 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
1603 .consume()
1604 .scope(&["org/team1/project1".into()])
1605 .expect("should create project1 consumer");
1606
1607 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1609 project1_consumer.assert_next_wait();
1610 }
1611
1612 #[tokio::test]
1613 async fn test_select_with_non_matching_prefix() {
1614 let origin = Origin::random().produce();
1615
1616 let limited_producer = origin
1618 .scope(&["allowed/path".into()])
1619 .expect("should create limited producer");
1620
1621 assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1623
1624 assert!(limited_producer.scope(&["other/path".into()]).is_none());
1626 }
1627
1628 #[tokio::test]
1631 async fn test_with_root_trailing_slash_consumer() {
1632 let origin = Origin::random().produce();
1633
1634 let prefix = "some_prefix/".to_string();
1636 let mut consumer = origin.consume().with_root(prefix).unwrap();
1637
1638 let b = origin.create_broadcast("some_prefix/test").unwrap();
1639 consumer.assert_next("test", &b.consume());
1640 }
1641
1642 #[tokio::test]
1644 async fn test_with_root_trailing_slash_producer() {
1645 let origin = Origin::random().produce();
1646
1647 let prefix = "some_prefix/".to_string();
1649 let rooted = origin.with_root(prefix).unwrap();
1650
1651 let b = rooted.create_broadcast("test").unwrap();
1652
1653 let mut consumer = rooted.consume();
1654 consumer.assert_next("test", &b.consume());
1655 }
1656
1657 #[tokio::test]
1659 async fn test_with_root_trailing_slash_unannounce() {
1660 tokio::time::pause();
1661
1662 let origin = Origin::random().produce();
1663
1664 let prefix = "some_prefix/".to_string();
1665 let mut consumer = origin.consume().with_root(prefix).unwrap();
1666
1667 let b = origin.create_broadcast("some_prefix/test").unwrap();
1668 consumer.assert_next("test", &b.consume());
1669
1670 drop(b);
1672 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1673
1674 consumer.assert_next_none("test");
1676 }
1677
1678 #[tokio::test]
1679 async fn test_select_maintains_access_with_wider_prefix() {
1680 let origin = Origin::random().produce();
1681 let broadcast1 = Broadcast::new().produce();
1682 let broadcast2 = Broadcast::new().produce();
1683
1684 let demo_producer = origin.with_root("demo").expect("should create demo root");
1686 let user_producer = demo_producer
1687 .scope(&["worm-node".into(), "foobar".into()])
1688 .expect("should create user producer");
1689
1690 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1692 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1693
1694 let mut consumer = user_producer
1696 .consume()
1697 .scope(&["".into()])
1698 .expect("scope with empty prefix should not fail when user has specific permissions");
1699
1700 let a1 = consumer.try_announced().expect("expected first announcement");
1702 let a2 = consumer.try_announced().expect("expected second announcement");
1703 consumer.assert_next_wait();
1704
1705 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1706 paths.sort();
1707 assert_eq!(paths, ["foobar", "worm-node/data"]);
1708
1709 let mut narrow_consumer = user_producer
1711 .consume()
1712 .scope(&["worm-node".into()])
1713 .expect("should be able to narrow scope to worm-node");
1714
1715 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1716 narrow_consumer.assert_next_wait(); }
1718
1719 #[tokio::test]
1720 async fn test_duplicate_prefixes_deduped() {
1721 let origin = Origin::random().produce();
1722 let broadcast = Broadcast::new().produce();
1723
1724 let producer = origin
1726 .scope(&["demo".into(), "demo".into()])
1727 .expect("should create producer");
1728
1729 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1730
1731 let mut consumer = producer.consume();
1732 consumer.assert_next("demo/stream", &broadcast.consume());
1733 consumer.assert_next_wait();
1734 }
1735
1736 #[tokio::test]
1737 async fn test_overlapping_prefixes_deduped() {
1738 let origin = Origin::random().produce();
1739 let broadcast = Broadcast::new().produce();
1740
1741 let producer = origin
1743 .scope(&["demo".into(), "demo/foo".into()])
1744 .expect("should create producer");
1745
1746 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1748
1749 let mut consumer = producer.consume();
1750 consumer.assert_next("demo/bar/stream", &broadcast.consume());
1751 consumer.assert_next_wait();
1752 }
1753
1754 #[tokio::test]
1755 async fn test_overlapping_prefixes_no_duplicate_announcements() {
1756 let origin = Origin::random().produce();
1757 let broadcast = Broadcast::new().produce();
1758
1759 let producer = origin
1761 .scope(&["demo".into(), "demo/foo".into()])
1762 .expect("should create producer");
1763
1764 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1765
1766 let mut consumer = producer.consume();
1767 consumer.assert_next("demo/foo/stream", &broadcast.consume());
1769 consumer.assert_next_wait();
1770 }
1771
1772 #[tokio::test]
1773 async fn test_allowed_returns_deduped_prefixes() {
1774 let origin = Origin::random().produce();
1775
1776 let producer = origin
1777 .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1778 .expect("should create producer");
1779
1780 let allowed: Vec<_> = producer.allowed().collect();
1781 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1782 }
1783
1784 #[tokio::test]
1785 async fn test_announced_broadcast_already_announced() {
1786 let origin = Origin::random().produce();
1787 let broadcast = Broadcast::new().produce();
1788
1789 origin.publish_broadcast("test", broadcast.consume());
1790
1791 let consumer = origin.consume();
1792 let result = consumer.announced_broadcast("test").await.expect("should find it");
1793 assert!(result.is_clone(&broadcast.consume()));
1794 }
1795
1796 #[tokio::test]
1797 async fn test_announced_broadcast_delayed() {
1798 tokio::time::pause();
1799
1800 let origin = Origin::random().produce();
1801 let broadcast = Broadcast::new().produce();
1802
1803 let consumer = origin.consume();
1804
1805 let wait = tokio::spawn({
1807 let consumer = consumer.clone();
1808 async move { consumer.announced_broadcast("test").await }
1809 });
1810
1811 tokio::task::yield_now().await;
1813
1814 origin.publish_broadcast("test", broadcast.consume());
1815
1816 let result = wait.await.unwrap().expect("should find it");
1817 assert!(result.is_clone(&broadcast.consume()));
1818 }
1819
1820 #[tokio::test]
1821 async fn test_announced_broadcast_ignores_unrelated_paths() {
1822 tokio::time::pause();
1823
1824 let origin = Origin::random().produce();
1825 let other = Broadcast::new().produce();
1826 let target = Broadcast::new().produce();
1827
1828 let consumer = origin.consume();
1829
1830 let wait = tokio::spawn({
1831 let consumer = consumer.clone();
1832 async move { consumer.announced_broadcast("target").await }
1833 });
1834
1835 tokio::task::yield_now().await;
1836
1837 origin.publish_broadcast("other", other.consume());
1839 tokio::task::yield_now().await;
1840 assert!(!wait.is_finished(), "must not resolve on unrelated path");
1841
1842 origin.publish_broadcast("target", target.consume());
1843 let result = wait.await.unwrap().expect("should find target");
1844 assert!(result.is_clone(&target.consume()));
1845 }
1846
1847 #[tokio::test]
1848 async fn test_announced_broadcast_skips_nested_paths() {
1849 tokio::time::pause();
1850
1851 let origin = Origin::random().produce();
1852 let nested = Broadcast::new().produce();
1853 let exact = Broadcast::new().produce();
1854
1855 let consumer = origin.consume();
1856
1857 let wait = tokio::spawn({
1858 let consumer = consumer.clone();
1859 async move { consumer.announced_broadcast("foo").await }
1860 });
1861
1862 tokio::task::yield_now().await;
1863
1864 origin.publish_broadcast("foo/bar", nested.consume());
1866 tokio::task::yield_now().await;
1867 assert!(!wait.is_finished(), "must not resolve on a nested path");
1868
1869 origin.publish_broadcast("foo", exact.consume());
1870 let result = wait.await.unwrap().expect("should find foo exactly");
1871 assert!(result.is_clone(&exact.consume()));
1872 }
1873
1874 #[tokio::test]
1875 async fn test_announced_broadcast_disallowed() {
1876 let origin = Origin::random().produce();
1877 let limited = origin
1878 .consume()
1879 .scope(&["allowed".into()])
1880 .expect("should create limited");
1881
1882 assert!(limited.announced_broadcast("notallowed").await.is_none());
1884 }
1885
1886 #[tokio::test]
1887 async fn test_announced_broadcast_scope_too_narrow() {
1888 let origin = Origin::random().produce();
1891 let limited = origin
1892 .consume()
1893 .scope(&["foo/specific".into()])
1894 .expect("should create limited");
1895
1896 let result = limited
1898 .announced_broadcast("foo")
1899 .now_or_never()
1900 .expect("must not block");
1901 assert!(result.is_none());
1902 }
1903}