1use std::{
2 collections::{BTreeMap, HashMap, VecDeque},
3 fmt,
4 sync::atomic::{AtomicU64, Ordering},
5 task::{Poll, ready},
6};
7
8use rand::RngExt;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13 AsPath, Broadcast, BroadcastProducer, Error, Path, PathOwned, PathPrefixes,
14 coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
/// Identifier for an origin, wrapping a single numeric `id`.
///
/// The value `0` is reserved for the crate-internal `Origin::UNKNOWN`;
/// `Origin::random` draws from `1..2^53`, and decoding rejects ids `>= 2^62`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Origin {
    /// Raw numeric identifier of the origin.
    pub id: u64,
}
29
30impl Origin {
31 pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35 pub fn random() -> Self {
44 let mut rng = rand::rng();
45 let id = rng.random_range(1..(1u64 << 53));
46 Self { id }
47 }
48
49 pub fn produce(self) -> OriginProducer {
51 OriginProducer::new(self)
52 }
53}
54
55impl From<u64> for Origin {
56 fn from(id: u64) -> Self {
57 Self { id }
58 }
59}
60
61impl fmt::Display for Origin {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 self.id.fmt(f)
64 }
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69 u64: Encode<V>,
70{
71 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72 self.id.encode(w, version)
73 }
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78 u64: Decode<V>,
79{
80 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81 let id = u64::decode(r, version)?;
82 if id >= 1u64 << 62 {
83 return Err(DecodeError::InvalidValue);
84 }
85 Ok(Self { id })
86 }
87}
88
/// Maximum number of origins an [`OriginList`] may hold; enforced by
/// `OriginList::push`, `TryFrom<Vec<Origin>>` and decoding.
pub(crate) const MAX_HOPS: usize = 32;
95
/// Ordered list of [`Origin`]s, capped at [`MAX_HOPS`] entries.
///
/// The inner `Vec` is private so the length limit can only be bypassed
/// from within this module.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct OriginList(Vec<Origin>);
103
/// Error returned when an [`OriginList`] would exceed [`MAX_HOPS`] entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(f, "too many origins (max {MAX_HOPS})")
112 }
113}
114
// Marker impl: `TooManyOrigins` has no source error and relies on the default methods.
impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118 fn from(_: TooManyOrigins) -> Self {
119 DecodeError::BoundsExceeded
120 }
121}
122
123impl OriginList {
124 pub fn new() -> Self {
126 Self(Vec::new())
127 }
128
129 pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131 if self.0.len() >= MAX_HOPS {
132 return Err(TooManyOrigins);
133 }
134 self.0.push(origin);
135 Ok(())
136 }
137
138 pub fn replace_first(&mut self, target: Origin, replacement: Origin) -> bool {
141 for entry in &mut self.0 {
142 if *entry == target {
143 *entry = replacement;
144 return true;
145 }
146 }
147 false
148 }
149
150 pub fn contains(&self, origin: &Origin) -> bool {
152 self.0.contains(origin)
153 }
154
155 pub fn len(&self) -> usize {
157 self.0.len()
158 }
159
160 pub fn is_empty(&self) -> bool {
162 self.0.is_empty()
163 }
164
165 pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
167 self.0.iter()
168 }
169
170 pub fn as_slice(&self) -> &[Origin] {
172 &self.0
173 }
174}
175
176impl TryFrom<Vec<Origin>> for OriginList {
177 type Error = TooManyOrigins;
178
179 fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
180 if v.len() > MAX_HOPS {
181 return Err(TooManyOrigins);
182 }
183 Ok(Self(v))
184 }
185}
186
187impl<'a> IntoIterator for &'a OriginList {
188 type Item = &'a Origin;
189 type IntoIter = std::slice::Iter<'a, Origin>;
190
191 fn into_iter(self) -> Self::IntoIter {
192 self.iter()
193 }
194}
195
196impl<V: Copy> Encode<V> for OriginList
197where
198 u64: Encode<V>,
199 Origin: Encode<V>,
200{
201 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
202 (self.0.len() as u64).encode(w, version)?;
203 for origin in &self.0 {
204 origin.encode(w, version)?;
205 }
206 Ok(())
207 }
208}
209
210impl<V: Copy> Decode<V> for OriginList
211where
212 u64: Decode<V>,
213 Origin: Decode<V>,
214{
215 fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
216 let count = u64::decode(r, version)? as usize;
217 if count > MAX_HOPS {
218 return Err(DecodeError::BoundsExceeded);
219 }
220 let mut list = Vec::with_capacity(count);
221 for _ in 0..count {
222 list.push(Origin::decode(r, version)?);
223 }
224 Ok(Self(list))
225 }
226}
227
/// Process-wide counter from which `ConsumerId::new` allocates ids.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
229
/// Identifier under which an `OriginConsumer` is registered with notify nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);
232
233impl ConsumerId {
234 fn new() -> Self {
235 Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
236 }
237}
238
/// The broadcasts published at a single path: one active, the rest held as backups.
struct OriginBroadcast {
    /// Full path the broadcast was published under.
    path: PathOwned,
    /// Broadcast currently handed out to consumers.
    active: BroadcastConsumer,
    /// Other broadcasts published at the same path, kept as fallbacks.
    backup: VecDeque<BroadcastConsumer>,
}
245
246fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) {
254 const SEED: u64 = 0x420C0DECB00B; const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
261
262 let mut hash = SEED;
263 for &byte in name.as_str().as_bytes() {
264 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
265 }
266 for hop in hops {
267 for &byte in &hop.id.to_le_bytes() {
268 hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
269 }
270 }
271
272 (hops.len(), hash)
273}
274
/// Coalesced, not-yet-delivered change for a single path.
enum PendingUpdate {
    /// The path was announced with this broadcast.
    Announce(BroadcastConsumer),
    /// The path was unannounced.
    Unannounce,
    /// The path was unannounced and then announced again; delivered as two
    /// separate updates (unannounce first, then the announce).
    UnannounceAnnounce(BroadcastConsumer),
}
288
/// Queue of updates a consumer has not yet taken, at most one entry per path.
#[derive(Default)]
struct OriginConsumerState {
    /// Pending update per path; updates are handed out in ascending key order.
    pending: BTreeMap<PathOwned, PendingUpdate>,
}
297
298impl OriginConsumerState {
299 fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
300 let new = match self.pending.remove(&path) {
301 None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
303 Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
305 PendingUpdate::UnannounceAnnounce(broadcast)
306 }
307 };
308 self.pending.insert(path, new);
309 }
310
311 fn apply_unannounce(&mut self, path: PathOwned) {
312 match self.pending.remove(&path) {
313 Some(PendingUpdate::Announce(_)) => {}
315 None | Some(PendingUpdate::Unannounce) => {
316 self.pending.insert(path, PendingUpdate::Unannounce);
317 }
318 Some(PendingUpdate::UnannounceAnnounce(_)) => {
321 self.pending.insert(path, PendingUpdate::Unannounce);
322 }
323 }
324 }
325
326 fn take(&mut self) -> Option<OriginAnnounce> {
328 let path = self.pending.keys().next()?.clone();
329 match self.pending.remove(&path).unwrap() {
330 PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
331 PendingUpdate::Unannounce => Some((path, None)),
332 PendingUpdate::UnannounceAnnounce(broadcast) => {
333 self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
336 Some((path, None))
337 }
338 }
339 }
340}
341
/// Handle through which a notify node pushes updates into one consumer's queue.
#[derive(Clone)]
struct OriginConsumerNotify {
    /// Prefix stripped from every path before it is queued for the consumer.
    root: PathOwned,
    /// Producer side of the consumer's pending-update state.
    state: kio::Producer<OriginConsumerState>,
}
347
348impl OriginConsumerNotify {
349 fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
350 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
351 self.state
352 .write()
353 .ok()
354 .expect("consumer closed")
355 .apply_announce(path, broadcast);
356 }
357
358 fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
359 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
360 let mut state = self.state.write().ok().expect("consumer closed");
361 state.apply_unannounce(path.clone());
362 state.apply_announce(path, broadcast);
363 }
364
365 fn unannounce(&self, path: impl AsPath) {
366 let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
367 self.state.write().ok().expect("consumer closed").apply_unannounce(path);
368 }
369}
370
/// Per-node fan-out point: notifies its own consumers, then forwards to its parent.
struct NotifyNode {
    /// Notify node of the parent; `None` for a node created without a parent.
    parent: Option<Lock<NotifyNode>>,

