Skip to main content

moq_net/model/
origin.rs

1use std::{
2	collections::{BTreeMap, HashMap, VecDeque},
3	fmt,
4	sync::atomic::{AtomicU64, Ordering},
5	task::Poll,
6};
7
8use rand::Rng;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13	AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14	coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17/// A relay origin, identified by a 62-bit varint on the wire.
18///
19/// `id` must be non-zero for a real origin; `id == 0` is reserved as a
20/// placeholder for Lite03-style hops where the actual value isn't carried.
21/// Encoding a value outside the 62-bit range (>= 2^62) will fail at the
22/// varint layer; [`Origin::random`] picks a valid random nonzero id.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26	/// Non-zero 62-bit identifier. Encoded as a QUIC varint on the wire.
27	pub id: u64,
28}
29
30impl Origin {
31	/// Placeholder for hop entries whose actual id is not on the wire (Lite03).
32	/// Never encoded for Lite04+: violates the non-zero invariant and would fail to round-trip.
33	pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35	/// Generate a fresh origin with a random non-zero id. Use this for any
36	/// origin that does not need a stable identity across restarts.
37	///
38	/// TEMPORARY: the wire format allows 62 bits, but older `@moq/lite` JS
39	/// clients decode `AnnounceInterest.exclude_hop` as a u53 (number) and
40	/// throw on anything > 2^53-1. To keep those clients alive against
41	/// fresh relays, we cap the random id at 53 bits. Restore to 62 bits
42	/// once the JS u62 fix has propagated to deployed bundles.
43	pub fn random() -> Self {
44		let mut rng = rand::rng();
45		let id = rng.random_range(1..(1u64 << 53));
46		Self { id }
47	}
48
49	/// Consume this [Origin] to create a producer that carries its id.
50	pub fn produce(self) -> OriginProducer {
51		OriginProducer::new(self)
52	}
53}
54
55impl From<u64> for Origin {
56	fn from(id: u64) -> Self {
57		Self { id }
58	}
59}
60
61impl fmt::Display for Origin {
62	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63		self.id.fmt(f)
64	}
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69	u64: Encode<V>,
70{
71	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72		self.id.encode(w, version)
73	}
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78	u64: Decode<V>,
79{
80	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81		let id = u64::decode(r, version)?;
82		if id >= 1u64 << 62 {
83			return Err(DecodeError::InvalidValue);
84		}
85		Ok(Self { id })
86	}
87}
88
89/// Maximum number of origins (hops) an [`OriginList`] can hold.
90///
91/// Caps pathological or loop-induced announcements at a reasonable cluster
92/// diameter; appending past this limit returns [`TooManyOrigins`] rather than
93/// silently truncating.
94pub(crate) const MAX_HOPS: usize = 32;
95
96/// Bounded list of [`Origin`] entries, typically the hop chain of a broadcast.
97///
98/// Guarantees `len() <= MAX_HOPS`. Construct via [`OriginList::new`] +
99/// [`OriginList::push`], or fall back to the fallible [`TryFrom<Vec<Origin>>`].
100#[derive(Debug, Clone, Default, PartialEq, Eq)]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct OriginList(Vec<Origin>);
103
104/// Returned when an operation would grow an [`OriginList`] past its hop-count cap.
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106#[non_exhaustive]
107pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111		write!(f, "too many origins (max {MAX_HOPS})")
112	}
113}
114
115impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118	fn from(_: TooManyOrigins) -> Self {
119		DecodeError::BoundsExceeded
120	}
121}
122
123impl OriginList {
124	/// Create an empty list.
125	pub fn new() -> Self {
126		Self(Vec::new())
127	}
128
129	/// Append an [`Origin`]. Returns [`TooManyOrigins`] if the list is full.
130	pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131		if self.0.len() >= MAX_HOPS {
132			return Err(TooManyOrigins);
133		}
134		self.0.push(origin);
135		Ok(())
136	}
137
138	/// Returns true if any entry matches `origin`.
139	pub fn contains(&self, origin: &Origin) -> bool {
140		self.0.contains(origin)
141	}
142
143	/// Number of entries currently in the list (always `<= MAX_HOPS`).
144	pub fn len(&self) -> usize {
145		self.0.len()
146	}
147
148	/// Whether the list contains no entries.
149	pub fn is_empty(&self) -> bool {
150		self.0.is_empty()
151	}
152
153	/// Iterate over the entries in hop order (oldest first).
154	pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
155		self.0.iter()
156	}
157
158	/// Borrow the entries as a slice.
159	pub fn as_slice(&self) -> &[Origin] {
160		&self.0
161	}
162}
163
164impl TryFrom<Vec<Origin>> for OriginList {
165	type Error = TooManyOrigins;
166
167	fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
168		if v.len() > MAX_HOPS {
169			return Err(TooManyOrigins);
170		}
171		Ok(Self(v))
172	}
173}
174
175impl<'a> IntoIterator for &'a OriginList {
176	type Item = &'a Origin;
177	type IntoIter = std::slice::Iter<'a, Origin>;
178
179	fn into_iter(self) -> Self::IntoIter {
180		self.iter()
181	}
182}
183
184impl<V: Copy> Encode<V> for OriginList
185where
186	u64: Encode<V>,
187	Origin: Encode<V>,
188{
189	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
190		(self.0.len() as u64).encode(w, version)?;
191		for origin in &self.0 {
192			origin.encode(w, version)?;
193		}
194		Ok(())
195	}
196}
197
198impl<V: Copy> Decode<V> for OriginList
199where
200	u64: Decode<V>,
201	Origin: Decode<V>,
202{
203	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
204		let count = u64::decode(r, version)? as usize;
205		if count > MAX_HOPS {
206			return Err(DecodeError::BoundsExceeded);
207		}
208		let mut list = Vec::with_capacity(count);
209		for _ in 0..count {
210			list.push(Origin::decode(r, version)?);
211		}
212		Ok(Self(list))
213	}
214}
215
216static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
217
218#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
219struct ConsumerId(u64);
220
221impl ConsumerId {
222	fn new() -> Self {
223		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
224	}
225}
226
227// If there are multiple broadcasts with the same path, we keep the oldest active and queue the others.
228struct OriginBroadcast {
229	path: PathOwned,
230	active: BroadcastConsumer,
231	backup: VecDeque<BroadcastConsumer>,
232}
233
234/// Ordering key used to pick the active route among broadcasts at the same path.
235///
236/// Lower wins. Shorter hop chains sort first; equal-length chains are broken by a
237/// deterministic hash of the broadcast name and hop chain, so every node in the
238/// cluster, given the same candidate routes, converges on the same winner instead
239/// of relying on arrival order. Mixing the name in spreads equal-length routes
240/// across different upstreams rather than funneling every broadcast onto one.
241fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) {
242	// FNV-1a, not the std hasher: its output is fixed across Rust versions and
243	// builds, which matters when nodes run mismatched binaries during a rolling
244	// deploy and still need to agree on the same route. SEED is a custom basis
245	// (any nonzero u64 works, the textbook one is just as arbitrary); FNV_PRIME is
246	// the standard FNV-64 prime and should stay put.
247	const SEED: u64 = 0x420C0DECB00B; // 420 C0DEC B00B
248	const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
249
250	let mut hash = SEED;
251	for &byte in name.as_str().as_bytes() {
252		hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
253	}
254	for hop in hops {
255		for &byte in &hop.id.to_le_bytes() {
256			hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
257		}
258	}
259
260	(hops.len(), hash)
261}
262
263/// One coalesced update queued for an `OriginConsumer`.
264///
265/// At most one entry exists per path, so a slow consumer's pending set is bounded
266/// by the number of distinct paths. `UnannounceAnnounce` preserves the
267/// signal that the broadcast at a path was replaced (the consumer must see
268/// `(path, None)` before `(path, Some(new))`), while a stale
269/// `Announce` cancels with a subsequent `unannounce` because the consumer
270/// has not yet observed it.
271enum PendingUpdate {
272	Announce(BroadcastConsumer),
273	Unannounce,
274	UnannounceAnnounce(BroadcastConsumer),
275}
276
277/// Pending updates keyed by path. `BTreeMap` keeps memory strictly bounded by
278/// the number of distinct paths with outstanding work (collapsed pairs are
279/// fully erased) and gives a deterministic lexicographic delivery order so
280/// tests can predict it.
281#[derive(Default)]
282struct OriginConsumerState {
283	pending: BTreeMap<PathOwned, PendingUpdate>,
284}
285
286impl OriginConsumerState {
287	fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
288		let new = match self.pending.remove(&path) {
289			// First announce, or a stale announce being replaced.
290			None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
291			// Consumer needs to observe the unannounce before this announce.
292			Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
293				PendingUpdate::UnannounceAnnounce(broadcast)
294			}
295		};
296		self.pending.insert(path, new);
297	}
298
299	fn apply_unannounce(&mut self, path: PathOwned) {
300		match self.pending.remove(&path) {
301			// Consumer has not seen the pending announce; drop both entirely.
302			Some(PendingUpdate::Announce(_)) => {}
303			None | Some(PendingUpdate::Unannounce) => {
304				self.pending.insert(path, PendingUpdate::Unannounce);
305			}
306			// The embedded announce cancels with this unannounce; the consumer
307			// still needs the leading unannounce.
308			Some(PendingUpdate::UnannounceAnnounce(_)) => {
309				self.pending.insert(path, PendingUpdate::Unannounce);
310			}
311		}
312	}
313
314	/// Take one update to deliver to the consumer, if any.
315	fn take(&mut self) -> Option<OriginAnnounce> {
316		let path = self.pending.keys().next()?.clone();
317		match self.pending.remove(&path).unwrap() {
318			PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
319			PendingUpdate::Unannounce => Some((path, None)),
320			PendingUpdate::UnannounceAnnounce(broadcast) => {
321				// Deliver the unannounce now; leave the trailing announce pending so
322				// the next take returns it for the same path.
323				self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
324				Some((path, None))
325			}
326		}
327	}
328}
329
330#[derive(Clone)]
331struct OriginConsumerNotify {
332	root: PathOwned,
333	state: kio::Producer<OriginConsumerState>,
334}
335
336impl OriginConsumerNotify {
337	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
338		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
339		self.state
340			.write()
341			.ok()
342			.expect("consumer closed")
343			.apply_announce(path, broadcast);
344	}
345
346	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
347		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
348		let mut state = self.state.write().ok().expect("consumer closed");
349		state.apply_unannounce(path.clone());
350		state.apply_announce(path, broadcast);
351	}
352
353	fn unannounce(&self, path: impl AsPath) {
354		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
355		self.state.write().ok().expect("consumer closed").apply_unannounce(path);
356	}
357}
358
359struct NotifyNode {
360	parent: Option<Lock<NotifyNode>>,
361
362	// Consumers that are subscribed to this node.
363	// We store a consumer ID so we can remove it easily when it closes.
364	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
365}
366
367impl NotifyNode {
368	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
369		Self {
370			parent,
371			consumers: HashMap::new(),
372		}
373	}
374
375	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
376		for consumer in self.consumers.values() {
377			consumer.announce(path.as_path(), broadcast.clone());
378		}
379
380		if let Some(parent) = &self.parent {
381			parent.lock().announce(path, broadcast);
382		}
383	}
384
385	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
386		for consumer in self.consumers.values() {
387			consumer.reannounce(path.as_path(), broadcast.clone());
388		}
389
390		if let Some(parent) = &self.parent {
391			parent.lock().reannounce(path, broadcast);
392		}
393	}
394
395	fn unannounce(&mut self, path: impl AsPath) {
396		for consumer in self.consumers.values() {
397			consumer.unannounce(path.as_path());
398		}
399
400		if let Some(parent) = &self.parent {
401			parent.lock().unannounce(path);
402		}
403	}
404}
405
406struct OriginNode {
407	// The broadcast that is published to this node.
408	broadcast: Option<OriginBroadcast>,
409
410	// Nested nodes, one level down the tree.
411	nested: HashMap<String, Lock<OriginNode>>,
412
413	// Unfortunately, to notify consumers we need to traverse back up the tree.
414	notify: Lock<NotifyNode>,
415}
416
417impl OriginNode {
418	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
419		Self {
420			broadcast: None,
421			nested: HashMap::new(),
422			notify: Lock::new(NotifyNode::new(parent)),
423		}
424	}
425
426	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
427		let (dir, rest) = path.next_part().expect("leaf called with empty path");
428
429		let next = self.entry(dir);
430		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
431	}
432
433	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
434		match self.nested.get(dir) {
435			Some(next) => next.clone(),
436			None => {
437				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
438				self.nested.insert(dir.to_string(), next.clone());
439				next
440			}
441		}
442	}
443
444	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
445		let full = full.as_path();
446		let rest = relative.as_path();
447
448		// If the path has a directory component, then publish it to the nested node.
449		if let Some((dir, relative)) = rest.next_part() {
450			// Not using entry to avoid allocating a string most of the time.
451			self.entry(dir).lock().publish(&full, broadcast, &relative);
452		} else if let Some(existing) = &mut self.broadcast {
453			// This node is a leaf with an existing broadcast. Prefer the route with the
454			// lower ordering key (shorter hop chain, deterministic hash on ties), so every
455			// node converges on the same route regardless of the order announces arrive.
456			//
457			// Drop duplicates (same underlying broadcast delivered via multiple links) so the
458			// backup queue can't accumulate clones of the active entry and trigger redundant
459			// reannouncements when a peer churns.
460			if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
461				return;
462			}
463
464			if route_key(&full, &broadcast.hops) < route_key(&full, &existing.active.hops) {
465				let old = existing.active.clone();
466				existing.active = broadcast.clone();
467				existing.backup.push_back(old);
468
469				self.notify.lock().reannounce(full, broadcast);
470			} else {
471				// Loses the ordering (longer path, or the tie-break): keep as a backup
472				// in case the active one drops.
473				existing.backup.push_back(broadcast.clone());
474			}
475		} else {
476			// This node is a leaf with no existing broadcast.
477			self.broadcast = Some(OriginBroadcast {
478				path: full.to_owned(),
479				active: broadcast.clone(),
480				backup: VecDeque::new(),
481			});
482			self.notify.lock().announce(full, broadcast);
483		}
484	}
485
486	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
487		self.consume_initial(&mut notify);
488		self.notify.lock().consumers.insert(id, notify);
489	}
490
491	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
492		if let Some(broadcast) = &self.broadcast {
493			notify.announce(&broadcast.path, broadcast.active.clone());
494		}
495
496		// Recursively subscribe to all nested nodes.
497		for nested in self.nested.values() {
498			nested.lock().consume_initial(notify);
499		}
500	}
501
502	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
503		let rest = rest.as_path();
504
505		if let Some((dir, rest)) = rest.next_part() {
506			let node = self.nested.get(dir)?.lock();
507			node.consume_broadcast(&rest)
508		} else {
509			self.broadcast.as_ref().map(|b| b.active.clone())
510		}
511	}
512
513	fn unconsume(&mut self, id: ConsumerId) {
514		self.notify.lock().consumers.remove(&id).expect("consumer not found");
515		if self.is_empty() {
516			//tracing::warn!("TODO: empty node; memory leak");
517			// This happens when consuming a path that is not being broadcasted.
518		}
519	}
520
521	// Returns true if the broadcast should be unannounced.
522	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
523		let full = full.as_path();
524		let relative = relative.as_path();
525
526		if let Some((dir, relative)) = relative.next_part() {
527			let nested = self.entry(dir);
528			let mut locked = nested.lock();
529			locked.remove(&full, broadcast, &relative);
530
531			if locked.is_empty() {
532				drop(locked);
533				self.nested.remove(dir);
534			}
535		} else {
536			let entry = match &mut self.broadcast {
537				Some(existing) => existing,
538				None => return,
539			};
540
541			// See if we can remove the broadcast from the backup list.
542			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
543			if let Some(pos) = pos {
544				entry.backup.remove(pos);
545				// Nothing else to do
546				return;
547			}
548
549			// Okay so it must be the active broadcast or else we fucked up.
550			assert!(entry.active.is_clone(&broadcast));
551
552			// Promote the backup with the lowest ordering key, the same rule used when
553			// publishing, so the route a node heals to still matches its peers.
554			let best = entry
555				.backup
556				.iter()
557				.enumerate()
558				.min_by_key(|(_, b)| route_key(&full, &b.hops))
559				.map(|(i, _)| i);
560			if let Some(idx) = best {
561				let active = entry.backup.remove(idx).expect("index in range");
562				entry.active = active;
563				self.notify.lock().reannounce(full, &entry.active);
564			} else {
565				// No more backups, so remove the entry.
566				self.broadcast = None;
567				self.notify.lock().unannounce(full);
568			}
569		}
570	}
571
572	fn is_empty(&self) -> bool {
573		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
574	}
575}
576
577#[derive(Clone)]
578struct OriginNodes {
579	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
580}
581
582impl OriginNodes {
583	// Returns nested roots that match the prefixes.
584	// PathPrefixes guarantees no duplicates or overlapping prefixes.
585	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
586		let mut roots = Vec::new();
587
588		for (root, state) in &self.nodes {
589			for prefix in prefixes {
590				if root.has_prefix(prefix) {
591					// Keep the existing node if we're allowed to access it.
592					roots.push((root.to_owned(), state.clone()));
593					continue;
594				}
595
596				if let Some(suffix) = prefix.strip_prefix(root) {
597					// If the requested prefix is larger than the allowed prefix, then we further scope it.
598					let nested = state.lock().leaf(&suffix);
599					roots.push((prefix.to_owned(), nested));
600				}
601			}
602		}
603
604		if roots.is_empty() {
605			None
606		} else {
607			Some(Self { nodes: roots })
608		}
609	}
610
611	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
612		let new_root = new_root.as_path();
613		let mut roots = Vec::new();
614
615		if new_root.is_empty() {
616			return Some(self.clone());
617		}
618
619		for (root, state) in &self.nodes {
620			if let Some(suffix) = root.strip_prefix(&new_root) {
621				// If the old root is longer than the new root, shorten the keys.
622				roots.push((suffix.to_owned(), state.clone()));
623			} else if let Some(suffix) = new_root.strip_prefix(root) {
624				// If the new root is longer than the old root, add a new root.
625				// NOTE: suffix can't be empty
626				let nested = state.lock().leaf(&suffix);
627				roots.push(("".into(), nested));
628			}
629		}
630
631		if roots.is_empty() {
632			None
633		} else {
634			Some(Self { nodes: roots })
635		}
636	}
637
638	// Returns the root that has this prefix.
639	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
640		let path = path.as_path();
641
642		for (root, state) in &self.nodes {
643			if let Some(suffix) = path.strip_prefix(root) {
644				return Some((state.clone(), suffix.to_owned()));
645			}
646		}
647
648		None
649	}
650}
651
652impl Default for OriginNodes {
653	fn default() -> Self {
654		Self {
655			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
656		}
657	}
658}
659
660/// A broadcast path and its associated consumer, or None if closed.
661pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
662
663/// Announces broadcasts to consumers over the network.
664#[derive(Clone)]
665pub struct OriginProducer {
666	// Identity for this origin. Appended to broadcast hops when
667	// re-announcing so downstream relays can detect loops and prefer the
668	// shortest path.
669	info: Origin,
670
671	// The roots of the tree that we are allowed to publish.
672	// A path of "" means we can publish anything.
673	nodes: OriginNodes,
674
675	// The prefix that is automatically stripped from all paths.
676	root: PathOwned,
677}
678
679impl std::ops::Deref for OriginProducer {
680	type Target = Origin;
681
682	fn deref(&self) -> &Self::Target {
683		&self.info
684	}
685}
686
687impl OriginProducer {
688	/// Build a producer for the given origin id with no scoped prefix and no
689	/// pre-existing broadcasts. Prefer [`Origin::produce`].
690	pub fn new(info: Origin) -> Self {
691		Self {
692			info,
693			nodes: OriginNodes::default(),
694			root: PathOwned::default(),
695		}
696	}
697
698	/// Create and publish a new broadcast, returning the producer.
699	///
700	/// This is a helper method when you only want to publish a broadcast to a single origin.
701	/// Returns [None] if the broadcast is not allowed to be published.
702	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
703		let broadcast = Broadcast::new().produce();
704		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
705	}
706
707	/// Publish a broadcast, announcing it to all consumers.
708	///
709	/// The broadcast will be unannounced when it is closed.
710	/// If there is already a broadcast with the same path, the new one replaces the active only
711	/// if it has a shorter hop path, or an equal-length path that wins a deterministic tie-break
712	/// (a hash of the broadcast name and hop chain); otherwise it is queued as a backup. The
713	/// tie-break is identical on every node, so a cluster converges on the same route.
714	/// When the active broadcast closes, the backup that wins the same ordering is promoted and
715	/// reannounced. Backups that close before being promoted are silently dropped.
716	///
717	/// Returns false if the broadcast is not allowed to be published.
718	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
719		let path = path.as_path();
720
721		// Loop detection: refuse broadcasts whose hop chain already contains our id.
722		if broadcast.hops.contains(&self.info) {
723			return false;
724		}
725
726		let (root, rest) = match self.nodes.get(&path) {
727			Some(root) => root,
728			None => return false,
729		};
730
731		let full = self.root.join(&path);
732
733		root.lock().publish(&full, &broadcast, &rest);
734		let root = root.clone();
735
736		web_async::spawn(async move {
737			broadcast.closed().await;
738			root.lock().remove(&full, broadcast, &rest);
739		});
740
741		true
742	}
743
744	/// Returns a new OriginProducer restricted to publishing under one of `prefixes`.
745	///
746	/// Returns None if there are no legal prefixes (the requested prefixes are
747	/// disjoint from this producer's current scope).
748	// TODO accept PathPrefixes instead of &[Path]
749	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
750		let prefixes = PathPrefixes::new(prefixes);
751		Some(OriginProducer {
752			info: self.info,
753			nodes: self.nodes.select(&prefixes)?,
754			root: self.root.clone(),
755		})
756	}
757
758	/// Subscribe to all announced broadcasts.
759	pub fn consume(&self) -> OriginConsumer {
760		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
761	}
762
763	/// Get a broadcast by path if it has *already* been published.
764	///
765	/// Equivalent to `self.consume().get_broadcast(path)` but skips the
766	/// announcement-cursor allocation, which is currently relatively expensive.
767	#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
768	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
769		let path = path.as_path();
770		let (root, rest) = self.nodes.get(&path)?;
771		let state = root.lock();
772		state.consume_broadcast(&rest)
773	}
774
775	/// Returns a new OriginProducer that automatically strips out the provided prefix.
776	///
777	/// Returns None if the provided root is not authorized; when [`Self::scope`]
778	/// was already used without a wildcard.
779	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
780		let prefix = prefix.as_path();
781
782		Some(Self {
783			info: self.info,
784			root: self.root.join(&prefix).to_owned(),
785			nodes: self.nodes.root(&prefix)?,
786		})
787	}
788
789	/// Returns the root that is automatically stripped from all paths.
790	pub fn root(&self) -> &Path<'_> {
791		&self.root
792	}
793
794	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
795	// TODO return PathPrefixes
796	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
797		self.nodes.nodes.iter().map(|(root, _)| root)
798	}
799
800	/// Converts a relative path to an absolute path.
801	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
802		self.root.join(path)
803	}
804}
805
806/// Consumes announced broadcasts matching against an optional prefix.
807///
808/// NOTE: Clone is expensive, try to avoid it.
809pub struct OriginConsumer {
810	id: ConsumerId,
811	// Identity of the origin this consumer was derived from.
812	info: Origin,
813	nodes: OriginNodes,
814
815	// Pending updates queued for this consumer. Coalesced so a slow consumer
816	// can't accumulate redundant announce/unannounce pairs.
817	state: kio::Producer<OriginConsumerState>,
818
819	// A prefix that is automatically stripped from all paths.
820	root: PathOwned,
821}
822
823impl std::ops::Deref for OriginConsumer {
824	type Target = Origin;
825
826	fn deref(&self) -> &Self::Target {
827		&self.info
828	}
829}
830
831impl OriginConsumer {
832	fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
833		let state = kio::Producer::<OriginConsumerState>::default();
834		let id = ConsumerId::new();
835
836		for (_, node) in &nodes.nodes {
837			let notify = OriginConsumerNotify {
838				root: root.clone(),
839				state: state.clone(),
840			};
841			node.lock().consume(id, notify);
842		}
843
844		Self {
845			id,
846			info,
847			nodes,
848			state,
849			root,
850		}
851	}
852
853	/// Returns the next (un)announced broadcast and the absolute path.
854	///
855	/// The broadcast will only be announced if it was previously unannounced.
856	/// The same path won't be announced/unannounced twice, instead it will toggle.
857	/// Returns None if the consumer is closed.
858	///
859	/// Note: The returned path is absolute and will always match this consumer's prefix.
860	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
861		kio::wait(|waiter| self.poll_announced(waiter)).await
862	}
863
864	/// Poll for the next (un)announced broadcast, without blocking.
865	///
866	/// Returns `Poll::Ready(Some(_))` for an update, `Poll::Ready(None)` if the
867	/// consumer is closed, or `Poll::Pending` after registering `waiter` to be
868	/// notified when the next update arrives.
869	pub fn poll_announced(&mut self, waiter: &kio::Waiter) -> Poll<Option<OriginAnnounce>> {
870		match self.state.poll(waiter, |state| match state.take() {
871			Some(item) => Poll::Ready(item),
872			None => Poll::Pending,
873		}) {
874			Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
875			// Closed: discard the Ref so its MutexGuard doesn't escape this call.
876			Poll::Ready(Err(_)) => Poll::Ready(None),
877			Poll::Pending => Poll::Pending,
878		}
879	}
880
881	/// Returns the next (un)announced broadcast and the absolute path without blocking.
882	///
883	/// Returns None if there is no update available; NOT because the consumer is closed.
884	/// You have to use `is_closed` to check if the consumer is closed.
885	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
886		self.state.write().ok()?.take()
887	}
888
889	/// Create another consumer with its own announcement cursor over the same origin.
890	pub fn consume(&self) -> Self {
891		self.clone()
892	}
893
894	/// Get a broadcast by path if it has *already* been announced.
895	///
896	/// Returns `None` when the path is unknown to this consumer right now. Synchronous
897	/// lookup races announcement gossip — a freshly-connected consumer will see `None`
898	/// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`]
899	/// (blocks until announced) unless you can guarantee the announcement has already
900	/// landed (e.g. you're responding to an `announced()` callback).
901	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
902		let path = path.as_path();
903		let (root, rest) = self.nodes.get(&path)?;
904		let state = root.lock();
905		state.consume_broadcast(&rest)
906	}
907
908	/// Block until a broadcast with the given path is announced and return it.
909	///
910	/// Returns `None` if the path is outside this consumer's allowed prefixes or if the consumer
911	/// is closed before the broadcast is announced. The returned broadcast may itself be closed
912	/// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that.
913	///
914	/// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but
915	/// cannot guarantee the announcement has already been received.
916	pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
917		let path = path.as_path();
918
919		// Scope a fresh consumer down to this path so we only wake up for relevant announcements.
920		let mut consumer = self.scope(std::slice::from_ref(&path))?;
921
922		// `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited
923		// to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no
924		// announcement at the exact path `foo` can ever arrive. Bail rather than loop forever.
925		if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
926			return None;
927		}
928
929		loop {
930			let (announced, broadcast) = consumer.announced().await?;
931			// `scope` narrows by prefix, but we only want an exact-path match.
932			if announced.as_path() == path {
933				if let Some(broadcast) = broadcast {
934					return Some(broadcast);
935				}
936			}
937		}
938	}
939
940	/// Returns a new OriginConsumer restricted to broadcasts under one of `prefixes`.
941	///
942	/// Returns None if there are no legal prefixes (the requested prefixes are
943	/// disjoint from this consumer's current scope, so it would always return None).
944	// TODO accept PathPrefixes instead of &[Path]
945	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
946		let prefixes = PathPrefixes::new(prefixes);
947		Some(OriginConsumer::new(
948			self.info,
949			self.root.clone(),
950			self.nodes.select(&prefixes)?,
951		))
952	}
953
954	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
955	///
956	/// Returns None if the provided root is not authorized; when [`Self::scope`] was
957	/// already used without a wildcard.
958	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
959		let prefix = prefix.as_path();
960
961		Some(Self::new(
962			self.info,
963			self.root.join(&prefix).to_owned(),
964			self.nodes.root(&prefix)?,
965		))
966	}
967
968	/// Returns the prefix that is automatically stripped from all paths.
969	pub fn root(&self) -> &Path<'_> {
970		&self.root
971	}
972
973	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
974	// TODO return PathPrefixes
975	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
976		self.nodes.nodes.iter().map(|(root, _)| root)
977	}
978
979	/// Converts a relative path to an absolute path.
980	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
981		self.root.join(path)
982	}
983}
984
985impl Drop for OriginConsumer {
986	fn drop(&mut self) {
987		for (_, root) in &self.nodes.nodes {
988			root.lock().unconsume(self.id);
989		}
990	}
991}
992
993impl Clone for OriginConsumer {
994	fn clone(&self) -> Self {
995		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
996	}
997}
998
999#[cfg(test)]
1000use futures::FutureExt;
1001
1002#[cfg(test)]
1003impl OriginConsumer {
1004	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
1005		let expected = expected.as_path();
1006		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
1007		assert_eq!(path, expected, "wrong path");
1008		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
1009	}
1010
1011	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
1012		let expected = expected.as_path();
1013		let (path, active) = self.try_announced().expect("no next");
1014		assert_eq!(path, expected, "wrong path");
1015		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
1016	}
1017
1018	pub fn assert_next_none(&mut self, expected: impl AsPath) {
1019		let expected = expected.as_path();
1020		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
1021		assert_eq!(path, expected, "wrong path");
1022		assert!(active.is_none(), "should be unannounced");
1023	}
1024
1025	pub fn assert_next_wait(&mut self) {
1026		if let Some(res) = self.announced().now_or_never() {
1027			panic!("next should block: got {:?}", res.map(|(path, _)| path));
1028		}
1029	}
1030
1031	/*
1032	pub fn assert_next_closed(&mut self) {
1033		assert!(
1034			self.announced().now_or_never().expect("next blocked").is_none(),
1035			"next should be closed"
1036		);
1037	}
1038	*/
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043	use crate::Broadcast;
1044
1045	use super::*;
1046
1047	#[test]
1048	fn origin_list_push_fails_at_limit() {
1049		let mut list = OriginList::new();
1050		for _ in 0..MAX_HOPS {
1051			list.push(Origin::random()).unwrap();
1052		}
1053		assert_eq!(list.len(), MAX_HOPS);
1054		assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
1055	}
1056
1057	#[test]
1058	fn origin_list_try_from_vec_enforces_limit() {
1059		let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
1060		assert!(OriginList::try_from(under).is_ok());
1061
1062		let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
1063		assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
1064	}
1065
1066	#[tokio::test]
1067	async fn test_announce() {
1068		tokio::time::pause();
1069
1070		let origin = Origin::random().produce();
1071		let broadcast1 = Broadcast::new().produce();
1072		let broadcast2 = Broadcast::new().produce();
1073
1074		let mut consumer1 = origin.consume();
1075		// Make a new consumer that should get it.
1076		consumer1.assert_next_wait();
1077
1078		// Publish the first broadcast.
1079		origin.publish_broadcast("test1", broadcast1.consume());
1080
1081		consumer1.assert_next("test1", &broadcast1.consume());
1082		consumer1.assert_next_wait();
1083
1084		// Make a new consumer that should get the existing broadcast.
1085		// But we don't consume it yet.
1086		let mut consumer2 = origin.consume();
1087
1088		// Publish the second broadcast.
1089		origin.publish_broadcast("test2", broadcast2.consume());
1090
1091		consumer1.assert_next("test2", &broadcast2.consume());
1092		consumer1.assert_next_wait();
1093
1094		consumer2.assert_next("test1", &broadcast1.consume());
1095		consumer2.assert_next("test2", &broadcast2.consume());
1096		consumer2.assert_next_wait();
1097
1098		// Close the first broadcast.
1099		drop(broadcast1);
1100
1101		// Wait for the async task to run.
1102		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1103
1104		// All consumers should get a None now.
1105		consumer1.assert_next_none("test1");
1106		consumer2.assert_next_none("test1");
1107		consumer1.assert_next_wait();
1108		consumer2.assert_next_wait();
1109
1110		// And a new consumer only gets the last broadcast.
1111		let mut consumer3 = origin.consume();
1112		consumer3.assert_next("test2", &broadcast2.consume());
1113		consumer3.assert_next_wait();
1114
1115		// Close the other producer and make sure it cleans up
1116		drop(broadcast2);
1117
1118		// Wait for the async task to run.
1119		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1120
1121		consumer1.assert_next_none("test2");
1122		consumer2.assert_next_none("test2");
1123		consumer3.assert_next_none("test2");
1124
1125		/* TODO close the origin consumer when the producer is dropped
1126		consumer1.assert_next_closed();
1127		consumer2.assert_next_closed();
1128		consumer3.assert_next_closed();
1129		*/
1130	}
1131
1132	#[tokio::test]
1133	async fn test_duplicate() {
1134		tokio::time::pause();
1135
1136		let origin = Origin::random().produce();
1137
1138		let broadcast1 = Broadcast::new().produce();
1139		let broadcast2 = Broadcast::new().produce();
1140		let broadcast3 = Broadcast::new().produce();
1141
1142		let consumer1 = broadcast1.consume();
1143		let consumer2 = broadcast2.consume();
1144		let consumer3 = broadcast3.consume();
1145
1146		let mut consumer = origin.consume();
1147
1148		origin.publish_broadcast("test", consumer1.clone());
1149		origin.publish_broadcast("test", consumer2.clone());
1150		origin.publish_broadcast("test", consumer3.clone());
1151		assert!(consumer.get_broadcast("test").is_some());
1152
1153		// Identical (empty) hop chains tie on the deterministic key, so the first publish
1154		// stays active and the rest queue as backups. No churn, no reannounce.
1155		consumer.assert_next("test", &consumer1);
1156		consumer.assert_next_wait();
1157
1158		// Drop a backup, nothing should change.
1159		drop(broadcast2);
1160
1161		// Wait for the async task to run.
1162		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1163
1164		assert!(consumer.get_broadcast("test").is_some());
1165		consumer.assert_next_wait();
1166
1167		// Drop the active, we should reannounce with the remaining backup.
1168		drop(broadcast1);
1169
1170		// Wait for the async task to run.
1171		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1172
1173		assert!(consumer.get_broadcast("test").is_some());
1174		consumer.assert_next_none("test");
1175		consumer.assert_next("test", &consumer3);
1176
1177		// Drop the final broadcast, we should unannounce.
1178		drop(broadcast3);
1179
1180		// Wait for the async task to run.
1181		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1182		assert!(consumer.get_broadcast("test").is_none());
1183
1184		consumer.assert_next_none("test");
1185		consumer.assert_next_wait();
1186	}
1187
1188	#[tokio::test]
1189	async fn test_duplicate_reverse() {
1190		tokio::time::pause();
1191
1192		let origin = Origin::random().produce();
1193		let broadcast1 = Broadcast::new().produce();
1194		let broadcast2 = Broadcast::new().produce();
1195
1196		origin.publish_broadcast("test", broadcast1.consume());
1197		origin.publish_broadcast("test", broadcast2.consume());
1198		assert!(origin.consume().get_broadcast("test").is_some());
1199
1200		// This is harder, dropping the new broadcast first.
1201		drop(broadcast2);
1202
1203		// Wait for the cleanup async task to run.
1204		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1205		assert!(origin.consume().get_broadcast("test").is_some());
1206
1207		drop(broadcast1);
1208
1209		// Wait for the cleanup async task to run.
1210		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1211		assert!(origin.consume().get_broadcast("test").is_none());
1212	}
1213
1214	#[tokio::test]
1215	async fn test_deterministic_tiebreak() {
1216		tokio::time::pause();
1217
1218		// Build a broadcast carrying a specific hop chain.
1219		fn route(ids: &[u64]) -> BroadcastProducer {
1220			let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::<Vec<_>>()).unwrap();
1221			Broadcast { hops }.produce()
1222		}
1223
1224		// Resolve the active route for "test" after publishing both routes in the given order.
1225		fn winner(first: &[u64], second: &[u64]) -> OriginList {
1226			let origin = Origin::random().produce();
1227			let a = route(first);
1228			let b = route(second);
1229			origin.publish_broadcast("test", a.consume());
1230			origin.publish_broadcast("test", b.consume());
1231			let hops = origin.consume().get_broadcast("test").unwrap().hops.clone();
1232			// Keep the producers alive until after we read the active route.
1233			drop((a, b));
1234			hops
1235		}
1236
1237		// Two routes with equal hop counts but distinct chains. The winner is decided by
1238		// the deterministic key, not arrival order, so both publish orders converge.
1239		let forward = winner(&[10, 20], &[30, 40]);
1240		let reverse = winner(&[30, 40], &[10, 20]);
1241		assert_eq!(forward, reverse, "tie-break must not depend on publish order");
1242
1243		// A strictly shorter chain always wins regardless of the hash.
1244		assert_eq!(winner(&[10, 20], &[30]).len(), 1);
1245		assert_eq!(winner(&[30], &[10, 20]).len(), 1);
1246	}
1247
1248	#[tokio::test]
1249	async fn test_double_publish() {
1250		tokio::time::pause();
1251
1252		let origin = Origin::random().produce();
1253		let broadcast = Broadcast::new().produce();
1254
1255		// Ensure it doesn't crash.
1256		origin.publish_broadcast("test", broadcast.consume());
1257		origin.publish_broadcast("test", broadcast.consume());
1258
1259		assert!(origin.consume().get_broadcast("test").is_some());
1260
1261		drop(broadcast);
1262
1263		// Wait for the async task to run.
1264		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1265		assert!(origin.consume().get_broadcast("test").is_none());
1266	}
1267	// A previous mpsc-based implementation could only deliver the first 127 broadcasts
1268	// instantly via `assert_next` (which uses `now_or_never`). The kio-backed
1269	// implementation polls synchronously and can deliver all of them without yielding.
1270	// Names are zero-padded so lexicographic delivery order matches the loop index.
1271	#[tokio::test]
1272	async fn test_many_announces() {
1273		let origin = Origin::random().produce();
1274		let broadcast = Broadcast::new().produce();
1275
1276		let mut consumer = origin.consume();
1277		for i in 0..256 {
1278			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1279		}
1280
1281		for i in 0..256 {
1282			consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
1283		}
1284		consumer.assert_next_wait();
1285	}
1286
1287	#[tokio::test]
1288	async fn test_many_announces_try() {
1289		let origin = Origin::random().produce();
1290		let broadcast = Broadcast::new().produce();
1291
1292		let mut consumer = origin.consume();
1293		for i in 0..256 {
1294			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1295		}
1296
1297		for i in 0..256 {
1298			consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
1299		}
1300	}
1301
1302	#[tokio::test]
1303	async fn test_with_root_basic() {
1304		let origin = Origin::random().produce();
1305		let broadcast = Broadcast::new().produce();
1306
1307		// Create a producer with root "/foo"
1308		let foo_producer = origin.with_root("foo").expect("should create root");
1309		assert_eq!(foo_producer.root().as_str(), "foo");
1310
1311		let mut consumer = origin.consume();
1312
1313		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
1314		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1315		// The original consumer should see the full path
1316		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1317
1318		// A consumer created from the rooted producer should see the stripped path
1319		let mut foo_consumer = foo_producer.consume();
1320		foo_consumer.assert_next("bar/baz", &broadcast.consume());
1321	}
1322
1323	#[tokio::test]
1324	async fn test_with_root_nested() {
1325		let origin = Origin::random().produce();
1326		let broadcast = Broadcast::new().produce();
1327
1328		// Create nested roots
1329		let foo_producer = origin.with_root("foo").expect("should create foo root");
1330		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1331		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1332
1333		let mut consumer = origin.consume();
1334
1335		// Publishing to "baz" should actually publish to "foo/bar/baz"
1336		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1337		// The original consumer sees the full path
1338		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1339
1340		// Consumer from foo_bar_producer sees just "baz"
1341		let mut foo_bar_consumer = foo_bar_producer.consume();
1342		foo_bar_consumer.assert_next("baz", &broadcast.consume());
1343	}
1344
1345	#[tokio::test]
1346	async fn test_publish_scope_allows() {
1347		let origin = Origin::random().produce();
1348		let broadcast = Broadcast::new().produce();
1349
1350		// Create a producer that can only publish to "allowed" paths
1351		let limited_producer = origin
1352			.scope(&["allowed/path1".into(), "allowed/path2".into()])
1353			.expect("should create limited producer");
1354
1355		// Should be able to publish to allowed paths
1356		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1357		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1358		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1359
1360		// Should not be able to publish to disallowed paths
1361		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1362		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
1363		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1364	}
1365
1366	#[tokio::test]
1367	async fn test_publish_scope_empty() {
1368		let origin = Origin::random().produce();
1369
1370		// Creating a producer with no allowed paths should return None
1371		assert!(origin.scope(&[]).is_none());
1372	}
1373
1374	#[tokio::test]
1375	async fn test_consume_scope_filters() {
1376		let origin = Origin::random().produce();
1377		let broadcast1 = Broadcast::new().produce();
1378		let broadcast2 = Broadcast::new().produce();
1379		let broadcast3 = Broadcast::new().produce();
1380
1381		let mut consumer = origin.consume();
1382
1383		// Publish to different paths
1384		origin.publish_broadcast("allowed", broadcast1.consume());
1385		origin.publish_broadcast("allowed/nested", broadcast2.consume());
1386		origin.publish_broadcast("notallowed", broadcast3.consume());
1387
1388		// Create a consumer that only sees "allowed" paths
1389		let mut limited_consumer = origin
1390			.consume()
1391			.scope(&["allowed".into()])
1392			.expect("should create limited consumer");
1393
1394		// Should only receive broadcasts under "allowed"
1395		limited_consumer.assert_next("allowed", &broadcast1.consume());
1396		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1397		limited_consumer.assert_next_wait(); // Should not see "notallowed"
1398
1399		// Unscoped consumer should see all
1400		consumer.assert_next("allowed", &broadcast1.consume());
1401		consumer.assert_next("allowed/nested", &broadcast2.consume());
1402		consumer.assert_next("notallowed", &broadcast3.consume());
1403	}
1404
1405	#[tokio::test]
1406	async fn test_consume_scope_multiple_prefixes() {
1407		let origin = Origin::random().produce();
1408		let broadcast1 = Broadcast::new().produce();
1409		let broadcast2 = Broadcast::new().produce();
1410		let broadcast3 = Broadcast::new().produce();
1411
1412		origin.publish_broadcast("foo/test", broadcast1.consume());
1413		origin.publish_broadcast("bar/test", broadcast2.consume());
1414		origin.publish_broadcast("baz/test", broadcast3.consume());
1415
1416		// Consumer that only sees "foo" and "bar" paths
1417		let mut limited_consumer = origin
1418			.consume()
1419			.scope(&["foo".into(), "bar".into()])
1420			.expect("should create limited consumer");
1421
1422		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
1423		limited_consumer.assert_next("bar/test", &broadcast2.consume());
1424		limited_consumer.assert_next("foo/test", &broadcast1.consume());
1425		limited_consumer.assert_next_wait(); // Should not see "baz/test"
1426	}
1427
1428	#[tokio::test]
1429	async fn test_with_root_and_publish_scope() {
1430		let origin = Origin::random().produce();
1431		let broadcast = Broadcast::new().produce();
1432
1433		// User connects to /foo root
1434		let foo_producer = origin.with_root("foo").expect("should create foo root");
1435
1436		// Limit them to publish only to "bar" and "goop/pee" within /foo
1437		let limited_producer = foo_producer
1438			.scope(&["bar".into(), "goop/pee".into()])
1439			.expect("should create limited producer");
1440
1441		let mut consumer = origin.consume();
1442
1443		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
1444		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1445		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1446		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1447		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1448
1449		// Should not be able to publish outside allowed paths
1450		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1451		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
1452		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1453
1454		// Original consumer sees full paths
1455		consumer.assert_next("foo/bar", &broadcast.consume());
1456		consumer.assert_next("foo/bar/nested", &broadcast.consume());
1457		consumer.assert_next("foo/goop/pee", &broadcast.consume());
1458		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1459	}
1460
1461	#[tokio::test]
1462	async fn test_with_root_and_consume_scope() {
1463		let origin = Origin::random().produce();
1464		let broadcast1 = Broadcast::new().produce();
1465		let broadcast2 = Broadcast::new().produce();
1466		let broadcast3 = Broadcast::new().produce();
1467
1468		// Publish broadcasts
1469		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1470		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1471		origin.publish_broadcast("foo/other/test", broadcast3.consume());
1472
1473		// User connects to /foo root
1474		let foo_producer = origin.with_root("foo").expect("should create foo root");
1475
1476		// Create consumer limited to "bar" and "goop/pee" within /foo
1477		let mut limited_consumer = foo_producer
1478			.consume()
1479			.scope(&["bar".into(), "goop/pee".into()])
1480			.expect("should create limited consumer");
1481
1482		// Should only see allowed paths (without foo prefix)
1483		limited_consumer.assert_next("bar/test", &broadcast1.consume());
1484		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1485		limited_consumer.assert_next_wait(); // Should not see "other/test"
1486	}
1487
1488	#[tokio::test]
1489	async fn test_with_root_unauthorized() {
1490		let origin = Origin::random().produce();
1491
1492		// First limit the producer to specific paths
1493		let limited_producer = origin
1494			.scope(&["allowed".into()])
1495			.expect("should create limited producer");
1496
1497		// Trying to create a root outside allowed paths should fail
1498		assert!(limited_producer.with_root("notallowed").is_none());
1499
1500		// But creating a root within allowed paths should work
1501		let allowed_root = limited_producer
1502			.with_root("allowed")
1503			.expect("should create allowed root");
1504		assert_eq!(allowed_root.root().as_str(), "allowed");
1505	}
1506
1507	#[tokio::test]
1508	async fn test_wildcard_permission() {
1509		let origin = Origin::random().produce();
1510		let broadcast = Broadcast::new().produce();
1511
1512		// Producer with root access (empty string means wildcard)
1513		let root_producer = origin.clone();
1514
1515		// Should be able to publish anywhere
1516		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1517		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1518
1519		// Can create any root
1520		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1521		assert_eq!(foo_producer.root().as_str(), "foo");
1522	}
1523
1524	#[tokio::test]
1525	async fn test_consume_broadcast_with_permissions() {
1526		let origin = Origin::random().produce();
1527		let broadcast1 = Broadcast::new().produce();
1528		let broadcast2 = Broadcast::new().produce();
1529
1530		origin.publish_broadcast("allowed/test", broadcast1.consume());
1531		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1532
1533		// Create limited consumer
1534		let limited_consumer = origin
1535			.consume()
1536			.scope(&["allowed".into()])
1537			.expect("should create limited consumer");
1538
1539		// Should be able to get allowed broadcast
1540		let result = limited_consumer.get_broadcast("allowed/test");
1541		assert!(result.is_some());
1542		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1543
1544		// Should not be able to get disallowed broadcast
1545		assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1546
1547		// Original consumer can get both
1548		let consumer = origin.consume();
1549		assert!(consumer.get_broadcast("allowed/test").is_some());
1550		assert!(consumer.get_broadcast("notallowed/test").is_some());
1551	}
1552
1553	#[tokio::test]
1554	async fn test_nested_paths_with_permissions() {
1555		let origin = Origin::random().produce();
1556		let broadcast = Broadcast::new().produce();
1557
1558		// Create producer limited to "a/b/c"
1559		let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1560
1561		// Should be able to publish to exact path and nested paths
1562		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1563		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1564		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1565
1566		// Should not be able to publish to parent or sibling paths
1567		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1568		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1569		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1570	}
1571
1572	#[tokio::test]
1573	async fn test_multiple_consumers_with_different_permissions() {
1574		let origin = Origin::random().produce();
1575		let broadcast1 = Broadcast::new().produce();
1576		let broadcast2 = Broadcast::new().produce();
1577		let broadcast3 = Broadcast::new().produce();
1578
1579		// Publish to different paths
1580		origin.publish_broadcast("foo/test", broadcast1.consume());
1581		origin.publish_broadcast("bar/test", broadcast2.consume());
1582		origin.publish_broadcast("baz/test", broadcast3.consume());
1583
1584		// Create consumers with different permissions
1585		let mut foo_consumer = origin
1586			.consume()
1587			.scope(&["foo".into()])
1588			.expect("should create foo consumer");
1589
1590		let mut bar_consumer = origin
1591			.consume()
1592			.scope(&["bar".into()])
1593			.expect("should create bar consumer");
1594
1595		let mut foobar_consumer = origin
1596			.consume()
1597			.scope(&["foo".into(), "bar".into()])
1598			.expect("should create foobar consumer");
1599
1600		// Each consumer should only see their allowed paths
1601		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1602		foo_consumer.assert_next_wait();
1603
1604		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1605		bar_consumer.assert_next_wait();
1606
1607		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1608		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1609		foobar_consumer.assert_next_wait();
1610	}
1611
1612	#[tokio::test]
1613	async fn test_select_with_empty_prefix() {
1614		let origin = Origin::random().produce();
1615		let broadcast1 = Broadcast::new().produce();
1616		let broadcast2 = Broadcast::new().produce();
1617
1618		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1619		let demo_producer = origin.with_root("demo").expect("should create demo root");
1620		let limited_producer = demo_producer
1621			.scope(&["worm-node".into(), "foobar".into()])
1622			.expect("should create limited producer");
1623
1624		// Publish some broadcasts
1625		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1626		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1627
1628		// scope with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1629		let mut consumer = limited_producer
1630			.consume()
1631			.scope(&["".into()])
1632			.expect("should create consumer with empty prefix");
1633
1634		// Should see both broadcasts (order depends on PathPrefixes sort)
1635		let a1 = consumer.try_announced().expect("expected first announcement");
1636		let a2 = consumer.try_announced().expect("expected second announcement");
1637		consumer.assert_next_wait();
1638
1639		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1640		paths.sort();
1641		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1642	}
1643
1644	#[tokio::test]
1645	async fn test_select_narrowing_scope() {
1646		let origin = Origin::random().produce();
1647		let broadcast1 = Broadcast::new().produce();
1648		let broadcast2 = Broadcast::new().produce();
1649		let broadcast3 = Broadcast::new().produce();
1650
1651		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1652		let demo_producer = origin.with_root("demo").expect("should create demo root");
1653		let limited_producer = demo_producer
1654			.scope(&["worm-node".into(), "foobar".into()])
1655			.expect("should create limited producer");
1656
1657		// Publish broadcasts at different levels
1658		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1659		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1660		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1661
1662		// Test 1: scope("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1663		let mut worm_consumer = limited_producer
1664			.consume()
1665			.scope(&["worm-node".into()])
1666			.expect("should create worm-node consumer");
1667
1668		// Should see worm-node content with paths stripped to ""
1669		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1670		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1671		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1672
1673		// Test 2: scope("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1674		let mut foo_consumer = limited_producer
1675			.consume()
1676			.scope(&["worm-node/foo".into()])
1677			.expect("should create worm-node/foo consumer");
1678
1679		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1680		foo_consumer.assert_next_wait(); // Should NOT see other content
1681	}
1682
1683	#[tokio::test]
1684	async fn test_select_multiple_roots_with_empty_prefix() {
1685		let origin = Origin::random().produce();
1686		let broadcast1 = Broadcast::new().produce();
1687		let broadcast2 = Broadcast::new().produce();
1688		let broadcast3 = Broadcast::new().produce();
1689
1690		// Producer with multiple allowed roots
1691		let limited_producer = origin
1692			.scope(&["app1".into(), "app2".into(), "shared".into()])
1693			.expect("should create limited producer");
1694
1695		// Publish to each root
1696		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1697		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1698		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1699
1700		// scope with empty prefix should maintain all roots
1701		let mut consumer = limited_producer
1702			.consume()
1703			.scope(&["".into()])
1704			.expect("should create consumer with empty prefix");
1705
1706		// Should see all broadcasts from all roots
1707		consumer.assert_next("app1/data", &broadcast1.consume());
1708		consumer.assert_next("app2/config", &broadcast2.consume());
1709		consumer.assert_next("shared/resource", &broadcast3.consume());
1710		consumer.assert_next_wait();
1711	}
1712
1713	#[tokio::test]
1714	async fn test_publish_scope_with_empty_prefix() {
1715		let origin = Origin::random().produce();
1716		let broadcast = Broadcast::new().produce();
1717
1718		// Producer with specific allowed paths
1719		let limited_producer = origin
1720			.scope(&["services/api".into(), "services/web".into()])
1721			.expect("should create limited producer");
1722
1723		// scope with empty prefix should keep the same restrictions
1724		let same_producer = limited_producer
1725			.scope(&["".into()])
1726			.expect("should create producer with empty prefix");
1727
1728		// Should still have the same publishing restrictions
1729		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1730		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1731		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1732		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1733	}
1734
1735	#[tokio::test]
1736	async fn test_select_narrowing_to_deeper_path() {
1737		let origin = Origin::random().produce();
1738		let broadcast1 = Broadcast::new().produce();
1739		let broadcast2 = Broadcast::new().produce();
1740		let broadcast3 = Broadcast::new().produce();
1741
1742		// Producer with broad permission
1743		let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1744
1745		// Publish at various depths
1746		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1747		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1748		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1749
1750		// Narrow down to team2 only
1751		let mut team2_consumer = limited_producer
1752			.consume()
1753			.scope(&["org/team2".into()])
1754			.expect("should create team2 consumer");
1755
1756		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1757		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1758
1759		// Further narrow down to team1/project1
1760		let mut project1_consumer = limited_producer
1761			.consume()
1762			.scope(&["org/team1/project1".into()])
1763			.expect("should create project1 consumer");
1764
1765		// Should only see project1 content at root
1766		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1767		project1_consumer.assert_next_wait();
1768	}
1769
1770	#[tokio::test]
1771	async fn test_select_with_non_matching_prefix() {
1772		let origin = Origin::random().produce();
1773
1774		// Producer with specific allowed paths
1775		let limited_producer = origin
1776			.scope(&["allowed/path".into()])
1777			.expect("should create limited producer");
1778
1779		// Trying to scope with a completely different prefix should return None
1780		assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1781
1782		// Similarly for scope
1783		assert!(limited_producer.scope(&["other/path".into()]).is_none());
1784	}
1785
1786	// Regression test for https://github.com/moq-dev/moq/issues/910
1787	// with_root panics when String has trailing slash (AsPath for String skips normalization)
1788	#[tokio::test]
1789	async fn test_with_root_trailing_slash_consumer() {
1790		let origin = Origin::random().produce();
1791
1792		// Use an owned String so the trailing slash is NOT normalized away.
1793		let prefix = "some_prefix/".to_string();
1794		let mut consumer = origin.consume().with_root(prefix).unwrap();
1795
1796		let b = origin.create_broadcast("some_prefix/test").unwrap();
1797		consumer.assert_next("test", &b.consume());
1798	}
1799
1800	// Same issue but for the producer side of with_root
1801	#[tokio::test]
1802	async fn test_with_root_trailing_slash_producer() {
1803		let origin = Origin::random().produce();
1804
1805		// Use an owned String so the trailing slash is NOT normalized away.
1806		let prefix = "some_prefix/".to_string();
1807		let rooted = origin.with_root(prefix).unwrap();
1808
1809		let b = rooted.create_broadcast("test").unwrap();
1810
1811		let mut consumer = rooted.consume();
1812		consumer.assert_next("test", &b.consume());
1813	}
1814
1815	// Verify unannounce also doesn't panic with trailing slash
1816	#[tokio::test]
1817	async fn test_with_root_trailing_slash_unannounce() {
1818		tokio::time::pause();
1819
1820		let origin = Origin::random().produce();
1821
1822		let prefix = "some_prefix/".to_string();
1823		let mut consumer = origin.consume().with_root(prefix).unwrap();
1824
1825		let b = origin.create_broadcast("some_prefix/test").unwrap();
1826		consumer.assert_next("test", &b.consume());
1827
1828		// Drop the broadcast producer to trigger unannounce
1829		drop(b);
1830		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1831
1832		// unannounce also calls strip_prefix(&self.root).unwrap()
1833		consumer.assert_next_none("test");
1834	}
1835
1836	#[tokio::test]
1837	async fn test_select_maintains_access_with_wider_prefix() {
1838		let origin = Origin::random().produce();
1839		let broadcast1 = Broadcast::new().produce();
1840		let broadcast2 = Broadcast::new().produce();
1841
1842		// Setup: user with root "demo" allowed to subscribe to specific paths
1843		let demo_producer = origin.with_root("demo").expect("should create demo root");
1844		let user_producer = demo_producer
1845			.scope(&["worm-node".into(), "foobar".into()])
1846			.expect("should create user producer");
1847
1848		// Publish some data
1849		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1850		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1851
1852		// Key test: scope with "" should maintain access to allowed roots
1853		let mut consumer = user_producer
1854			.consume()
1855			.scope(&["".into()])
1856			.expect("scope with empty prefix should not fail when user has specific permissions");
1857
1858		// Should still receive broadcasts from allowed paths (order not guaranteed)
1859		let a1 = consumer.try_announced().expect("expected first announcement");
1860		let a2 = consumer.try_announced().expect("expected second announcement");
1861		consumer.assert_next_wait();
1862
1863		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1864		paths.sort();
1865		assert_eq!(paths, ["foobar", "worm-node/data"]);
1866
1867		// Also test that we can still narrow the scope
1868		let mut narrow_consumer = user_producer
1869			.consume()
1870			.scope(&["worm-node".into()])
1871			.expect("should be able to narrow scope to worm-node");
1872
1873		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1874		narrow_consumer.assert_next_wait(); // Should not see foobar
1875	}
1876
1877	#[tokio::test]
1878	async fn test_duplicate_prefixes_deduped() {
1879		let origin = Origin::random().produce();
1880		let broadcast = Broadcast::new().produce();
1881
1882		// scope with duplicate prefixes should work (deduped internally)
1883		let producer = origin
1884			.scope(&["demo".into(), "demo".into()])
1885			.expect("should create producer");
1886
1887		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1888
1889		let mut consumer = producer.consume();
1890		consumer.assert_next("demo/stream", &broadcast.consume());
1891		consumer.assert_next_wait();
1892	}
1893
1894	#[tokio::test]
1895	async fn test_overlapping_prefixes_deduped() {
1896		let origin = Origin::random().produce();
1897		let broadcast = Broadcast::new().produce();
1898
1899		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
1900		let producer = origin
1901			.scope(&["demo".into(), "demo/foo".into()])
1902			.expect("should create producer");
1903
1904		// Can still publish under "demo/bar" since "demo" covers everything
1905		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1906
1907		let mut consumer = producer.consume();
1908		consumer.assert_next("demo/bar/stream", &broadcast.consume());
1909		consumer.assert_next_wait();
1910	}
1911
1912	#[tokio::test]
1913	async fn test_overlapping_prefixes_no_duplicate_announcements() {
1914		let origin = Origin::random().produce();
1915		let broadcast = Broadcast::new().produce();
1916
1917		// Both "demo" and "demo/foo" are requested — should only have one node
1918		let producer = origin
1919			.scope(&["demo".into(), "demo/foo".into()])
1920			.expect("should create producer");
1921
1922		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1923
1924		let mut consumer = producer.consume();
1925		// Should only get ONE announcement (not two from overlapping nodes)
1926		consumer.assert_next("demo/foo/stream", &broadcast.consume());
1927		consumer.assert_next_wait();
1928	}
1929
1930	#[tokio::test]
1931	async fn test_allowed_returns_deduped_prefixes() {
1932		let origin = Origin::random().produce();
1933
1934		let producer = origin
1935			.scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1936			.expect("should create producer");
1937
1938		let allowed: Vec<_> = producer.allowed().collect();
1939		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1940	}
1941
1942	#[tokio::test]
1943	async fn test_announced_broadcast_already_announced() {
1944		let origin = Origin::random().produce();
1945		let broadcast = Broadcast::new().produce();
1946
1947		origin.publish_broadcast("test", broadcast.consume());
1948
1949		let consumer = origin.consume();
1950		let result = consumer.announced_broadcast("test").await.expect("should find it");
1951		assert!(result.is_clone(&broadcast.consume()));
1952	}
1953
1954	#[tokio::test]
1955	async fn test_announced_broadcast_delayed() {
1956		tokio::time::pause();
1957
1958		let origin = Origin::random().produce();
1959		let broadcast = Broadcast::new().produce();
1960
1961		let consumer = origin.consume();
1962
1963		// Start waiting before it's announced.
1964		let wait = tokio::spawn({
1965			let consumer = consumer.clone();
1966			async move { consumer.announced_broadcast("test").await }
1967		});
1968
1969		// Give the spawned task a chance to subscribe.
1970		tokio::task::yield_now().await;
1971
1972		origin.publish_broadcast("test", broadcast.consume());
1973
1974		let result = wait.await.unwrap().expect("should find it");
1975		assert!(result.is_clone(&broadcast.consume()));
1976	}
1977
1978	#[tokio::test]
1979	async fn test_announced_broadcast_ignores_unrelated_paths() {
1980		tokio::time::pause();
1981
1982		let origin = Origin::random().produce();
1983		let other = Broadcast::new().produce();
1984		let target = Broadcast::new().produce();
1985
1986		let consumer = origin.consume();
1987
1988		let wait = tokio::spawn({
1989			let consumer = consumer.clone();
1990			async move { consumer.announced_broadcast("target").await }
1991		});
1992
1993		tokio::task::yield_now().await;
1994
1995		// Publish an unrelated broadcast first — announced_broadcast should skip it.
1996		origin.publish_broadcast("other", other.consume());
1997		tokio::task::yield_now().await;
1998		assert!(!wait.is_finished(), "must not resolve on unrelated path");
1999
2000		origin.publish_broadcast("target", target.consume());
2001		let result = wait.await.unwrap().expect("should find target");
2002		assert!(result.is_clone(&target.consume()));
2003	}
2004
2005	#[tokio::test]
2006	async fn test_announced_broadcast_skips_nested_paths() {
2007		tokio::time::pause();
2008
2009		let origin = Origin::random().produce();
2010		let nested = Broadcast::new().produce();
2011		let exact = Broadcast::new().produce();
2012
2013		let consumer = origin.consume();
2014
2015		let wait = tokio::spawn({
2016			let consumer = consumer.clone();
2017			async move { consumer.announced_broadcast("foo").await }
2018		});
2019
2020		tokio::task::yield_now().await;
2021
2022		// "foo/bar" is under the prefix scope, but it's not the exact path — skip it.
2023		origin.publish_broadcast("foo/bar", nested.consume());
2024		tokio::task::yield_now().await;
2025		assert!(!wait.is_finished(), "must not resolve on a nested path");
2026
2027		origin.publish_broadcast("foo", exact.consume());
2028		let result = wait.await.unwrap().expect("should find foo exactly");
2029		assert!(result.is_clone(&exact.consume()));
2030	}
2031
2032	#[tokio::test]
2033	async fn test_announced_broadcast_disallowed() {
2034		let origin = Origin::random().produce();
2035		let limited = origin
2036			.consume()
2037			.scope(&["allowed".into()])
2038			.expect("should create limited");
2039
2040		// Path is outside allowed prefixes — should return None immediately.
2041		assert!(limited.announced_broadcast("notallowed").await.is_none());
2042	}
2043
2044	#[tokio::test]
2045	async fn test_announced_broadcast_scope_too_narrow() {
2046		// Consumer's scope is narrower than the requested path: asking for `foo` on a consumer
2047		// limited to `foo/specific` can never resolve. Must return None, not loop forever.
2048		let origin = Origin::random().produce();
2049		let limited = origin
2050			.consume()
2051			.scope(&["foo/specific".into()])
2052			.expect("should create limited");
2053
2054		// now_or_never so we fail fast instead of hanging if the guard regresses.
2055		let result = limited
2056			.announced_broadcast("foo")
2057			.now_or_never()
2058			.expect("must not block");
2059		assert!(result.is_none());
2060	}
2061
2062	// Coalescing tests: a slow consumer that doesn't drain between updates
2063	// should observe a bounded number of deliveries.
2064
2065	#[tokio::test]
2066	async fn test_coalesce_announce_then_unannounce() {
2067		// announce + unannounce that the consumer hasn't observed yet collapses to nothing.
2068		tokio::time::pause();
2069
2070		let origin = Origin::random().produce();
2071		let mut consumer = origin.consume();
2072
2073		let broadcast = Broadcast::new().produce();
2074		origin.publish_broadcast("test", broadcast.consume());
2075		drop(broadcast);
2076
2077		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2078
2079		consumer.assert_next_wait();
2080	}
2081
2082	#[tokio::test]
2083	async fn test_coalesce_announce_unannounce_announce() {
2084		// announce, unannounce, announce that the consumer hasn't drained collapses
2085		// to a single Announce of the latest broadcast.
2086		tokio::time::pause();
2087
2088		let origin = Origin::random().produce();
2089		let mut consumer = origin.consume();
2090
2091		let broadcast1 = Broadcast::new().produce();
2092		let broadcast2 = Broadcast::new().produce();
2093
2094		origin.publish_broadcast("test", broadcast1.consume());
2095		drop(broadcast1);
2096		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2097		origin.publish_broadcast("test", broadcast2.consume());
2098
2099		consumer.assert_next("test", &broadcast2.consume());
2100		consumer.assert_next_wait();
2101	}
2102
2103	#[tokio::test]
2104	async fn test_coalesce_unannounce_announce_preserved() {
2105		// unannounce followed by announce of a different broadcast must be preserved
2106		// as two deliveries so the consumer learns the origin changed.
2107		tokio::time::pause();
2108
2109		let origin = Origin::random().produce();
2110		let broadcast1 = Broadcast::new().produce();
2111		origin.publish_broadcast("test", broadcast1.consume());
2112
2113		let mut consumer = origin.consume();
2114		consumer.assert_next("test", &broadcast1.consume());
2115
2116		// Drop, then publish a fresh broadcast at the same path.
2117		drop(broadcast1);
2118		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2119
2120		let broadcast2 = Broadcast::new().produce();
2121		origin.publish_broadcast("test", broadcast2.consume());
2122
2123		// The consumer must see the unannounce before the new announce.
2124		consumer.assert_next_none("test");
2125		consumer.assert_next("test", &broadcast2.consume());
2126		consumer.assert_next_wait();
2127	}
2128
2129	#[tokio::test]
2130	async fn test_coalesce_unannounce_announce_unannounce() {
2131		// unannounce + announce + unannounce collapses to a single unannounce: the
2132		// embedded announce was never observed.
2133		tokio::time::pause();
2134
2135		let origin = Origin::random().produce();
2136		let broadcast1 = Broadcast::new().produce();
2137		origin.publish_broadcast("test", broadcast1.consume());
2138
2139		let mut consumer = origin.consume();
2140		consumer.assert_next("test", &broadcast1.consume());
2141
2142		drop(broadcast1);
2143		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2144
2145		let broadcast2 = Broadcast::new().produce();
2146		origin.publish_broadcast("test", broadcast2.consume());
2147		drop(broadcast2);
2148		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2149
2150		consumer.assert_next_none("test");
2151		consumer.assert_next_wait();
2152	}
2153
2154	#[tokio::test]
2155	async fn test_coalesce_churn_bounded() {
2156		// A churn loop on a single path should keep the pending set bounded.
2157		// Backup promotion during cleanup can leave the consumer with zero or one
2158		// pending update for "test" depending on the order tasks run; we only
2159		// require that churn doesn't accumulate across iterations.
2160		tokio::time::pause();
2161
2162		let origin = Origin::random().produce();
2163		let mut consumer = origin.consume();
2164
2165		for _ in 0..1000 {
2166			let broadcast = Broadcast::new().produce();
2167			origin.publish_broadcast("test", broadcast.consume());
2168			drop(broadcast);
2169		}
2170		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2171
2172		let mut collected = Vec::new();
2173		while let Some(update) = consumer.try_announced() {
2174			collected.push(update);
2175		}
2176		assert!(
2177			collected.len() <= 1,
2178			"expected at most one pending update, got {}",
2179			collected.len()
2180		);
2181		assert!(
2182			collected.iter().all(|(path, _)| path == &Path::new("test")),
2183			"unexpected path in pending updates",
2184		);
2185	}
2186}