    /// Consumers registered at this node, keyed by their id.
    consumers: HashMap<ConsumerId, OriginConsumerNotify>,
}
378
379impl NotifyNode {
380 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
381 Self {
382 parent,
383 consumers: HashMap::new(),
384 }
385 }
386
387 fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
388 for consumer in self.consumers.values() {
389 consumer.announce(path.as_path(), broadcast.clone());
390 }
391
392 if let Some(parent) = &self.parent {
393 parent.lock().announce(path, broadcast);
394 }
395 }
396
397 fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
398 for consumer in self.consumers.values() {
399 consumer.reannounce(path.as_path(), broadcast.clone());
400 }
401
402 if let Some(parent) = &self.parent {
403 parent.lock().reannounce(path, broadcast);
404 }
405 }
406
407 fn unannounce(&mut self, path: impl AsPath) {
408 for consumer in self.consumers.values() {
409 consumer.unannounce(path.as_path());
410 }
411
412 if let Some(parent) = &self.parent {
413 parent.lock().unannounce(path);
414 }
415 }
416}
417
/// One node in the tree of published paths.
struct OriginNode {
    /// Broadcast(s) published exactly at this node's path, if any.
    broadcast: Option<OriginBroadcast>,

    /// Child nodes keyed by the next path segment.
    nested: HashMap<String, Lock<OriginNode>>,

    /// Fan-out point for this node; children link to it as their notify parent.
    notify: Lock<NotifyNode>,
}
428
429impl OriginNode {
430 fn new(parent: Option<Lock<NotifyNode>>) -> Self {
431 Self {
432 broadcast: None,
433 nested: HashMap::new(),
434 notify: Lock::new(NotifyNode::new(parent)),
435 }
436 }
437
438 fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
439 let (dir, rest) = path.next_part().expect("leaf called with empty path");
440
441 let next = self.entry(dir);
442 if rest.is_empty() { next } else { next.lock().leaf(&rest) }
443 }
444
445 fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
446 match self.nested.get(dir) {
447 Some(next) => next.clone(),
448 None => {
449 let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
450 self.nested.insert(dir.to_string(), next.clone());
451 next
452 }
453 }
454 }
455
456 fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
457 let full = full.as_path();
458 let rest = relative.as_path();
459
460 if let Some((dir, relative)) = rest.next_part() {
462 self.entry(dir).lock().publish(&full, broadcast, &relative);
464 } else if let Some(existing) = &mut self.broadcast {
465 if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
473 return;
474 }
475
476 if route_key(&full, &broadcast.hops) < route_key(&full, &existing.active.hops) {
477 let old = existing.active.clone();
478 existing.active = broadcast.clone();
479 existing.backup.push_back(old);
480
481 self.notify.lock().reannounce(full, broadcast);
482 } else {
483 existing.backup.push_back(broadcast.clone());
486 }
487 } else {
488 self.broadcast = Some(OriginBroadcast {
490 path: full.to_owned(),
491 active: broadcast.clone(),
492 backup: VecDeque::new(),
493 });
494 self.notify.lock().announce(full, broadcast);
495 }
496 }
497
498 fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
499 self.consume_initial(&mut notify);
500 self.notify.lock().consumers.insert(id, notify);
501 }
502
503 fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
504 if let Some(broadcast) = &self.broadcast {
505 notify.announce(&broadcast.path, broadcast.active.clone());
506 }
507
508 for nested in self.nested.values() {
510 nested.lock().consume_initial(notify);
511 }
512 }
513
514 fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
515 let rest = rest.as_path();
516
517 if let Some((dir, rest)) = rest.next_part() {
518 let node = self.nested.get(dir)?.lock();
519 node.consume_broadcast(&rest)
520 } else {
521 self.broadcast.as_ref().map(|b| b.active.clone())
522 }
523 }
524
525 fn unconsume(&mut self, id: ConsumerId) {
526 self.notify.lock().consumers.remove(&id).expect("consumer not found");
527 if self.is_empty() {
528 }
531 }
532
533 fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
535 let full = full.as_path();
536 let relative = relative.as_path();
537
538 if let Some((dir, relative)) = relative.next_part() {
539 let nested = self.entry(dir);
540 let mut locked = nested.lock();
541 locked.remove(&full, broadcast, &relative);
542
543 if locked.is_empty() {
544 drop(locked);
545 self.nested.remove(dir);
546 }
547 } else {
548 let entry = match &mut self.broadcast {
549 Some(existing) => existing,
550 None => return,
551 };
552
553 let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
555 if let Some(pos) = pos {
556 entry.backup.remove(pos);
557 return;
559 }
560
561 assert!(entry.active.is_clone(&broadcast));
563
564 let best = entry
567 .backup
568 .iter()
569 .enumerate()
570 .min_by_key(|(_, b)| route_key(&full, &b.hops))
571 .map(|(i, _)| i);
572 if let Some(idx) = best {
573 let active = entry.backup.remove(idx).expect("index in range");
574 entry.active = active;
575 self.notify.lock().reannounce(full, &entry.active);
576 } else {
577 self.broadcast = None;
579 self.notify.lock().unannounce(full);
580 }
581 }
582 }
583
584 fn is_empty(&self) -> bool {
585 self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
586 }
587}
588
/// Set of tree nodes a handle may access, each paired with the path it is rooted at.
#[derive(Clone)]
struct OriginNodes {
    /// `(root path, node)` pairs; lookups use the first root that prefixes the path.
    nodes: Vec<(PathOwned, Lock<OriginNode>)>,
}
593
594impl OriginNodes {
595 pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
598 let mut roots = Vec::new();
599
600 for (root, state) in &self.nodes {
601 for prefix in prefixes {
602 if root.has_prefix(prefix) {
603 roots.push((root.to_owned(), state.clone()));
605 continue;
606 }
607
608 if let Some(suffix) = prefix.strip_prefix(root) {
609 let nested = state.lock().leaf(&suffix);
611 roots.push((prefix.to_owned(), nested));
612 }
613 }
614 }
615
616 if roots.is_empty() {
617 None
618 } else {
619 Some(Self { nodes: roots })
620 }
621 }
622
623 pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
624 let new_root = new_root.as_path();
625 let mut roots = Vec::new();
626
627 if new_root.is_empty() {
628 return Some(self.clone());
629 }
630
631 for (root, state) in &self.nodes {
632 if let Some(suffix) = root.strip_prefix(&new_root) {
633 roots.push((suffix.to_owned(), state.clone()));
635 } else if let Some(suffix) = new_root.strip_prefix(root) {
636 let nested = state.lock().leaf(&suffix);
639 roots.push(("".into(), nested));
640 }
641 }
642
643 if roots.is_empty() {
644 None
645 } else {
646 Some(Self { nodes: roots })
647 }
648 }
649
650 pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
652 let path = path.as_path();
653
654 for (root, state) in &self.nodes {
655 if let Some(suffix) = path.strip_prefix(root) {
656 return Some((state.clone(), suffix.to_owned()));
657 }
658 }
659
660 None
661 }
662}
663
664impl Default for OriginNodes {
665 fn default() -> Self {
666 Self {
667 nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
668 }
669 }
670}
671
/// A single update delivered to a consumer: the path, plus `Some(broadcast)`
/// for an announce or `None` for an unannounce.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
674
/// Publishing handle onto the tree of broadcasts belonging to an [`Origin`].
#[derive(Clone)]
pub struct OriginProducer {
    /// The origin this producer publishes on behalf of; exposed through `Deref`.
    info: Origin,

    /// Nodes this handle may publish to.
    nodes: OriginNodes,

    /// Prefix joined onto caller-supplied paths to form full paths.
    root: PathOwned,

    /// State shared with `OriginDynamic` handles and consumers for on-demand requests.
    dynamic: kio::Producer<OriginDynamicState>,
}
695
696impl std::ops::Deref for OriginProducer {
697 type Target = Origin;
698
699 fn deref(&self) -> &Self::Target {
700 &self.info
701 }
702}
703
impl OriginProducer {
    /// Creates a producer for `info` with the default node set, an empty root
    /// and fresh dynamic-request state.
    pub fn new(info: Origin) -> Self {
        Self {
            info,
            nodes: OriginNodes::default(),
            root: PathOwned::default(),
            dynamic: kio::Producer::default(),
        }
    }

    /// Creates a new broadcast and publishes it at `path`.
    ///
    /// Returns the broadcast's producer, or `None` if `publish_broadcast`
    /// refused the path.
    pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
        let broadcast = Broadcast::new().produce();
        self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
    }

    /// Publishes `broadcast` at `path` and schedules its removal for when it closes.
    ///
    /// Returns `false` without publishing if the broadcast's hop list already
    /// contains this origin, or if `path` is not covered by any node this
    /// producer may access. Otherwise returns `true`.
    pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
        let path = path.as_path();

        // The broadcast has already passed through this origin: refuse it.
        if broadcast.hops.contains(&self.info) {
            return false;
        }

        let (root, rest) = match self.nodes.get(&path) {
            Some(root) => root,
            None => return false,
        };

        let full = self.root.join(&path);

        root.lock().publish(&full, &broadcast, &rest);
        let root = root.clone();

        // Background task: wait for the broadcast to close, then remove it.
        // A task is spawned for every call, including duplicate publishes.
        web_async::spawn(async move {
            broadcast.closed().await;
            root.lock().remove(&full, broadcast, &rest);
        });

        true
    }

    /// Returns a producer restricted to `prefixes`, or `None` if none of them
    /// overlaps the nodes this producer may access. The dynamic state is shared.
    pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
        let prefixes = PathPrefixes::new(prefixes);
        Some(OriginProducer {
            info: self.info,
            nodes: self.nodes.select(&prefixes)?,
            root: self.root.clone(),
            dynamic: self.dynamic.clone(),
        })
    }

    /// Creates an `OriginDynamic` handle on the shared dynamic state; creating
    /// it increments the count of live handlers.
    pub fn dynamic(&self) -> OriginDynamic {
        OriginDynamic::new(self.info, self.root.clone(), self.dynamic.clone())
    }

    /// Creates a consumer over the same nodes; it is registered with each node
    /// and first receives the broadcasts that are currently active.
    pub fn consume(&self) -> OriginConsumer {
        OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone(), self.dynamic.consume())
    }

    /// Returns the active broadcast at `path`, if one is published and the
    /// path is accessible through this producer.
    #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
    pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
        let path = path.as_path();
        let (root, rest) = self.nodes.get(&path)?;
        let state = root.lock();
        state.consume_broadcast(&rest)
    }

    /// Returns a producer whose root is extended by `prefix`, or `None` if
    /// `prefix` is unrelated to every accessible node.
    pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
        let prefix = prefix.as_path();

        Some(Self {
            info: self.info,
            root: self.root.join(&prefix).to_owned(),
            nodes: self.nodes.root(&prefix)?,
            dynamic: self.dynamic.clone(),
        })
    }

    /// The root prefix of this producer.
    pub fn root(&self) -> &Path<'_> {
        &self.root
    }

    /// Iterates over the root paths of the nodes this producer may access.
    pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
        self.nodes.nodes.iter().map(|(root, _)| root)
    }

    /// Joins `path` onto this producer's root.
    pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
        self.root.join(path)
    }
}
837
/// Receiving handle: observes announce/unannounce updates for a set of nodes.
pub struct OriginConsumer {
    /// Id under which this consumer is registered with each node.
    id: ConsumerId,
    /// The origin this consumer belongs to; exposed through `Deref`.
    info: Origin,
    /// Nodes this consumer is registered with and may look up.
    nodes: OriginNodes,

    /// This consumer's queue of pending updates, filled by the notify nodes.
    state: kio::Producer<OriginConsumerState>,

    /// Prefix stripped from announced paths and joined onto requested ones.
    root: PathOwned,

    /// Shared state used to queue on-demand broadcast requests.
    dynamic: kio::Consumer<OriginDynamicState>,
}
858
859impl std::ops::Deref for OriginConsumer {
860 type Target = Origin;
861
862 fn deref(&self) -> &Self::Target {
863 &self.info
864 }
865}
866
impl OriginConsumer {
    /// Creates a consumer with a fresh id and registers it with every node in
    /// `nodes`; registration queues the broadcasts currently active there.
    fn new(info: Origin, root: PathOwned, nodes: OriginNodes, dynamic: kio::Consumer<OriginDynamicState>) -> Self {
        let state = kio::Producer::<OriginConsumerState>::default();
        let id = ConsumerId::new();

        for (_, node) in &nodes.nodes {
            let notify = OriginConsumerNotify {
                root: root.clone(),
                state: state.clone(),
            };
            node.lock().consume(id, notify);
        }

        Self {
            id,
            info,
            nodes,
            state,
            root,
            dynamic,
        }
    }

    /// Waits for the next update: `(path, Some(broadcast))` for an announce,
    /// `(path, None)` for an unannounce. Async form of [`Self::poll_announced`].
    pub async fn announced(&mut self) -> Option<OriginAnnounce> {
        kio::wait(|waiter| self.poll_announced(waiter)).await
    }

    /// Polls for the next update.
    ///
    /// Stays pending while no update is queued and resolves to `None` if
    /// polling the state reports an error.
    pub fn poll_announced(&mut self, waiter: &kio::Waiter) -> Poll<Option<OriginAnnounce>> {
        let mut state = match ready!(self.state.poll(waiter, |state| {
            if state.pending.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })) {
            Ok(state) => state,
            Err(_) => return Poll::Ready(None),
        };
        // The predicate above only lets us through when something is queued.
        Poll::Ready(Some(state.take().expect("predicate guaranteed an update")))
    }

    /// Returns the next queued update without waiting, or `None` if nothing is
    /// queued or the state cannot be written.
    pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
        self.state.write().ok()?.take()
    }

    /// Creates an independent consumer over the same nodes (same as `clone`).
    pub fn consume(&self) -> Self {
        self.clone()
    }

    /// Returns the active broadcast at `path`, if one is published and the
    /// path is accessible through this consumer.
    pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
        let path = path.as_path();
        let (root, rest) = self.nodes.get(&path)?;
        let state = root.lock();
        state.consume_broadcast(&rest)
    }

    /// Waits until a broadcast is announced at exactly `path` and returns it.
    ///
    /// Returns `None` if `path` is not accessible through this consumer or
    /// the scoped update stream ends first.
    pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
        let path = path.as_path();

        // Watch only the requested path through a scoped consumer.
        let mut consumer = self.scope(std::slice::from_ref(&path))?;

        if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
            return None;
        }

        loop {
            let (announced, broadcast) = consumer.announced().await?;
            // Skip updates for other paths and unannounces of this one.
            if announced.as_path() == path {
                if let Some(broadcast) = broadcast {
                    return Some(broadcast);
                }
            }
        }
    }

    /// Returns a new consumer restricted to `prefixes`, or `None` if none of
    /// them overlaps the nodes this consumer may access.
    pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
        let prefixes = PathPrefixes::new(prefixes);
        Some(OriginConsumer::new(
            self.info,
            self.root.clone(),
            self.nodes.select(&prefixes)?,
            self.dynamic.clone(),
        ))
    }

    /// Returns a new consumer whose root is extended by `prefix`, or `None`
    /// if `prefix` is unrelated to every accessible node.
    pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
        let prefix = prefix.as_path();

        Some(Self::new(
            self.info,
            self.root.join(&prefix).to_owned(),
            self.nodes.root(&prefix)?,
            self.dynamic.clone(),
        ))
    }

    /// Requests the broadcast at `path`, asking a dynamic handler if it is
    /// not already published.
    ///
    /// The returned future resolves immediately when the broadcast exists,
    /// fails with `Error::Dropped` when the dynamic state cannot be written,
    /// and fails with `Error::Unroutable` when no `OriginDynamic` handle is
    /// alive. Otherwise it joins an existing request for the same absolute
    /// path or queues a new one.
    pub fn request_broadcast(&self, path: impl AsPath) -> kio::Pending<BroadcastRequested> {
        let path = path.as_path();

        // Fast path: already published.
        if let Some(broadcast) = self.get_broadcast(&path) {
            return kio::Pending::new(BroadcastRequested::ready(broadcast));
        }

        // Requests are keyed by absolute path.
        let absolute = self.root.join(&path).to_owned();

        let Ok(mut state) = self.dynamic.write() else {
            return kio::Pending::new(BroadcastRequested::failed(Error::Dropped));
        };

        let consumer = if let Some(producer) = state.requests.get(&absolute) {
            // Someone already asked for this path: share their pending result.
            producer.consume()
        } else {
            // Nobody is able to serve requests.
            if state.dynamic == 0 {
                return kio::Pending::new(BroadcastRequested::failed(Error::Unroutable));
            }

            let producer = kio::Producer::<PendingBroadcast>::default();
            let consumer = producer.consume();
            state.requests.insert(absolute.clone(), producer);
            state.request_order.push_back(absolute);
            consumer
        };

        kio::Pending::new(BroadcastRequested::pending(consumer))
    }

    /// The root prefix of this consumer.
    pub fn root(&self) -> &Path<'_> {
        &self.root
    }

    /// Iterates over the root paths of the nodes this consumer may access.
    pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
        self.nodes.nodes.iter().map(|(root, _)| root)
    }

    /// Joins `path` onto this consumer's root.
    pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
        self.root.join(path)
    }
}
1075
1076impl Drop for OriginConsumer {
1077 fn drop(&mut self) {
1078 for (_, root) in &self.nodes.nodes {
1079 root.lock().unconsume(self.id);
1080 }
1081 }
1082}
1083
1084impl Clone for OriginConsumer {
1085 fn clone(&self) -> Self {
1086 OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone(), self.dynamic.clone())
1087 }
1088}
1089
/// State shared between consumers that request broadcasts and the
/// `OriginDynamic` handles that serve them.
#[derive(Default)]
struct OriginDynamicState {
    /// Outstanding requests keyed by absolute path.
    requests: HashMap<PathOwned, kio::Producer<PendingBroadcast>>,

    /// Paths of outstanding requests in arrival order.
    request_order: VecDeque<PathOwned>,

    /// Number of live `OriginDynamic` handles.
    dynamic: usize,
}
1110
1111impl OriginDynamicState {
1112 fn reject_requests(&mut self) {
1115 self.requests.clear();
1116 self.request_order.clear();
1117 }
1118}
1119
/// Result slot for one outstanding broadcast request.
#[derive(Default)]
struct PendingBroadcast {
    /// `None` until the request is accepted (`Ok`) or rejected (`Err`).
    resolved: Option<Result<BroadcastConsumer, Error>>,
}
1130
/// Handle that receives on-demand broadcast requests.
///
/// Each live handle counts towards `OriginDynamicState::dynamic`; when the
/// last one is dropped, all outstanding requests are rejected.
pub struct OriginDynamic {
    /// The origin this handle belongs to.
    info: Origin,
    /// Root prefix of the producer that created this handle.
    root: PathOwned,
    /// Shared request state.
    state: kio::Producer<OriginDynamicState>,
}
1146
1147impl Clone for OriginDynamic {
1148 fn clone(&self) -> Self {
1149 if let Ok(mut state) = self.state.write() {
1153 state.dynamic += 1;
1154 }
1155
1156 Self {
1157 info: self.info,
1158 root: self.root.clone(),
1159 state: self.state.clone(),
1160 }
1161 }
1162}
1163
1164impl OriginDynamic {
1165 fn new(info: Origin, root: PathOwned, state: kio::Producer<OriginDynamicState>) -> Self {
1166 if let Ok(mut state) = state.write() {
1167 state.dynamic += 1;
1168 }
1169
1170 Self { info, root, state }
1171 }
1172
1173 pub fn info(&self) -> &Origin {
1175 &self.info
1176 }
1177
1178 fn poll<F>(&self, waiter: &kio::Waiter, f: F) -> Poll<Result<kio::Mut<'_, OriginDynamicState>, Error>>
1180 where
1181 F: FnMut(&kio::Ref<'_, OriginDynamicState>) -> Poll<()>,
1182 {
1183 Poll::Ready(match ready!(self.state.poll(waiter, f)) {
1184 Ok(state) => Ok(state),
1185 Err(_) => Err(Error::Dropped),
1186 })
1187 }
1188
1189 pub fn poll_requested_broadcast(&mut self, waiter: &kio::Waiter) -> Poll<Result<BroadcastRequest, Error>> {
1191 let mut state = ready!(self.poll(waiter, |state| {
1192 if state.request_order.is_empty() {
1193 Poll::Pending
1194 } else {
1195 Poll::Ready(())
1196 }
1197 }))?;
1198
1199 let path = state.request_order.pop_front().expect("predicate guaranteed a request");
1200 let producer = state.requests.remove(&path).expect("request_order out of sync");
1201 Poll::Ready(Ok(BroadcastRequest { path, producer }))
1202 }
1203
1204 pub async fn requested_broadcast(&mut self) -> Result<BroadcastRequest, Error> {
1207 kio::wait(|waiter| self.poll_requested_broadcast(waiter)).await
1208 }
1209
1210 pub fn root(&self) -> &Path<'_> {
1212 &self.root
1213 }
1214}
1215
1216impl Drop for OriginDynamic {
1217 fn drop(&mut self) {
1218 if let Ok(mut state) = self.state.write() {
1219 state.dynamic = state.dynamic.saturating_sub(1);
1221 if state.dynamic == 0 {
1222 state.reject_requests();
1224 }
1225 }
1226 }
1227}
1228
/// A request for a broadcast, handed to an `OriginDynamic` to accept or reject.
pub struct BroadcastRequest {
    /// Absolute path that was requested.
    path: PathOwned,

    /// Slot through which the result is delivered to the waiting requesters.
    producer: kio::Producer<PendingBroadcast>,
}
1243
1244impl BroadcastRequest {
1245 pub fn path(&self) -> &Path<'_> {
1247 &self.path
1248 }
1249
1250 pub fn accept(self, broadcast: BroadcastConsumer) {
1256 if let Ok(mut state) = self.producer.write() {
1257 state.resolved = Some(Ok(broadcast));
1258 }
1259 }
1261
1262 pub fn reject(self, err: Error) {
1264 if let Ok(mut state) = self.producer.write() {
1265 state.resolved = Some(Err(err));
1266 }
1267 }
1268}
1269
/// Future-like result of `OriginConsumer::request_broadcast`.
pub struct BroadcastRequested {
    /// Current state of the request.
    inner: Requested,
}
1279
/// Internal state of a [`BroadcastRequested`].
enum Requested {
    /// The broadcast was already available when requested.
    Ready(BroadcastConsumer),
    /// The request failed up front with this error.
    Failed(Error),
    /// The request is queued; the result arrives through this consumer.
    Pending(kio::Consumer<PendingBroadcast>),
}
1289
1290impl BroadcastRequested {
1291 fn ready(broadcast: BroadcastConsumer) -> Self {
1292 Self {
1293 inner: Requested::Ready(broadcast),
1294 }
1295 }
1296
1297 fn failed(error: Error) -> Self {
1298 Self {
1299 inner: Requested::Failed(error),
1300 }
1301 }
1302
1303 fn pending(consumer: kio::Consumer<PendingBroadcast>) -> Self {
1304 Self {
1305 inner: Requested::Pending(consumer),
1306 }
1307 }
1308
1309 pub fn poll_ok(&self, waiter: &kio::Waiter) -> Poll<Result<BroadcastConsumer, Error>> {
1311 match &self.inner {
1312 Requested::Ready(broadcast) => Poll::Ready(Ok(broadcast.clone())),
1313 Requested::Failed(error) => Poll::Ready(Err(error.clone())),
1314 Requested::Pending(consumer) => Poll::Ready(
1315 match ready!(consumer.poll(waiter, |state| match &state.resolved {
1316 Some(result) => Poll::Ready(result.clone()),
1317 None => Poll::Pending,
1318 })) {
1319 Ok(result) => result,
1320 Err(_closed) => Err(Error::Unroutable),
1322 },
1323 ),
1324 }
1325 }
1326}
1327
1328impl kio::Future for BroadcastRequested {
1329 type Output = Result<BroadcastConsumer, Error>;
1330
1331 fn poll(&self, waiter: &kio::Waiter) -> Poll<Self::Output> {
1332 self.poll_ok(waiter)
1333 }
1334}
1335
1336#[cfg(test)]
1337use futures::FutureExt;
1338
#[cfg(test)]
impl OriginConsumer {
    /// Asserts that the next update is immediately available and announces
    /// `broadcast` at `expected`.
    pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let next = self.announced().now_or_never().expect("next blocked");
        let (path, active) = next.expect("no next");
        assert_eq!(path, expected, "wrong path");
        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Same as `assert_next`, but goes through the non-blocking `try_announced`.
    pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let (path, active) = self.try_announced().expect("no next");
        assert_eq!(path, expected, "wrong path");
        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Asserts that the next update is immediately available and unannounces `expected`.
    pub fn assert_next_none(&mut self, expected: impl AsPath) {
        let expected = expected.as_path();
        let next = self.announced().now_or_never().expect("next blocked");
        let (path, active) = next.expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.is_none(), "should be unannounced");
    }

    /// Asserts that no update is available right now.
    pub fn assert_next_wait(&mut self) {
        let Some(res) = self.announced().now_or_never() else {
            return;
        };
        panic!("next should block: got {:?}", res.map(|(path, _)| path));
    }
}
1377
1378#[cfg(test)]
1379mod tests {
1380 use futures::FutureExt;
1381
1382 use crate::Broadcast;
1383
1384 use super::*;
1385
1386 #[test]
1387 fn origin_list_push_fails_at_limit() {
1388 let mut list = OriginList::new();
1389 for _ in 0..MAX_HOPS {
1390 list.push(Origin::random()).unwrap();
1391 }
1392 assert_eq!(list.len(), MAX_HOPS);
1393 assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
1394 }
1395
1396 #[test]
1397 fn origin_list_replace_first() {
1398 let mut list = OriginList::new();
1399 for _ in 0..3 {
1400 list.push(Origin::UNKNOWN).unwrap();
1401 }
1402
1403 assert!(list.replace_first(Origin::UNKNOWN, Origin::from(7)));
1405 assert_eq!(list.as_slice(), &[Origin::from(7), Origin::UNKNOWN, Origin::UNKNOWN]);
1406
1407 assert!(!list.replace_first(Origin::from(99), Origin::from(8)));
1409 assert_eq!(list.len(), 3);
1410 }
1411
1412 #[test]
1413 fn origin_list_try_from_vec_enforces_limit() {
1414 let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
1415 assert!(OriginList::try_from(under).is_ok());
1416
1417 let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
1418 assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
1419 }
1420
1421 #[tokio::test]
1422 async fn test_announce() {
1423 tokio::time::pause();
1424
1425 let origin = Origin::random().produce();
1426 let broadcast1 = Broadcast::new().produce();
1427 let broadcast2 = Broadcast::new().produce();
1428
1429 let mut consumer1 = origin.consume();
1430 consumer1.assert_next_wait();
1432
1433 origin.publish_broadcast("test1", broadcast1.consume());
1435
1436 consumer1.assert_next("test1", &broadcast1.consume());
1437 consumer1.assert_next_wait();
1438
1439 let mut consumer2 = origin.consume();
1442
1443 origin.publish_broadcast("test2", broadcast2.consume());
1445
1446 consumer1.assert_next("test2", &broadcast2.consume());
1447 consumer1.assert_next_wait();
1448
1449 consumer2.assert_next("test1", &broadcast1.consume());
1450 consumer2.assert_next("test2", &broadcast2.consume());
1451 consumer2.assert_next_wait();
1452
1453 drop(broadcast1);
1455
1456 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1458
1459 consumer1.assert_next_none("test1");
1461 consumer2.assert_next_none("test1");
1462 consumer1.assert_next_wait();
1463 consumer2.assert_next_wait();
1464
1465 let mut consumer3 = origin.consume();
1467 consumer3.assert_next("test2", &broadcast2.consume());
1468 consumer3.assert_next_wait();
1469
1470 drop(broadcast2);
1472
1473 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1475
1476 consumer1.assert_next_none("test2");
1477 consumer2.assert_next_none("test2");
1478 consumer3.assert_next_none("test2");
1479
1480 }
1486
1487 #[tokio::test]
1488 async fn test_duplicate() {
1489 tokio::time::pause();
1490
1491 let origin = Origin::random().produce();
1492
1493 let broadcast1 = Broadcast::new().produce();
1494 let broadcast2 = Broadcast::new().produce();
1495 let broadcast3 = Broadcast::new().produce();
1496
1497 let consumer1 = broadcast1.consume();
1498 let consumer2 = broadcast2.consume();
1499 let consumer3 = broadcast3.consume();
1500
1501 let mut consumer = origin.consume();
1502
1503 origin.publish_broadcast("test", consumer1.clone());
1504 origin.publish_broadcast("test", consumer2.clone());
1505 origin.publish_broadcast("test", consumer3.clone());
1506 assert!(consumer.get_broadcast("test").is_some());
1507
1508 consumer.assert_next("test", &consumer1);
1511 consumer.assert_next_wait();
1512
1513 drop(broadcast2);
1515
1516 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1518
1519 assert!(consumer.get_broadcast("test").is_some());
1520 consumer.assert_next_wait();
1521
1522 drop(broadcast1);
1524
1525 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1527
1528 assert!(consumer.get_broadcast("test").is_some());
1529 consumer.assert_next_none("test");
1530 consumer.assert_next("test", &consumer3);
1531
1532 drop(broadcast3);
1534
1535 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1537 assert!(consumer.get_broadcast("test").is_none());
1538
1539 consumer.assert_next_none("test");
1540 consumer.assert_next_wait();
1541 }
1542
1543 #[tokio::test]
1544 async fn test_duplicate_reverse() {
1545 tokio::time::pause();
1546
1547 let origin = Origin::random().produce();
1548 let broadcast1 = Broadcast::new().produce();
1549 let broadcast2 = Broadcast::new().produce();
1550
1551 origin.publish_broadcast("test", broadcast1.consume());
1552 origin.publish_broadcast("test", broadcast2.consume());
1553 assert!(origin.consume().get_broadcast("test").is_some());
1554
1555 drop(broadcast2);
1557
1558 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1560 assert!(origin.consume().get_broadcast("test").is_some());
1561
1562 drop(broadcast1);
1563
1564 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1566 assert!(origin.consume().get_broadcast("test").is_none());
1567 }
1568
1569 #[tokio::test]
1570 async fn test_deterministic_tiebreak() {
1571 tokio::time::pause();
1572
1573 fn route(ids: &[u64]) -> BroadcastProducer {
1575 let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::<Vec<_>>()).unwrap();
1576 Broadcast { hops }.produce()
1577 }
1578
1579 fn winner(first: &[u64], second: &[u64]) -> OriginList {
1581 let origin = Origin::random().produce();
1582 let a = route(first);
1583 let b = route(second);
1584 origin.publish_broadcast("test", a.consume());
1585 origin.publish_broadcast("test", b.consume());
1586 let hops = origin.consume().get_broadcast("test").unwrap().hops.clone();
1587 drop((a, b));
1589 hops
1590 }
1591
1592 let forward = winner(&[10, 20], &[30, 40]);
1595 let reverse = winner(&[30, 40], &[10, 20]);
1596 assert_eq!(forward, reverse, "tie-break must not depend on publish order");
1597
1598 assert_eq!(winner(&[10, 20], &[30]).len(), 1);
1600 assert_eq!(winner(&[30], &[10, 20]).len(), 1);
1601 }
1602
1603 #[tokio::test]
1604 async fn test_double_publish() {
1605 tokio::time::pause();
1606
1607 let origin = Origin::random().produce();
1608 let broadcast = Broadcast::new().produce();
1609
1610 origin.publish_broadcast("test", broadcast.consume());
1612 origin.publish_broadcast("test", broadcast.consume());
1613
1614 assert!(origin.consume().get_broadcast("test").is_some());
1615
1616 drop(broadcast);
1617
1618 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1620 assert!(origin.consume().get_broadcast("test").is_none());
1621 }
1622 #[tokio::test]
1627 async fn test_many_announces() {
1628 let origin = Origin::random().produce();
1629 let broadcast = Broadcast::new().produce();
1630
1631 let mut consumer = origin.consume();
1632 for i in 0..256 {
1633 origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1634 }
1635
1636 for i in 0..256 {
1637 consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
1638 }
1639 consumer.assert_next_wait();
1640 }
1641
1642 #[tokio::test]
1643 async fn test_many_announces_try() {
1644 let origin = Origin::random().produce();
1645 let broadcast = Broadcast::new().produce();
1646
1647 let mut consumer = origin.consume();
1648 for i in 0..256 {
1649 origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1650 }
1651
1652 for i in 0..256 {
1653 consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
1654 }
1655 }
1656
1657 #[tokio::test]
1658 async fn test_with_root_basic() {
1659 let origin = Origin::random().produce();
1660 let broadcast = Broadcast::new().produce();
1661
1662 let foo_producer = origin.with_root("foo").expect("should create root");
1664 assert_eq!(foo_producer.root().as_str(), "foo");
1665
1666 let mut consumer = origin.consume();
1667
1668 assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1670 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1672
1673 let mut foo_consumer = foo_producer.consume();
1675 foo_consumer.assert_next("bar/baz", &broadcast.consume());
1676 }
1677
1678 #[tokio::test]
1679 async fn test_with_root_nested() {
1680 let origin = Origin::random().produce();
1681 let broadcast = Broadcast::new().produce();
1682
1683 let foo_producer = origin.with_root("foo").expect("should create foo root");
1685 let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1686 assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1687
1688 let mut consumer = origin.consume();
1689
1690 assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1692 consumer.assert_next("foo/bar/baz", &broadcast.consume());
1694
1695 let mut foo_bar_consumer = foo_bar_producer.consume();
1697 foo_bar_consumer.assert_next("baz", &broadcast.consume());
1698 }
1699
1700 #[tokio::test]
1701 async fn test_publish_scope_allows() {
1702 let origin = Origin::random().produce();
1703 let broadcast = Broadcast::new().produce();
1704
1705 let limited_producer = origin
1707 .scope(&["allowed/path1".into(), "allowed/path2".into()])
1708 .expect("should create limited producer");
1709
1710 assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1712 assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1713 assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1714
1715 assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1717 assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1719 }
1720
1721 #[tokio::test]
1722 async fn test_publish_scope_empty() {
1723 let origin = Origin::random().produce();
1724
1725 assert!(origin.scope(&[]).is_none());
1727 }
1728
1729 #[tokio::test]
1730 async fn test_consume_scope_filters() {
1731 let origin = Origin::random().produce();
1732 let broadcast1 = Broadcast::new().produce();
1733 let broadcast2 = Broadcast::new().produce();
1734 let broadcast3 = Broadcast::new().produce();
1735
1736 let mut consumer = origin.consume();
1737
1738 origin.publish_broadcast("allowed", broadcast1.consume());
1740 origin.publish_broadcast("allowed/nested", broadcast2.consume());
1741 origin.publish_broadcast("notallowed", broadcast3.consume());
1742
1743 let mut limited_consumer = origin
1745 .consume()
1746 .scope(&["allowed".into()])
1747 .expect("should create limited consumer");
1748
1749 limited_consumer.assert_next("allowed", &broadcast1.consume());
1751 limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1752 limited_consumer.assert_next_wait(); consumer.assert_next("allowed", &broadcast1.consume());
1756 consumer.assert_next("allowed/nested", &broadcast2.consume());
1757 consumer.assert_next("notallowed", &broadcast3.consume());
1758 }
1759
1760 #[tokio::test]
1761 async fn test_consume_scope_multiple_prefixes() {
1762 let origin = Origin::random().produce();
1763 let broadcast1 = Broadcast::new().produce();
1764 let broadcast2 = Broadcast::new().produce();
1765 let broadcast3 = Broadcast::new().produce();
1766
1767 origin.publish_broadcast("foo/test", broadcast1.consume());
1768 origin.publish_broadcast("bar/test", broadcast2.consume());
1769 origin.publish_broadcast("baz/test", broadcast3.consume());
1770
1771 let mut limited_consumer = origin
1773 .consume()
1774 .scope(&["foo".into(), "bar".into()])
1775 .expect("should create limited consumer");
1776
1777 limited_consumer.assert_next("bar/test", &broadcast2.consume());
1779 limited_consumer.assert_next("foo/test", &broadcast1.consume());
1780 limited_consumer.assert_next_wait(); }
1782
1783 #[tokio::test]
1784 async fn test_with_root_and_publish_scope() {
1785 let origin = Origin::random().produce();
1786 let broadcast = Broadcast::new().produce();
1787
1788 let foo_producer = origin.with_root("foo").expect("should create foo root");
1790
1791 let limited_producer = foo_producer
1793 .scope(&["bar".into(), "goop/pee".into()])
1794 .expect("should create limited producer");
1795
1796 let mut consumer = origin.consume();
1797
1798 assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1800 assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1801 assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1802 assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1803
1804 assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1806 assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1808
1809 consumer.assert_next("foo/bar", &broadcast.consume());
1811 consumer.assert_next("foo/bar/nested", &broadcast.consume());
1812 consumer.assert_next("foo/goop/pee", &broadcast.consume());
1813 consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1814 }
1815
1816 #[tokio::test]
1817 async fn test_with_root_and_consume_scope() {
1818 let origin = Origin::random().produce();
1819 let broadcast1 = Broadcast::new().produce();
1820 let broadcast2 = Broadcast::new().produce();
1821 let broadcast3 = Broadcast::new().produce();
1822
1823 origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1825 origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1826 origin.publish_broadcast("foo/other/test", broadcast3.consume());
1827
1828 let foo_producer = origin.with_root("foo").expect("should create foo root");
1830
1831 let mut limited_consumer = foo_producer
1833 .consume()
1834 .scope(&["bar".into(), "goop/pee".into()])
1835 .expect("should create limited consumer");
1836
1837 limited_consumer.assert_next("bar/test", &broadcast1.consume());
1839 limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1840 limited_consumer.assert_next_wait(); }
1842
1843 #[tokio::test]
1844 async fn test_with_root_unauthorized() {
1845 let origin = Origin::random().produce();
1846
1847 let limited_producer = origin
1849 .scope(&["allowed".into()])
1850 .expect("should create limited producer");
1851
1852 assert!(limited_producer.with_root("notallowed").is_none());
1854
1855 let allowed_root = limited_producer
1857 .with_root("allowed")
1858 .expect("should create allowed root");
1859 assert_eq!(allowed_root.root().as_str(), "allowed");
1860 }
1861
1862 #[tokio::test]
1863 async fn test_wildcard_permission() {
1864 let origin = Origin::random().produce();
1865 let broadcast = Broadcast::new().produce();
1866
1867 let root_producer = origin.clone();
1869
1870 assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1872 assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1873
1874 let foo_producer = root_producer.with_root("foo").expect("should create any root");
1876 assert_eq!(foo_producer.root().as_str(), "foo");
1877 }
1878
1879 #[tokio::test]
1880 async fn test_consume_broadcast_with_permissions() {
1881 let origin = Origin::random().produce();
1882 let broadcast1 = Broadcast::new().produce();
1883 let broadcast2 = Broadcast::new().produce();
1884
1885 origin.publish_broadcast("allowed/test", broadcast1.consume());
1886 origin.publish_broadcast("notallowed/test", broadcast2.consume());
1887
1888 let limited_consumer = origin
1890 .consume()
1891 .scope(&["allowed".into()])
1892 .expect("should create limited consumer");
1893
1894 let result = limited_consumer.get_broadcast("allowed/test");
1896 assert!(result.is_some());
1897 assert!(result.unwrap().is_clone(&broadcast1.consume()));
1898
1899 assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1901
1902 let consumer = origin.consume();
1904 assert!(consumer.get_broadcast("allowed/test").is_some());
1905 assert!(consumer.get_broadcast("notallowed/test").is_some());
1906 }
1907
1908 #[tokio::test]
1909 async fn test_nested_paths_with_permissions() {
1910 let origin = Origin::random().produce();
1911 let broadcast = Broadcast::new().produce();
1912
1913 let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1915
1916 assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1918 assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1919 assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1920
1921 assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1923 assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1924 assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1925 }
1926
1927 #[tokio::test]
1928 async fn test_multiple_consumers_with_different_permissions() {
1929 let origin = Origin::random().produce();
1930 let broadcast1 = Broadcast::new().produce();
1931 let broadcast2 = Broadcast::new().produce();
1932 let broadcast3 = Broadcast::new().produce();
1933
1934 origin.publish_broadcast("foo/test", broadcast1.consume());
1936 origin.publish_broadcast("bar/test", broadcast2.consume());
1937 origin.publish_broadcast("baz/test", broadcast3.consume());
1938
1939 let mut foo_consumer = origin
1941 .consume()
1942 .scope(&["foo".into()])
1943 .expect("should create foo consumer");
1944
1945 let mut bar_consumer = origin
1946 .consume()
1947 .scope(&["bar".into()])
1948 .expect("should create bar consumer");
1949
1950 let mut foobar_consumer = origin
1951 .consume()
1952 .scope(&["foo".into(), "bar".into()])
1953 .expect("should create foobar consumer");
1954
1955 foo_consumer.assert_next("foo/test", &broadcast1.consume());
1957 foo_consumer.assert_next_wait();
1958
1959 bar_consumer.assert_next("bar/test", &broadcast2.consume());
1960 bar_consumer.assert_next_wait();
1961
1962 foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1963 foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1964 foobar_consumer.assert_next_wait();
1965 }
1966
1967 #[tokio::test]
1968 async fn test_select_with_empty_prefix() {
1969 let origin = Origin::random().produce();
1970 let broadcast1 = Broadcast::new().produce();
1971 let broadcast2 = Broadcast::new().produce();
1972
1973 let demo_producer = origin.with_root("demo").expect("should create demo root");
1975 let limited_producer = demo_producer
1976 .scope(&["worm-node".into(), "foobar".into()])
1977 .expect("should create limited producer");
1978
1979 assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1981 assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1982
1983 let mut consumer = limited_producer
1985 .consume()
1986 .scope(&["".into()])
1987 .expect("should create consumer with empty prefix");
1988
1989 let a1 = consumer.try_announced().expect("expected first announcement");
1991 let a2 = consumer.try_announced().expect("expected second announcement");
1992 consumer.assert_next_wait();
1993
1994 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1995 paths.sort();
1996 assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1997 }
1998
1999 #[tokio::test]
2000 async fn test_select_narrowing_scope() {
2001 let origin = Origin::random().produce();
2002 let broadcast1 = Broadcast::new().produce();
2003 let broadcast2 = Broadcast::new().produce();
2004 let broadcast3 = Broadcast::new().produce();
2005
2006 let demo_producer = origin.with_root("demo").expect("should create demo root");
2008 let limited_producer = demo_producer
2009 .scope(&["worm-node".into(), "foobar".into()])
2010 .expect("should create limited producer");
2011
2012 assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
2014 assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
2015 assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
2016
2017 let mut worm_consumer = limited_producer
2019 .consume()
2020 .scope(&["worm-node".into()])
2021 .expect("should create worm-node consumer");
2022
2023 worm_consumer.assert_next("worm-node", &broadcast1.consume());
2025 worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
2026 worm_consumer.assert_next_wait(); let mut foo_consumer = limited_producer
2030 .consume()
2031 .scope(&["worm-node/foo".into()])
2032 .expect("should create worm-node/foo consumer");
2033
2034 foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
2035 foo_consumer.assert_next_wait(); }
2037
2038 #[tokio::test]
2039 async fn test_select_multiple_roots_with_empty_prefix() {
2040 let origin = Origin::random().produce();
2041 let broadcast1 = Broadcast::new().produce();
2042 let broadcast2 = Broadcast::new().produce();
2043 let broadcast3 = Broadcast::new().produce();
2044
2045 let limited_producer = origin
2047 .scope(&["app1".into(), "app2".into(), "shared".into()])
2048 .expect("should create limited producer");
2049
2050 assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
2052 assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
2053 assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
2054
2055 let mut consumer = limited_producer
2057 .consume()
2058 .scope(&["".into()])
2059 .expect("should create consumer with empty prefix");
2060
2061 consumer.assert_next("app1/data", &broadcast1.consume());
2063 consumer.assert_next("app2/config", &broadcast2.consume());
2064 consumer.assert_next("shared/resource", &broadcast3.consume());
2065 consumer.assert_next_wait();
2066 }
2067
2068 #[tokio::test]
2069 async fn test_publish_scope_with_empty_prefix() {
2070 let origin = Origin::random().produce();
2071 let broadcast = Broadcast::new().produce();
2072
2073 let limited_producer = origin
2075 .scope(&["services/api".into(), "services/web".into()])
2076 .expect("should create limited producer");
2077
2078 let same_producer = limited_producer
2080 .scope(&["".into()])
2081 .expect("should create producer with empty prefix");
2082
2083 assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
2085 assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
2086 assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
2087 assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
2088 }
2089
2090 #[tokio::test]
2091 async fn test_select_narrowing_to_deeper_path() {
2092 let origin = Origin::random().produce();
2093 let broadcast1 = Broadcast::new().produce();
2094 let broadcast2 = Broadcast::new().produce();
2095 let broadcast3 = Broadcast::new().produce();
2096
2097 let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
2099
2100 assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
2102 assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
2103 assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
2104
2105 let mut team2_consumer = limited_producer
2107 .consume()
2108 .scope(&["org/team2".into()])
2109 .expect("should create team2 consumer");
2110
2111 team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
2112 team2_consumer.assert_next_wait(); let mut project1_consumer = limited_producer
2116 .consume()
2117 .scope(&["org/team1/project1".into()])
2118 .expect("should create project1 consumer");
2119
2120 project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
2122 project1_consumer.assert_next_wait();
2123 }
2124
2125 #[tokio::test]
2126 async fn test_select_with_non_matching_prefix() {
2127 let origin = Origin::random().produce();
2128
2129 let limited_producer = origin
2131 .scope(&["allowed/path".into()])
2132 .expect("should create limited producer");
2133
2134 assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
2136
2137 assert!(limited_producer.scope(&["other/path".into()]).is_none());
2139 }
2140
2141 #[tokio::test]
2144 async fn test_with_root_trailing_slash_consumer() {
2145 let origin = Origin::random().produce();
2146
2147 let prefix = "some_prefix/".to_string();
2149 let mut consumer = origin.consume().with_root(prefix).unwrap();
2150
2151 let b = origin.create_broadcast("some_prefix/test").unwrap();
2152 consumer.assert_next("test", &b.consume());
2153 }
2154
2155 #[tokio::test]
2157 async fn test_with_root_trailing_slash_producer() {
2158 let origin = Origin::random().produce();
2159
2160 let prefix = "some_prefix/".to_string();
2162 let rooted = origin.with_root(prefix).unwrap();
2163
2164 let b = rooted.create_broadcast("test").unwrap();
2165
2166 let mut consumer = rooted.consume();
2167 consumer.assert_next("test", &b.consume());
2168 }
2169
2170 #[tokio::test]
2172 async fn test_with_root_trailing_slash_unannounce() {
2173 tokio::time::pause();
2174
2175 let origin = Origin::random().produce();
2176
2177 let prefix = "some_prefix/".to_string();
2178 let mut consumer = origin.consume().with_root(prefix).unwrap();
2179
2180 let b = origin.create_broadcast("some_prefix/test").unwrap();
2181 consumer.assert_next("test", &b.consume());
2182
2183 drop(b);
2185 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2186
2187 consumer.assert_next_none("test");
2189 }
2190
2191 #[tokio::test]
2192 async fn test_select_maintains_access_with_wider_prefix() {
2193 let origin = Origin::random().produce();
2194 let broadcast1 = Broadcast::new().produce();
2195 let broadcast2 = Broadcast::new().produce();
2196
2197 let demo_producer = origin.with_root("demo").expect("should create demo root");
2199 let user_producer = demo_producer
2200 .scope(&["worm-node".into(), "foobar".into()])
2201 .expect("should create user producer");
2202
2203 assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
2205 assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
2206
2207 let mut consumer = user_producer
2209 .consume()
2210 .scope(&["".into()])
2211 .expect("scope with empty prefix should not fail when user has specific permissions");
2212
2213 let a1 = consumer.try_announced().expect("expected first announcement");
2215 let a2 = consumer.try_announced().expect("expected second announcement");
2216 consumer.assert_next_wait();
2217
2218 let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
2219 paths.sort();
2220 assert_eq!(paths, ["foobar", "worm-node/data"]);
2221
2222 let mut narrow_consumer = user_producer
2224 .consume()
2225 .scope(&["worm-node".into()])
2226 .expect("should be able to narrow scope to worm-node");
2227
2228 narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
2229 narrow_consumer.assert_next_wait(); }
2231
2232 #[tokio::test]
2233 async fn test_duplicate_prefixes_deduped() {
2234 let origin = Origin::random().produce();
2235 let broadcast = Broadcast::new().produce();
2236
2237 let producer = origin
2239 .scope(&["demo".into(), "demo".into()])
2240 .expect("should create producer");
2241
2242 assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
2243
2244 let mut consumer = producer.consume();
2245 consumer.assert_next("demo/stream", &broadcast.consume());
2246 consumer.assert_next_wait();
2247 }
2248
2249 #[tokio::test]
2250 async fn test_overlapping_prefixes_deduped() {
2251 let origin = Origin::random().produce();
2252 let broadcast = Broadcast::new().produce();
2253
2254 let producer = origin
2256 .scope(&["demo".into(), "demo/foo".into()])
2257 .expect("should create producer");
2258
2259 assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
2261
2262 let mut consumer = producer.consume();
2263 consumer.assert_next("demo/bar/stream", &broadcast.consume());
2264 consumer.assert_next_wait();
2265 }
2266
2267 #[tokio::test]
2268 async fn test_overlapping_prefixes_no_duplicate_announcements() {
2269 let origin = Origin::random().produce();
2270 let broadcast = Broadcast::new().produce();
2271
2272 let producer = origin
2274 .scope(&["demo".into(), "demo/foo".into()])
2275 .expect("should create producer");
2276
2277 assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
2278
2279 let mut consumer = producer.consume();
2280 consumer.assert_next("demo/foo/stream", &broadcast.consume());
2282 consumer.assert_next_wait();
2283 }
2284
2285 #[tokio::test]
2286 async fn test_allowed_returns_deduped_prefixes() {
2287 let origin = Origin::random().produce();
2288
2289 let producer = origin
2290 .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
2291 .expect("should create producer");
2292
2293 let allowed: Vec<_> = producer.allowed().collect();
2294 assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
2295 }
2296
2297 #[tokio::test]
2298 async fn test_announced_broadcast_already_announced() {
2299 let origin = Origin::random().produce();
2300 let broadcast = Broadcast::new().produce();
2301
2302 origin.publish_broadcast("test", broadcast.consume());
2303
2304 let consumer = origin.consume();
2305 let result = consumer.announced_broadcast("test").await.expect("should find it");
2306 assert!(result.is_clone(&broadcast.consume()));
2307 }
2308
2309 #[tokio::test]
2310 async fn test_announced_broadcast_delayed() {
2311 tokio::time::pause();
2312
2313 let origin = Origin::random().produce();
2314 let broadcast = Broadcast::new().produce();
2315
2316 let consumer = origin.consume();
2317
2318 let wait = tokio::spawn({
2320 let consumer = consumer.clone();
2321 async move { consumer.announced_broadcast("test").await }
2322 });
2323
2324 tokio::task::yield_now().await;
2326
2327 origin.publish_broadcast("test", broadcast.consume());
2328
2329 let result = wait.await.unwrap().expect("should find it");
2330 assert!(result.is_clone(&broadcast.consume()));
2331 }
2332
2333 #[tokio::test]
2334 async fn test_announced_broadcast_ignores_unrelated_paths() {
2335 tokio::time::pause();
2336
2337 let origin = Origin::random().produce();
2338 let other = Broadcast::new().produce();
2339 let target = Broadcast::new().produce();
2340
2341 let consumer = origin.consume();
2342
2343 let wait = tokio::spawn({
2344 let consumer = consumer.clone();
2345 async move { consumer.announced_broadcast("target").await }
2346 });
2347
2348 tokio::task::yield_now().await;
2349
2350 origin.publish_broadcast("other", other.consume());
2352 tokio::task::yield_now().await;
2353 assert!(!wait.is_finished(), "must not resolve on unrelated path");
2354
2355 origin.publish_broadcast("target", target.consume());
2356 let result = wait.await.unwrap().expect("should find target");
2357 assert!(result.is_clone(&target.consume()));
2358 }
2359
2360 #[tokio::test]
2361 async fn test_announced_broadcast_skips_nested_paths() {
2362 tokio::time::pause();
2363
2364 let origin = Origin::random().produce();
2365 let nested = Broadcast::new().produce();
2366 let exact = Broadcast::new().produce();
2367
2368 let consumer = origin.consume();
2369
2370 let wait = tokio::spawn({
2371 let consumer = consumer.clone();
2372 async move { consumer.announced_broadcast("foo").await }
2373 });
2374
2375 tokio::task::yield_now().await;
2376
2377 origin.publish_broadcast("foo/bar", nested.consume());
2379 tokio::task::yield_now().await;
2380 assert!(!wait.is_finished(), "must not resolve on a nested path");
2381
2382 origin.publish_broadcast("foo", exact.consume());
2383 let result = wait.await.unwrap().expect("should find foo exactly");
2384 assert!(result.is_clone(&exact.consume()));
2385 }
2386
2387 #[tokio::test]
2388 async fn test_announced_broadcast_disallowed() {
2389 let origin = Origin::random().produce();
2390 let limited = origin
2391 .consume()
2392 .scope(&["allowed".into()])
2393 .expect("should create limited");
2394
2395 assert!(limited.announced_broadcast("notallowed").await.is_none());
2397 }
2398
2399 #[tokio::test]
2400 async fn test_announced_broadcast_scope_too_narrow() {
2401 let origin = Origin::random().produce();
2404 let limited = origin
2405 .consume()
2406 .scope(&["foo/specific".into()])
2407 .expect("should create limited");
2408
2409 let result = limited
2411 .announced_broadcast("foo")
2412 .now_or_never()
2413 .expect("must not block");
2414 assert!(result.is_none());
2415 }
2416
2417 #[tokio::test]
2421 async fn test_coalesce_announce_then_unannounce() {
2422 tokio::time::pause();
2424
2425 let origin = Origin::random().produce();
2426 let mut consumer = origin.consume();
2427
2428 let broadcast = Broadcast::new().produce();
2429 origin.publish_broadcast("test", broadcast.consume());
2430 drop(broadcast);
2431
2432 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2433
2434 consumer.assert_next_wait();
2435 }
2436
2437 #[tokio::test]
2438 async fn test_coalesce_announce_unannounce_announce() {
2439 tokio::time::pause();
2442
2443 let origin = Origin::random().produce();
2444 let mut consumer = origin.consume();
2445
2446 let broadcast1 = Broadcast::new().produce();
2447 let broadcast2 = Broadcast::new().produce();
2448
2449 origin.publish_broadcast("test", broadcast1.consume());
2450 drop(broadcast1);
2451 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2452 origin.publish_broadcast("test", broadcast2.consume());
2453
2454 consumer.assert_next("test", &broadcast2.consume());
2455 consumer.assert_next_wait();
2456 }
2457
2458 #[tokio::test]
2459 async fn test_coalesce_unannounce_announce_preserved() {
2460 tokio::time::pause();
2463
2464 let origin = Origin::random().produce();
2465 let broadcast1 = Broadcast::new().produce();
2466 origin.publish_broadcast("test", broadcast1.consume());
2467
2468 let mut consumer = origin.consume();
2469 consumer.assert_next("test", &broadcast1.consume());
2470
2471 drop(broadcast1);
2473 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2474
2475 let broadcast2 = Broadcast::new().produce();
2476 origin.publish_broadcast("test", broadcast2.consume());
2477
2478 consumer.assert_next_none("test");
2480 consumer.assert_next("test", &broadcast2.consume());
2481 consumer.assert_next_wait();
2482 }
2483
2484 #[tokio::test]
2485 async fn test_coalesce_unannounce_announce_unannounce() {
2486 tokio::time::pause();
2489
2490 let origin = Origin::random().produce();
2491 let broadcast1 = Broadcast::new().produce();
2492 origin.publish_broadcast("test", broadcast1.consume());
2493
2494 let mut consumer = origin.consume();
2495 consumer.assert_next("test", &broadcast1.consume());
2496
2497 drop(broadcast1);
2498 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2499
2500 let broadcast2 = Broadcast::new().produce();
2501 origin.publish_broadcast("test", broadcast2.consume());
2502 drop(broadcast2);
2503 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2504
2505 consumer.assert_next_none("test");
2506 consumer.assert_next_wait();
2507 }
2508
2509 #[tokio::test]
2510 async fn test_coalesce_churn_bounded() {
2511 tokio::time::pause();
2516
2517 let origin = Origin::random().produce();
2518 let mut consumer = origin.consume();
2519
2520 for _ in 0..1000 {
2521 let broadcast = Broadcast::new().produce();
2522 origin.publish_broadcast("test", broadcast.consume());
2523 drop(broadcast);
2524 }
2525 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2526
2527 let mut collected = Vec::new();
2528 while let Some(update) = consumer.try_announced() {
2529 collected.push(update);
2530 }
2531 assert!(
2532 collected.len() <= 1,
2533 "expected at most one pending update, got {}",
2534 collected.len()
2535 );
2536 assert!(
2537 collected.iter().all(|(path, _)| path == &Path::new("test")),
2538 "unexpected path in pending updates",
2539 );
2540 }
2541
2542 #[tokio::test]
2544 async fn dynamic_request_unroutable_without_handler() {
2545 let origin = Origin::random().produce();
2546 let consumer = origin.consume();
2547 assert!(matches!(
2548 consumer.request_broadcast("missing").await,
2549 Err(Error::Unroutable)
2550 ));
2551 }
2552
2553 #[tokio::test(start_paused = true)]
2555 async fn dynamic_request_served_not_announced() {
2556 let origin = Origin::random().produce();
2557 let mut dynamic = origin.dynamic();
2558 let consumer = origin.consume();
2559
2560 let mut announced = origin.consume();
2562 announced.assert_next_wait();
2563
2564 let request_fut = consumer.request_broadcast("fallback");
2567
2568 let served = Broadcast::new().produce();
2570
2571 let request = dynamic.requested_broadcast().await.unwrap();
2572 assert_eq!(request.path(), &Path::new("fallback"));
2573 request.accept(served.consume());
2574
2575 let broadcast = request_fut.await.unwrap();
2576 assert!(broadcast.is_clone(&served.consume()));
2577
2578 announced.assert_next_wait();
2580 }
2581
2582 #[tokio::test(start_paused = true)]
2584 async fn dynamic_request_coalesces() {
2585 let origin = Origin::random().produce();
2586 let mut dynamic = origin.dynamic();
2587 let consumer = origin.consume();
2588
2589 let f1 = consumer.request_broadcast("dup");
2591 let f2 = consumer.request_broadcast("dup");
2592
2593 let request = dynamic.requested_broadcast().await.unwrap();
2595 assert_eq!(request.path(), &Path::new("dup"));
2596 assert!(
2597 dynamic.requested_broadcast().now_or_never().is_none(),
2598 "a coalesced request must not be served twice"
2599 );
2600
2601 let served = Broadcast::new().produce();
2603 request.accept(served.consume());
2604 assert!(f1.await.unwrap().is_clone(&served.consume()));
2605 assert!(f2.await.unwrap().is_clone(&served.consume()));
2606 }
2607
2608 #[tokio::test(start_paused = true)]
2610 async fn dynamic_request_rejected() {
2611 let origin = Origin::random().produce();
2612 let mut dynamic = origin.dynamic();
2613 let consumer = origin.consume();
2614
2615 let request_fut = consumer.request_broadcast("fallback");
2616
2617 let request = dynamic.requested_broadcast().await.unwrap();
2618 request.reject(Error::Cancel);
2619
2620 assert!(matches!(request_fut.await, Err(Error::Cancel)));
2621 }
2622
2623 #[tokio::test(start_paused = true)]
2626 async fn dynamic_request_handler_dropped() {
2627 let origin = Origin::random().produce();
2628 let dynamic = origin.dynamic();
2629 let consumer = origin.consume();
2630
2631 let request_fut = consumer.request_broadcast("fallback");
2632 drop(dynamic);
2633 assert!(matches!(request_fut.await, Err(Error::Unroutable)));
2634
2635 assert!(matches!(
2637 consumer.request_broadcast("again").await,
2638 Err(Error::Unroutable)
2639 ));
2640 }
2641
2642 #[tokio::test(start_paused = true)]
2646 async fn dynamic_request_accept_after_handler_dropped() {
2647 let origin = Origin::random().produce();
2648 let mut dynamic = origin.dynamic();
2649 let consumer = origin.consume();
2650
2651 let request_fut = consumer.request_broadcast("fallback");
2652
2653 let request = dynamic.requested_broadcast().await.unwrap();
2655 drop(dynamic);
2656
2657 let served = Broadcast::new().produce();
2659 request.accept(served.consume());
2660 assert!(request_fut.await.unwrap().is_clone(&served.consume()));
2661 }
2662
2663 #[tokio::test(start_paused = true)]
2665 async fn dynamic_request_prefers_announced() {
2666 let origin = Origin::random().produce();
2667 let mut dynamic = origin.dynamic();
2668 let consumer = origin.consume();
2669
2670 let broadcast = Broadcast::new().produce();
2671 assert!(origin.publish_broadcast("live", broadcast.consume()));
2672
2673 let got = consumer.request_broadcast("live").await.unwrap();
2674 assert!(
2675 got.is_clone(&broadcast.consume()),
2676 "should return the announced broadcast"
2677 );
2678 assert!(
2679 dynamic.requested_broadcast().now_or_never().is_none(),
2680 "an announced path must not queue a fallback request"
2681 );
2682 }
2683
2684 #[tokio::test(start_paused = true)]
2686 async fn dynamic_clone_keeps_alive() {
2687 let origin = Origin::random().produce();
2688 let dynamic = origin.dynamic();
2689 let consumer = origin.consume();
2690
2691 drop(dynamic.clone());
2692
2693 let request_fut = consumer.request_broadcast("fallback");
2696 assert!(
2697 request_fut.now_or_never().is_none(),
2698 "request should stay pending until served"
2699 );
2700 }
2701}