Skip to main content

moq_net/model/
origin.rs

1use std::{
2	collections::{BTreeMap, HashMap, VecDeque},
3	fmt,
4	sync::atomic::{AtomicU64, Ordering},
5	task::Poll,
6};
7
8use rand::Rng;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13	AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14	coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
/// A relay origin, identified by a 62-bit varint on the wire.
///
/// `id` must be non-zero for a real origin; `id == 0` is reserved as a
/// placeholder for Lite03-style hops where the actual value isn't carried.
/// Encoding a value outside the 62-bit range (>= 2^62) will fail at the
/// varint layer; [`Origin::random`] picks a valid random nonzero id.
///
/// The type is `Copy`, comparable and hashable, so it is passed by value and
/// can be used as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Origin {
	/// Non-zero 62-bit identifier. Encoded as a QUIC varint on the wire.
	///
	/// The field is public, so the invariants above are a convention for
	/// callers rather than something the constructor enforces.
	pub id: u64,
}
29
30impl Origin {
31	/// Placeholder for hop entries whose actual id is not on the wire (Lite03).
32	/// Never encoded for Lite04+: violates the non-zero invariant and would fail to round-trip.
33	pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35	/// Generate a fresh origin with a random non-zero id. Use this for any
36	/// origin that does not need a stable identity across restarts.
37	///
38	/// TEMPORARY: the wire format allows 62 bits, but older `@moq/lite` JS
39	/// clients decode `AnnounceInterest.exclude_hop` as a u53 (number) and
40	/// throw on anything > 2^53-1. To keep those clients alive against
41	/// fresh relays, we cap the random id at 53 bits. Restore to 62 bits
42	/// once the JS u62 fix has propagated to deployed bundles.
43	pub fn random() -> Self {
44		let mut rng = rand::rng();
45		let id = rng.random_range(1..(1u64 << 53));
46		Self { id }
47	}
48
49	/// Consume this [Origin] to create a producer that carries its id.
50	pub fn produce(self) -> OriginProducer {
51		OriginProducer::new(self)
52	}
53}
54
55impl From<u64> for Origin {
56	fn from(id: u64) -> Self {
57		Self { id }
58	}
59}
60
61impl fmt::Display for Origin {
62	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63		self.id.fmt(f)
64	}
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69	u64: Encode<V>,
70{
71	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72		self.id.encode(w, version)
73	}
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78	u64: Decode<V>,
79{
80	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81		let id = u64::decode(r, version)?;
82		if id >= 1u64 << 62 {
83			return Err(DecodeError::InvalidValue);
84		}
85		Ok(Self { id })
86	}
87}
88
/// Maximum number of origins (hops) an [`OriginList`] can hold.
///
/// Caps pathological or loop-induced announcements at a reasonable cluster
/// diameter; appending past this limit returns [`TooManyOrigins`] rather than
/// silently truncating. The same bound is applied when a list is decoded or
/// converted from a `Vec`.
pub(crate) const MAX_HOPS: usize = 32;
95
/// Bounded list of [`Origin`] entries, typically the hop chain of a broadcast.
///
/// Guarantees `len() <= MAX_HOPS`. Construct via [`OriginList::new`] +
/// [`OriginList::push`], or fall back to the fallible [`TryFrom<Vec<Origin>>`].
///
/// The inner `Vec` is private so the length bound cannot be bypassed by
/// callers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct OriginList(Vec<Origin>);
103
/// Returned when an operation would grow an [`OriginList`] past its hop-count cap.
///
/// Carries no data; the limit itself is reported by the `Display` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111		write!(f, "too many origins (max {MAX_HOPS})")
112	}
113}
114
// Marker impl so `TooManyOrigins` can be used wherever a `std::error::Error` is expected.
impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118	fn from(_: TooManyOrigins) -> Self {
119		DecodeError::BoundsExceeded
120	}
121}
122
123impl OriginList {
124	/// Create an empty list.
125	pub fn new() -> Self {
126		Self(Vec::new())
127	}
128
129	/// Append an [`Origin`]. Returns [`TooManyOrigins`] if the list is full.
130	pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131		if self.0.len() >= MAX_HOPS {
132			return Err(TooManyOrigins);
133		}
134		self.0.push(origin);
135		Ok(())
136	}
137
138	/// Returns true if any entry matches `origin`.
139	pub fn contains(&self, origin: &Origin) -> bool {
140		self.0.contains(origin)
141	}
142
143	/// Number of entries currently in the list (always `<= MAX_HOPS`).
144	pub fn len(&self) -> usize {
145		self.0.len()
146	}
147
148	/// Whether the list contains no entries.
149	pub fn is_empty(&self) -> bool {
150		self.0.is_empty()
151	}
152
153	/// Iterate over the entries in hop order (oldest first).
154	pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
155		self.0.iter()
156	}
157
158	/// Borrow the entries as a slice.
159	pub fn as_slice(&self) -> &[Origin] {
160		&self.0
161	}
162}
163
164impl TryFrom<Vec<Origin>> for OriginList {
165	type Error = TooManyOrigins;
166
167	fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
168		if v.len() > MAX_HOPS {
169			return Err(TooManyOrigins);
170		}
171		Ok(Self(v))
172	}
173}
174
175impl<'a> IntoIterator for &'a OriginList {
176	type Item = &'a Origin;
177	type IntoIter = std::slice::Iter<'a, Origin>;
178
179	fn into_iter(self) -> Self::IntoIter {
180		self.iter()
181	}
182}
183
184impl<V: Copy> Encode<V> for OriginList
185where
186	u64: Encode<V>,
187	Origin: Encode<V>,
188{
189	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
190		(self.0.len() as u64).encode(w, version)?;
191		for origin in &self.0 {
192			origin.encode(w, version)?;
193		}
194		Ok(())
195	}
196}
197
198impl<V: Copy> Decode<V> for OriginList
199where
200	u64: Decode<V>,
201	Origin: Decode<V>,
202{
203	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
204		let count = u64::decode(r, version)? as usize;
205		if count > MAX_HOPS {
206			return Err(DecodeError::BoundsExceeded);
207		}
208		let mut list = Vec::with_capacity(count);
209		for _ in 0..count {
210			list.push(Origin::decode(r, version)?);
211		}
212		Ok(Self(list))
213	}
214}
215
// Process-wide counter that `ConsumerId::new` bumps to hand out distinct ids.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
217
// Identifies one `OriginConsumer` registration; used as the key in `NotifyNode::consumers`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);
220
221impl ConsumerId {
222	fn new() -> Self {
223		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
224	}
225}
226
// All broadcasts currently published at one path.
//
// If there are multiple broadcasts with the same path, exactly one is `active`
// (the one announced to consumers) and the rest wait in `backup`. A newly
// published broadcast takes over as active when its hop chain is no longer
// than the current active's; otherwise it is queued. When the active one is
// removed, the backup with the shortest hop chain is promoted.
struct OriginBroadcast {
	// Full path this entry was published under; used when announcing to new consumers.
	path: PathOwned,
	// The broadcast consumers are currently told about.
	active: BroadcastConsumer,
	// Other live broadcasts for the same path, in insertion order.
	backup: VecDeque<BroadcastConsumer>,
}
233
/// One coalesced update queued for an `OriginConsumer`.
///
/// At most one entry exists per path, so a slow consumer's pending set is bounded
/// by the number of distinct paths. `UnannounceAnnounce` preserves the
/// signal that the broadcast at a path was replaced (the consumer must see
/// `(path, None)` before `(path, Some(new))`), while a stale
/// `Announce` cancels with a subsequent `unannounce` because the consumer
/// has not yet observed it.
enum PendingUpdate {
	/// The consumer should be told about this broadcast.
	Announce(BroadcastConsumer),
	/// The consumer should be told the path went away.
	Unannounce,
	/// The consumer should first see the path go away, then see this broadcast.
	UnannounceAnnounce(BroadcastConsumer),
}
247
/// Pending updates keyed by path. `BTreeMap` keeps memory strictly bounded by
/// the number of distinct paths with outstanding work (collapsed pairs are
/// fully erased) and gives a deterministic lexicographic delivery order so
/// tests can predict it.
#[derive(Default)]
struct OriginConsumerState {
	/// Undelivered updates; the smallest path is always delivered first.
	pending: BTreeMap<PathOwned, PendingUpdate>,
}
256
257impl OriginConsumerState {
258	fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
259		let new = match self.pending.remove(&path) {
260			// First announce, or a stale announce being replaced.
261			None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
262			// Consumer needs to observe the unannounce before this announce.
263			Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
264				PendingUpdate::UnannounceAnnounce(broadcast)
265			}
266		};
267		self.pending.insert(path, new);
268	}
269
270	fn apply_unannounce(&mut self, path: PathOwned) {
271		match self.pending.remove(&path) {
272			// Consumer has not seen the pending announce; drop both entirely.
273			Some(PendingUpdate::Announce(_)) => {}
274			None | Some(PendingUpdate::Unannounce) => {
275				self.pending.insert(path, PendingUpdate::Unannounce);
276			}
277			// The embedded announce cancels with this unannounce; the consumer
278			// still needs the leading unannounce.
279			Some(PendingUpdate::UnannounceAnnounce(_)) => {
280				self.pending.insert(path, PendingUpdate::Unannounce);
281			}
282		}
283	}
284
285	/// Take one update to deliver to the consumer, if any.
286	fn take(&mut self) -> Option<OriginAnnounce> {
287		let path = self.pending.keys().next()?.clone();
288		match self.pending.remove(&path).unwrap() {
289			PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
290			PendingUpdate::Unannounce => Some((path, None)),
291			PendingUpdate::UnannounceAnnounce(broadcast) => {
292				// Deliver the unannounce now; leave the trailing announce pending so
293				// the next take returns it for the same path.
294				self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
295				Some((path, None))
296			}
297		}
298	}
299}
300
// Handle stored in the tree on behalf of one consumer: it lets publishers push
// updates into that consumer's pending state.
#[derive(Clone)]
struct OriginConsumerNotify {
	// Prefix stripped from every path before it is queued for the consumer.
	root: PathOwned,
	// The consumer's coalesced pending-update state.
	state: conducer::Producer<OriginConsumerState>,
}
306
307impl OriginConsumerNotify {
308	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
309		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
310		self.state
311			.write()
312			.ok()
313			.expect("consumer closed")
314			.apply_announce(path, broadcast);
315	}
316
317	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
318		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
319		let mut state = self.state.write().ok().expect("consumer closed");
320		state.apply_unannounce(path.clone());
321		state.apply_announce(path, broadcast);
322	}
323
324	fn unannounce(&self, path: impl AsPath) {
325		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
326		self.state.write().ok().expect("consumer closed").apply_unannounce(path);
327	}
328}
329
// Per-node fan-out point: holds the consumers registered at one tree node and
// a link to the parent's `NotifyNode` so updates can bubble towards the root.
struct NotifyNode {
	// `None` for the top of the tree.
	parent: Option<Lock<NotifyNode>>,

	// Consumers that are subscribed to this node.
	// We store a consumer ID so we can remove it easily when it closes.
	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
}
337
338impl NotifyNode {
339	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
340		Self {
341			parent,
342			consumers: HashMap::new(),
343		}
344	}
345
346	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
347		for consumer in self.consumers.values() {
348			consumer.announce(path.as_path(), broadcast.clone());
349		}
350
351		if let Some(parent) = &self.parent {
352			parent.lock().announce(path, broadcast);
353		}
354	}
355
356	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
357		for consumer in self.consumers.values() {
358			consumer.reannounce(path.as_path(), broadcast.clone());
359		}
360
361		if let Some(parent) = &self.parent {
362			parent.lock().reannounce(path, broadcast);
363		}
364	}
365
366	fn unannounce(&mut self, path: impl AsPath) {
367		for consumer in self.consumers.values() {
368			consumer.unannounce(path.as_path());
369		}
370
371		if let Some(parent) = &self.parent {
372			parent.lock().unannounce(path);
373		}
374	}
375}
376
// One node of the path tree: each path component adds a level, and a node may
// hold a broadcast, child nodes, or both.
struct OriginNode {
	// The broadcast that is published to this node.
	broadcast: Option<OriginBroadcast>,

	// Nested nodes, one level down the tree, keyed by path component.
	nested: HashMap<String, Lock<OriginNode>>,

	// Unfortunately, to notify consumers we need to traverse back up the tree.
	// Children are created with a clone of this handle as their parent link.
	notify: Lock<NotifyNode>,
}
387
388impl OriginNode {
389	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
390		Self {
391			broadcast: None,
392			nested: HashMap::new(),
393			notify: Lock::new(NotifyNode::new(parent)),
394		}
395	}
396
397	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
398		let (dir, rest) = path.next_part().expect("leaf called with empty path");
399
400		let next = self.entry(dir);
401		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
402	}
403
404	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
405		match self.nested.get(dir) {
406			Some(next) => next.clone(),
407			None => {
408				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
409				self.nested.insert(dir.to_string(), next.clone());
410				next
411			}
412		}
413	}
414
415	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
416		let full = full.as_path();
417		let rest = relative.as_path();
418
419		// If the path has a directory component, then publish it to the nested node.
420		if let Some((dir, relative)) = rest.next_part() {
421			// Not using entry to avoid allocating a string most of the time.
422			self.entry(dir).lock().publish(&full, broadcast, &relative);
423		} else if let Some(existing) = &mut self.broadcast {
424			// This node is a leaf with an existing broadcast. Prefer the shorter or equal hop path;
425			// on ties, the newer broadcast wins, since the previous one may be about to close.
426			//
427			// Drop duplicates (same underlying broadcast delivered via multiple links) so the
428			// backup queue can't accumulate clones of the active entry and trigger redundant
429			// reannouncements when a peer churns.
430			if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
431				return;
432			}
433
434			if broadcast.hops.len() <= existing.active.hops.len() {
435				let old = existing.active.clone();
436				existing.active = broadcast.clone();
437				existing.backup.push_back(old);
438
439				self.notify.lock().reannounce(full, broadcast);
440			} else {
441				// Longer path: keep as a backup in case the active one drops.
442				existing.backup.push_back(broadcast.clone());
443			}
444		} else {
445			// This node is a leaf with no existing broadcast.
446			self.broadcast = Some(OriginBroadcast {
447				path: full.to_owned(),
448				active: broadcast.clone(),
449				backup: VecDeque::new(),
450			});
451			self.notify.lock().announce(full, broadcast);
452		}
453	}
454
455	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
456		self.consume_initial(&mut notify);
457		self.notify.lock().consumers.insert(id, notify);
458	}
459
460	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
461		if let Some(broadcast) = &self.broadcast {
462			notify.announce(&broadcast.path, broadcast.active.clone());
463		}
464
465		// Recursively subscribe to all nested nodes.
466		for nested in self.nested.values() {
467			nested.lock().consume_initial(notify);
468		}
469	}
470
471	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
472		let rest = rest.as_path();
473
474		if let Some((dir, rest)) = rest.next_part() {
475			let node = self.nested.get(dir)?.lock();
476			node.consume_broadcast(&rest)
477		} else {
478			self.broadcast.as_ref().map(|b| b.active.clone())
479		}
480	}
481
482	fn unconsume(&mut self, id: ConsumerId) {
483		self.notify.lock().consumers.remove(&id).expect("consumer not found");
484		if self.is_empty() {
485			//tracing::warn!("TODO: empty node; memory leak");
486			// This happens when consuming a path that is not being broadcasted.
487		}
488	}
489
490	// Returns true if the broadcast should be unannounced.
491	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
492		let full = full.as_path();
493		let relative = relative.as_path();
494
495		if let Some((dir, relative)) = relative.next_part() {
496			let nested = self.entry(dir);
497			let mut locked = nested.lock();
498			locked.remove(&full, broadcast, &relative);
499
500			if locked.is_empty() {
501				drop(locked);
502				self.nested.remove(dir);
503			}
504		} else {
505			let entry = match &mut self.broadcast {
506				Some(existing) => existing,
507				None => return,
508			};
509
510			// See if we can remove the broadcast from the backup list.
511			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
512			if let Some(pos) = pos {
513				entry.backup.remove(pos);
514				// Nothing else to do
515				return;
516			}
517
518			// Okay so it must be the active broadcast or else we fucked up.
519			assert!(entry.active.is_clone(&broadcast));
520
521			// Promote the backup with the shortest hop chain so we keep preferring short paths.
522			// Ties break toward the oldest (FIFO) since min_by_key returns the first minimum.
523			let best = entry
524				.backup
525				.iter()
526				.enumerate()
527				.min_by_key(|(_, b)| b.hops.len())
528				.map(|(i, _)| i);
529			if let Some(idx) = best {
530				let active = entry.backup.remove(idx).expect("index in range");
531				entry.active = active;
532				self.notify.lock().reannounce(full, &entry.active);
533			} else {
534				// No more backups, so remove the entry.
535				self.broadcast = None;
536				self.notify.lock().unannounce(full);
537			}
538		}
539	}
540
541	fn is_empty(&self) -> bool {
542		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
543	}
544}
545
// The set of tree roots a producer/consumer handle may touch, each paired with
// the path prefix it is reachable under.
#[derive(Clone)]
struct OriginNodes {
	// (prefix, node) pairs; lookups use the first pair whose prefix matches.
	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
}
550
551impl OriginNodes {
552	// Returns nested roots that match the prefixes.
553	// PathPrefixes guarantees no duplicates or overlapping prefixes.
554	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
555		let mut roots = Vec::new();
556
557		for (root, state) in &self.nodes {
558			for prefix in prefixes {
559				if root.has_prefix(prefix) {
560					// Keep the existing node if we're allowed to access it.
561					roots.push((root.to_owned(), state.clone()));
562					continue;
563				}
564
565				if let Some(suffix) = prefix.strip_prefix(root) {
566					// If the requested prefix is larger than the allowed prefix, then we further scope it.
567					let nested = state.lock().leaf(&suffix);
568					roots.push((prefix.to_owned(), nested));
569				}
570			}
571		}
572
573		if roots.is_empty() {
574			None
575		} else {
576			Some(Self { nodes: roots })
577		}
578	}
579
580	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
581		let new_root = new_root.as_path();
582		let mut roots = Vec::new();
583
584		if new_root.is_empty() {
585			return Some(self.clone());
586		}
587
588		for (root, state) in &self.nodes {
589			if let Some(suffix) = root.strip_prefix(&new_root) {
590				// If the old root is longer than the new root, shorten the keys.
591				roots.push((suffix.to_owned(), state.clone()));
592			} else if let Some(suffix) = new_root.strip_prefix(root) {
593				// If the new root is longer than the old root, add a new root.
594				// NOTE: suffix can't be empty
595				let nested = state.lock().leaf(&suffix);
596				roots.push(("".into(), nested));
597			}
598		}
599
600		if roots.is_empty() {
601			None
602		} else {
603			Some(Self { nodes: roots })
604		}
605	}
606
607	// Returns the root that has this prefix.
608	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
609		let path = path.as_path();
610
611		for (root, state) in &self.nodes {
612			if let Some(suffix) = path.strip_prefix(root) {
613				return Some((state.clone(), suffix.to_owned()));
614			}
615		}
616
617		None
618	}
619}
620
621impl Default for OriginNodes {
622	fn default() -> Self {
623		Self {
624			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
625		}
626	}
627}
628
/// A broadcast path and its associated consumer, or None if closed.
///
/// `Some(_)` means a broadcast was announced at the path; `None` means the
/// broadcast previously announced there was unannounced.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
631
/// Announces broadcasts to consumers over the network.
///
/// Cloning is cheap relative to [`OriginConsumer`]: clones share the same
/// underlying tree nodes. The handle dereferences to its [`Origin`].
#[derive(Clone)]
pub struct OriginProducer {
	// Identity for this origin. Appended to broadcast hops when
	// re-announcing so downstream relays can detect loops and prefer the
	// shortest path.
	info: Origin,

	// The roots of the tree that we are allowed to publish.
	// A path of "" means we can publish anything.
	nodes: OriginNodes,

	// The prefix that is automatically stripped from all paths.
	root: PathOwned,
}
647
648impl std::ops::Deref for OriginProducer {
649	type Target = Origin;
650
651	fn deref(&self) -> &Self::Target {
652		&self.info
653	}
654}
655
656impl OriginProducer {
657	/// Build a producer for the given origin id with no scoped prefix and no
658	/// pre-existing broadcasts. Prefer [`Origin::produce`].
659	pub fn new(info: Origin) -> Self {
660		Self {
661			info,
662			nodes: OriginNodes::default(),
663			root: PathOwned::default(),
664		}
665	}
666
667	/// Create and publish a new broadcast, returning the producer.
668	///
669	/// This is a helper method when you only want to publish a broadcast to a single origin.
670	/// Returns [None] if the broadcast is not allowed to be published.
671	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
672		let broadcast = Broadcast::new().produce();
673		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
674	}
675
676	/// Publish a broadcast, announcing it to all consumers.
677	///
678	/// The broadcast will be unannounced when it is closed.
679	/// If there is already a broadcast with the same path, the new one replaces the active only
680	/// if its hop path is shorter or equal; otherwise it is queued as a backup.
681	/// When the active broadcast closes, the backup with the shortest hop path is promoted and
682	/// reannounced. Backups that close before being promoted are silently dropped.
683	///
684	/// Returns false if the broadcast is not allowed to be published.
685	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
686		let path = path.as_path();
687
688		// Loop detection: refuse broadcasts whose hop chain already contains our id.
689		if broadcast.hops.contains(&self.info) {
690			return false;
691		}
692
693		let (root, rest) = match self.nodes.get(&path) {
694			Some(root) => root,
695			None => return false,
696		};
697
698		let full = self.root.join(&path);
699
700		root.lock().publish(&full, &broadcast, &rest);
701		let root = root.clone();
702
703		web_async::spawn(async move {
704			broadcast.closed().await;
705			root.lock().remove(&full, broadcast, &rest);
706		});
707
708		true
709	}
710
711	/// Returns a new OriginProducer restricted to publishing under one of `prefixes`.
712	///
713	/// Returns None if there are no legal prefixes (the requested prefixes are
714	/// disjoint from this producer's current scope).
715	// TODO accept PathPrefixes instead of &[Path]
716	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
717		let prefixes = PathPrefixes::new(prefixes);
718		Some(OriginProducer {
719			info: self.info,
720			nodes: self.nodes.select(&prefixes)?,
721			root: self.root.clone(),
722		})
723	}
724
725	/// Subscribe to all announced broadcasts.
726	pub fn consume(&self) -> OriginConsumer {
727		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
728	}
729
730	/// Get a broadcast by path if it has *already* been published.
731	///
732	/// Equivalent to `self.consume().get_broadcast(path)` but skips the
733	/// announcement-cursor allocation, which is currently relatively expensive.
734	#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
735	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
736		let path = path.as_path();
737		let (root, rest) = self.nodes.get(&path)?;
738		let state = root.lock();
739		state.consume_broadcast(&rest)
740	}
741
742	/// Returns a new OriginProducer that automatically strips out the provided prefix.
743	///
744	/// Returns None if the provided root is not authorized; when [`Self::scope`]
745	/// was already used without a wildcard.
746	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
747		let prefix = prefix.as_path();
748
749		Some(Self {
750			info: self.info,
751			root: self.root.join(&prefix).to_owned(),
752			nodes: self.nodes.root(&prefix)?,
753		})
754	}
755
756	/// Returns the root that is automatically stripped from all paths.
757	pub fn root(&self) -> &Path<'_> {
758		&self.root
759	}
760
761	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
762	// TODO return PathPrefixes
763	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
764		self.nodes.nodes.iter().map(|(root, _)| root)
765	}
766
767	/// Converts a relative path to an absolute path.
768	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
769		self.root.join(path)
770	}
771}
772
/// Consumes announced broadcasts matching against an optional prefix.
///
/// NOTE: Clone is expensive, try to avoid it. Every clone registers a new
/// consumer on each allowed node and replays all current announcements into
/// its own pending queue. Dropping the consumer unregisters it again.
pub struct OriginConsumer {
	// Key under which this consumer is registered on each node in `nodes`.
	id: ConsumerId,
	// Identity of the origin this consumer was derived from.
	info: Origin,
	// The tree roots this consumer is registered on and may look up.
	nodes: OriginNodes,

	// Pending updates queued for this consumer. Coalesced so a slow consumer
	// can't accumulate redundant announce/unannounce pairs.
	state: conducer::Producer<OriginConsumerState>,

	// A prefix that is automatically stripped from all paths.
	root: PathOwned,
}
789
790impl std::ops::Deref for OriginConsumer {
791	type Target = Origin;
792
793	fn deref(&self) -> &Self::Target {
794		&self.info
795	}
796}
797
798impl OriginConsumer {
799	fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
800		let state = conducer::Producer::<OriginConsumerState>::default();
801		let id = ConsumerId::new();
802
803		for (_, node) in &nodes.nodes {
804			let notify = OriginConsumerNotify {
805				root: root.clone(),
806				state: state.clone(),
807			};
808			node.lock().consume(id, notify);
809		}
810
811		Self {
812			id,
813			info,
814			nodes,
815			state,
816			root,
817		}
818	}
819
820	/// Returns the next (un)announced broadcast and the absolute path.
821	///
822	/// The broadcast will only be announced if it was previously unannounced.
823	/// The same path won't be announced/unannounced twice, instead it will toggle.
824	/// Returns None if the consumer is closed.
825	///
826	/// Note: The returned path is absolute and will always match this consumer's prefix.
827	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
828		conducer::wait(|waiter| self.poll_announced(waiter)).await
829	}
830
831	/// Poll for the next (un)announced broadcast, without blocking.
832	///
833	/// Returns `Poll::Ready(Some(_))` for an update, `Poll::Ready(None)` if the
834	/// consumer is closed, or `Poll::Pending` after registering `waiter` to be
835	/// notified when the next update arrives.
836	pub fn poll_announced(&mut self, waiter: &conducer::Waiter) -> Poll<Option<OriginAnnounce>> {
837		match self.state.poll(waiter, |state| match state.take() {
838			Some(item) => Poll::Ready(item),
839			None => Poll::Pending,
840		}) {
841			Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
842			// Closed: discard the Ref so its MutexGuard doesn't escape this call.
843			Poll::Ready(Err(_)) => Poll::Ready(None),
844			Poll::Pending => Poll::Pending,
845		}
846	}
847
848	/// Returns the next (un)announced broadcast and the absolute path without blocking.
849	///
850	/// Returns None if there is no update available; NOT because the consumer is closed.
851	/// You have to use `is_closed` to check if the consumer is closed.
852	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
853		self.state.write().ok()?.take()
854	}
855
856	/// Create another consumer with its own announcement cursor over the same origin.
857	pub fn consume(&self) -> Self {
858		self.clone()
859	}
860
861	/// Get a broadcast by path if it has *already* been announced.
862	///
863	/// Returns `None` when the path is unknown to this consumer right now. Synchronous
864	/// lookup races announcement gossip — a freshly-connected consumer will see `None`
865	/// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`]
866	/// (blocks until announced) unless you can guarantee the announcement has already
867	/// landed (e.g. you're responding to an `announced()` callback).
868	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
869		let path = path.as_path();
870		let (root, rest) = self.nodes.get(&path)?;
871		let state = root.lock();
872		state.consume_broadcast(&rest)
873	}
874
875	/// Block until a broadcast with the given path is announced and return it.
876	///
877	/// Returns `None` if the path is outside this consumer's allowed prefixes or if the consumer
878	/// is closed before the broadcast is announced. The returned broadcast may itself be closed
879	/// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that.
880	///
881	/// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but
882	/// cannot guarantee the announcement has already been received.
883	pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
884		let path = path.as_path();
885
886		// Scope a fresh consumer down to this path so we only wake up for relevant announcements.
887		let mut consumer = self.scope(std::slice::from_ref(&path))?;
888
889		// `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited
890		// to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no
891		// announcement at the exact path `foo` can ever arrive. Bail rather than loop forever.
892		if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
893			return None;
894		}
895
896		loop {
897			let (announced, broadcast) = consumer.announced().await?;
898			// `scope` narrows by prefix, but we only want an exact-path match.
899			if announced.as_path() == path {
900				if let Some(broadcast) = broadcast {
901					return Some(broadcast);
902				}
903			}
904		}
905	}
906
907	/// Returns a new OriginConsumer restricted to broadcasts under one of `prefixes`.
908	///
909	/// Returns None if there are no legal prefixes (the requested prefixes are
910	/// disjoint from this consumer's current scope, so it would always return None).
911	// TODO accept PathPrefixes instead of &[Path]
912	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
913		let prefixes = PathPrefixes::new(prefixes);
914		Some(OriginConsumer::new(
915			self.info,
916			self.root.clone(),
917			self.nodes.select(&prefixes)?,
918		))
919	}
920
921	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
922	///
923	/// Returns None if the provided root is not authorized; when [`Self::scope`] was
924	/// already used without a wildcard.
925	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
926		let prefix = prefix.as_path();
927
928		Some(Self::new(
929			self.info,
930			self.root.join(&prefix).to_owned(),
931			self.nodes.root(&prefix)?,
932		))
933	}
934
935	/// Returns the prefix that is automatically stripped from all paths.
936	pub fn root(&self) -> &Path<'_> {
937		&self.root
938	}
939
940	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
941	// TODO return PathPrefixes
942	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
943		self.nodes.nodes.iter().map(|(root, _)| root)
944	}
945
946	/// Converts a relative path to an absolute path.
947	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
948		self.root.join(path)
949	}
950}
951
952impl Drop for OriginConsumer {
953	fn drop(&mut self) {
954		for (_, root) in &self.nodes.nodes {
955			root.lock().unconsume(self.id);
956		}
957	}
958}
959
960impl Clone for OriginConsumer {
961	fn clone(&self) -> Self {
962		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
963	}
964}
965
966#[cfg(test)]
967use futures::FutureExt;
968
#[cfg(test)]
impl OriginConsumer {
	/// Asserts that an announcement is available right now, that it is for `expected`, and
	/// that it carries a broadcast which is a clone of `broadcast`.
	///
	/// Like every helper here this is `#[track_caller]`, so a failure is reported at the
	/// calling test line instead of inside this helper.
	#[track_caller]
	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		// `now_or_never` polls once: `None` means the future would have blocked.
		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(
			active.expect("expected an announce, got an unannounce").is_clone(broadcast),
			"should be the same broadcast"
		);
	}

	/// Same check as [`Self::assert_next`], but goes through the synchronous
	/// `try_announced` instead of polling the `announced` future.
	#[track_caller]
	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let (path, active) = self.try_announced().expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(
			active.expect("expected an announce, got an unannounce").is_clone(broadcast),
			"should be the same broadcast"
		);
	}

	/// Asserts that the next update is available right now and is an unannounce
	/// (no broadcast attached) for `expected`.
	#[track_caller]
	pub fn assert_next_none(&mut self, expected: impl AsPath) {
		let expected = expected.as_path();
		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.is_none(), "should be unannounced");
	}

	/// Asserts that nothing is pending: polling `announced` once must not complete.
	#[track_caller]
	pub fn assert_next_wait(&mut self) {
		if let Some(res) = self.announced().now_or_never() {
			panic!("next should block: got {:?}", res.map(|(path, _)| path));
		}
	}

	// Disabled until consumers are closed when the producer is dropped
	// (see the matching TODO in `test_announce`).
	/*
	pub fn assert_next_closed(&mut self) {
		assert!(
			self.announced().now_or_never().expect("next blocked").is_none(),
			"next should be closed"
		);
	}
	*/
}
1007
1008#[cfg(test)]
1009mod tests {
1010	use crate::Broadcast;
1011
1012	use super::*;
1013
1014	#[test]
1015	fn origin_list_push_fails_at_limit() {
1016		let mut list = OriginList::new();
1017		for _ in 0..MAX_HOPS {
1018			list.push(Origin::random()).unwrap();
1019		}
1020		assert_eq!(list.len(), MAX_HOPS);
1021		assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
1022	}
1023
1024	#[test]
1025	fn origin_list_try_from_vec_enforces_limit() {
1026		let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
1027		assert!(OriginList::try_from(under).is_ok());
1028
1029		let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
1030		assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
1031	}
1032
1033	#[tokio::test]
1034	async fn test_announce() {
1035		tokio::time::pause();
1036
1037		let origin = Origin::random().produce();
1038		let broadcast1 = Broadcast::new().produce();
1039		let broadcast2 = Broadcast::new().produce();
1040
1041		let mut consumer1 = origin.consume();
1042		// Make a new consumer that should get it.
1043		consumer1.assert_next_wait();
1044
1045		// Publish the first broadcast.
1046		origin.publish_broadcast("test1", broadcast1.consume());
1047
1048		consumer1.assert_next("test1", &broadcast1.consume());
1049		consumer1.assert_next_wait();
1050
1051		// Make a new consumer that should get the existing broadcast.
1052		// But we don't consume it yet.
1053		let mut consumer2 = origin.consume();
1054
1055		// Publish the second broadcast.
1056		origin.publish_broadcast("test2", broadcast2.consume());
1057
1058		consumer1.assert_next("test2", &broadcast2.consume());
1059		consumer1.assert_next_wait();
1060
1061		consumer2.assert_next("test1", &broadcast1.consume());
1062		consumer2.assert_next("test2", &broadcast2.consume());
1063		consumer2.assert_next_wait();
1064
1065		// Close the first broadcast.
1066		drop(broadcast1);
1067
1068		// Wait for the async task to run.
1069		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1070
1071		// All consumers should get a None now.
1072		consumer1.assert_next_none("test1");
1073		consumer2.assert_next_none("test1");
1074		consumer1.assert_next_wait();
1075		consumer2.assert_next_wait();
1076
1077		// And a new consumer only gets the last broadcast.
1078		let mut consumer3 = origin.consume();
1079		consumer3.assert_next("test2", &broadcast2.consume());
1080		consumer3.assert_next_wait();
1081
1082		// Close the other producer and make sure it cleans up
1083		drop(broadcast2);
1084
1085		// Wait for the async task to run.
1086		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1087
1088		consumer1.assert_next_none("test2");
1089		consumer2.assert_next_none("test2");
1090		consumer3.assert_next_none("test2");
1091
1092		/* TODO close the origin consumer when the producer is dropped
1093		consumer1.assert_next_closed();
1094		consumer2.assert_next_closed();
1095		consumer3.assert_next_closed();
1096		*/
1097	}
1098
1099	#[tokio::test]
1100	async fn test_duplicate() {
1101		tokio::time::pause();
1102
1103		let origin = Origin::random().produce();
1104
1105		let broadcast1 = Broadcast::new().produce();
1106		let broadcast2 = Broadcast::new().produce();
1107		let broadcast3 = Broadcast::new().produce();
1108
1109		let consumer1 = broadcast1.consume();
1110		let consumer2 = broadcast2.consume();
1111		let consumer3 = broadcast3.consume();
1112
1113		let mut consumer = origin.consume();
1114
1115		origin.publish_broadcast("test", consumer1.clone());
1116		origin.publish_broadcast("test", consumer2.clone());
1117		origin.publish_broadcast("test", consumer3.clone());
1118		assert!(consumer.get_broadcast("test").is_some());
1119
1120		// On equal hop lengths, each new publish replaces the active and reannounces.
1121		// Because the consumer hasn't drained between publishes, the stale announces
1122		// collapse with their following unannounces and only the final active broadcast
1123		// is delivered.
1124		consumer.assert_next("test", &consumer3);
1125		consumer.assert_next_wait();
1126
1127		// Drop a backup, nothing should change.
1128		drop(broadcast2);
1129
1130		// Wait for the async task to run.
1131		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1132
1133		assert!(consumer.get_broadcast("test").is_some());
1134		consumer.assert_next_wait();
1135
1136		// Drop the active, we should reannounce with the remaining backup.
1137		drop(broadcast3);
1138
1139		// Wait for the async task to run.
1140		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1141
1142		assert!(consumer.get_broadcast("test").is_some());
1143		consumer.assert_next_none("test");
1144		consumer.assert_next("test", &consumer1);
1145
1146		// Drop the final broadcast, we should unannounce.
1147		drop(broadcast1);
1148
1149		// Wait for the async task to run.
1150		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1151		assert!(consumer.get_broadcast("test").is_none());
1152
1153		consumer.assert_next_none("test");
1154		consumer.assert_next_wait();
1155	}
1156
1157	#[tokio::test]
1158	async fn test_duplicate_reverse() {
1159		tokio::time::pause();
1160
1161		let origin = Origin::random().produce();
1162		let broadcast1 = Broadcast::new().produce();
1163		let broadcast2 = Broadcast::new().produce();
1164
1165		origin.publish_broadcast("test", broadcast1.consume());
1166		origin.publish_broadcast("test", broadcast2.consume());
1167		assert!(origin.consume().get_broadcast("test").is_some());
1168
1169		// This is harder, dropping the new broadcast first.
1170		drop(broadcast2);
1171
1172		// Wait for the cleanup async task to run.
1173		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1174		assert!(origin.consume().get_broadcast("test").is_some());
1175
1176		drop(broadcast1);
1177
1178		// Wait for the cleanup async task to run.
1179		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1180		assert!(origin.consume().get_broadcast("test").is_none());
1181	}
1182
1183	#[tokio::test]
1184	async fn test_double_publish() {
1185		tokio::time::pause();
1186
1187		let origin = Origin::random().produce();
1188		let broadcast = Broadcast::new().produce();
1189
1190		// Ensure it doesn't crash.
1191		origin.publish_broadcast("test", broadcast.consume());
1192		origin.publish_broadcast("test", broadcast.consume());
1193
1194		assert!(origin.consume().get_broadcast("test").is_some());
1195
1196		drop(broadcast);
1197
1198		// Wait for the async task to run.
1199		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1200		assert!(origin.consume().get_broadcast("test").is_none());
1201	}
1202	// A previous mpsc-based implementation could only deliver the first 127 broadcasts
1203	// instantly via `assert_next` (which uses `now_or_never`). The conducer-backed
1204	// implementation polls synchronously and can deliver all of them without yielding.
1205	// Names are zero-padded so lexicographic delivery order matches the loop index.
1206	#[tokio::test]
1207	async fn test_many_announces() {
1208		let origin = Origin::random().produce();
1209		let broadcast = Broadcast::new().produce();
1210
1211		let mut consumer = origin.consume();
1212		for i in 0..256 {
1213			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1214		}
1215
1216		for i in 0..256 {
1217			consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
1218		}
1219		consumer.assert_next_wait();
1220	}
1221
1222	#[tokio::test]
1223	async fn test_many_announces_try() {
1224		let origin = Origin::random().produce();
1225		let broadcast = Broadcast::new().produce();
1226
1227		let mut consumer = origin.consume();
1228		for i in 0..256 {
1229			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1230		}
1231
1232		for i in 0..256 {
1233			consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
1234		}
1235	}
1236
1237	#[tokio::test]
1238	async fn test_with_root_basic() {
1239		let origin = Origin::random().produce();
1240		let broadcast = Broadcast::new().produce();
1241
1242		// Create a producer with root "/foo"
1243		let foo_producer = origin.with_root("foo").expect("should create root");
1244		assert_eq!(foo_producer.root().as_str(), "foo");
1245
1246		let mut consumer = origin.consume();
1247
1248		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
1249		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1250		// The original consumer should see the full path
1251		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1252
1253		// A consumer created from the rooted producer should see the stripped path
1254		let mut foo_consumer = foo_producer.consume();
1255		foo_consumer.assert_next("bar/baz", &broadcast.consume());
1256	}
1257
1258	#[tokio::test]
1259	async fn test_with_root_nested() {
1260		let origin = Origin::random().produce();
1261		let broadcast = Broadcast::new().produce();
1262
1263		// Create nested roots
1264		let foo_producer = origin.with_root("foo").expect("should create foo root");
1265		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1266		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1267
1268		let mut consumer = origin.consume();
1269
1270		// Publishing to "baz" should actually publish to "foo/bar/baz"
1271		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1272		// The original consumer sees the full path
1273		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1274
1275		// Consumer from foo_bar_producer sees just "baz"
1276		let mut foo_bar_consumer = foo_bar_producer.consume();
1277		foo_bar_consumer.assert_next("baz", &broadcast.consume());
1278	}
1279
1280	#[tokio::test]
1281	async fn test_publish_scope_allows() {
1282		let origin = Origin::random().produce();
1283		let broadcast = Broadcast::new().produce();
1284
1285		// Create a producer that can only publish to "allowed" paths
1286		let limited_producer = origin
1287			.scope(&["allowed/path1".into(), "allowed/path2".into()])
1288			.expect("should create limited producer");
1289
1290		// Should be able to publish to allowed paths
1291		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1292		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1293		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1294
1295		// Should not be able to publish to disallowed paths
1296		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1297		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
1298		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1299	}
1300
1301	#[tokio::test]
1302	async fn test_publish_scope_empty() {
1303		let origin = Origin::random().produce();
1304
1305		// Creating a producer with no allowed paths should return None
1306		assert!(origin.scope(&[]).is_none());
1307	}
1308
1309	#[tokio::test]
1310	async fn test_consume_scope_filters() {
1311		let origin = Origin::random().produce();
1312		let broadcast1 = Broadcast::new().produce();
1313		let broadcast2 = Broadcast::new().produce();
1314		let broadcast3 = Broadcast::new().produce();
1315
1316		let mut consumer = origin.consume();
1317
1318		// Publish to different paths
1319		origin.publish_broadcast("allowed", broadcast1.consume());
1320		origin.publish_broadcast("allowed/nested", broadcast2.consume());
1321		origin.publish_broadcast("notallowed", broadcast3.consume());
1322
1323		// Create a consumer that only sees "allowed" paths
1324		let mut limited_consumer = origin
1325			.consume()
1326			.scope(&["allowed".into()])
1327			.expect("should create limited consumer");
1328
1329		// Should only receive broadcasts under "allowed"
1330		limited_consumer.assert_next("allowed", &broadcast1.consume());
1331		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1332		limited_consumer.assert_next_wait(); // Should not see "notallowed"
1333
1334		// Unscoped consumer should see all
1335		consumer.assert_next("allowed", &broadcast1.consume());
1336		consumer.assert_next("allowed/nested", &broadcast2.consume());
1337		consumer.assert_next("notallowed", &broadcast3.consume());
1338	}
1339
1340	#[tokio::test]
1341	async fn test_consume_scope_multiple_prefixes() {
1342		let origin = Origin::random().produce();
1343		let broadcast1 = Broadcast::new().produce();
1344		let broadcast2 = Broadcast::new().produce();
1345		let broadcast3 = Broadcast::new().produce();
1346
1347		origin.publish_broadcast("foo/test", broadcast1.consume());
1348		origin.publish_broadcast("bar/test", broadcast2.consume());
1349		origin.publish_broadcast("baz/test", broadcast3.consume());
1350
1351		// Consumer that only sees "foo" and "bar" paths
1352		let mut limited_consumer = origin
1353			.consume()
1354			.scope(&["foo".into(), "bar".into()])
1355			.expect("should create limited consumer");
1356
1357		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
1358		limited_consumer.assert_next("bar/test", &broadcast2.consume());
1359		limited_consumer.assert_next("foo/test", &broadcast1.consume());
1360		limited_consumer.assert_next_wait(); // Should not see "baz/test"
1361	}
1362
1363	#[tokio::test]
1364	async fn test_with_root_and_publish_scope() {
1365		let origin = Origin::random().produce();
1366		let broadcast = Broadcast::new().produce();
1367
1368		// User connects to /foo root
1369		let foo_producer = origin.with_root("foo").expect("should create foo root");
1370
1371		// Limit them to publish only to "bar" and "goop/pee" within /foo
1372		let limited_producer = foo_producer
1373			.scope(&["bar".into(), "goop/pee".into()])
1374			.expect("should create limited producer");
1375
1376		let mut consumer = origin.consume();
1377
1378		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
1379		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1380		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1381		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1382		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1383
1384		// Should not be able to publish outside allowed paths
1385		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1386		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
1387		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1388
1389		// Original consumer sees full paths
1390		consumer.assert_next("foo/bar", &broadcast.consume());
1391		consumer.assert_next("foo/bar/nested", &broadcast.consume());
1392		consumer.assert_next("foo/goop/pee", &broadcast.consume());
1393		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1394	}
1395
1396	#[tokio::test]
1397	async fn test_with_root_and_consume_scope() {
1398		let origin = Origin::random().produce();
1399		let broadcast1 = Broadcast::new().produce();
1400		let broadcast2 = Broadcast::new().produce();
1401		let broadcast3 = Broadcast::new().produce();
1402
1403		// Publish broadcasts
1404		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1405		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1406		origin.publish_broadcast("foo/other/test", broadcast3.consume());
1407
1408		// User connects to /foo root
1409		let foo_producer = origin.with_root("foo").expect("should create foo root");
1410
1411		// Create consumer limited to "bar" and "goop/pee" within /foo
1412		let mut limited_consumer = foo_producer
1413			.consume()
1414			.scope(&["bar".into(), "goop/pee".into()])
1415			.expect("should create limited consumer");
1416
1417		// Should only see allowed paths (without foo prefix)
1418		limited_consumer.assert_next("bar/test", &broadcast1.consume());
1419		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1420		limited_consumer.assert_next_wait(); // Should not see "other/test"
1421	}
1422
1423	#[tokio::test]
1424	async fn test_with_root_unauthorized() {
1425		let origin = Origin::random().produce();
1426
1427		// First limit the producer to specific paths
1428		let limited_producer = origin
1429			.scope(&["allowed".into()])
1430			.expect("should create limited producer");
1431
1432		// Trying to create a root outside allowed paths should fail
1433		assert!(limited_producer.with_root("notallowed").is_none());
1434
1435		// But creating a root within allowed paths should work
1436		let allowed_root = limited_producer
1437			.with_root("allowed")
1438			.expect("should create allowed root");
1439		assert_eq!(allowed_root.root().as_str(), "allowed");
1440	}
1441
1442	#[tokio::test]
1443	async fn test_wildcard_permission() {
1444		let origin = Origin::random().produce();
1445		let broadcast = Broadcast::new().produce();
1446
1447		// Producer with root access (empty string means wildcard)
1448		let root_producer = origin.clone();
1449
1450		// Should be able to publish anywhere
1451		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1452		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1453
1454		// Can create any root
1455		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1456		assert_eq!(foo_producer.root().as_str(), "foo");
1457	}
1458
1459	#[tokio::test]
1460	async fn test_consume_broadcast_with_permissions() {
1461		let origin = Origin::random().produce();
1462		let broadcast1 = Broadcast::new().produce();
1463		let broadcast2 = Broadcast::new().produce();
1464
1465		origin.publish_broadcast("allowed/test", broadcast1.consume());
1466		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1467
1468		// Create limited consumer
1469		let limited_consumer = origin
1470			.consume()
1471			.scope(&["allowed".into()])
1472			.expect("should create limited consumer");
1473
1474		// Should be able to get allowed broadcast
1475		let result = limited_consumer.get_broadcast("allowed/test");
1476		assert!(result.is_some());
1477		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1478
1479		// Should not be able to get disallowed broadcast
1480		assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1481
1482		// Original consumer can get both
1483		let consumer = origin.consume();
1484		assert!(consumer.get_broadcast("allowed/test").is_some());
1485		assert!(consumer.get_broadcast("notallowed/test").is_some());
1486	}
1487
1488	#[tokio::test]
1489	async fn test_nested_paths_with_permissions() {
1490		let origin = Origin::random().produce();
1491		let broadcast = Broadcast::new().produce();
1492
1493		// Create producer limited to "a/b/c"
1494		let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1495
1496		// Should be able to publish to exact path and nested paths
1497		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1498		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1499		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1500
1501		// Should not be able to publish to parent or sibling paths
1502		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1503		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1504		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1505	}
1506
1507	#[tokio::test]
1508	async fn test_multiple_consumers_with_different_permissions() {
1509		let origin = Origin::random().produce();
1510		let broadcast1 = Broadcast::new().produce();
1511		let broadcast2 = Broadcast::new().produce();
1512		let broadcast3 = Broadcast::new().produce();
1513
1514		// Publish to different paths
1515		origin.publish_broadcast("foo/test", broadcast1.consume());
1516		origin.publish_broadcast("bar/test", broadcast2.consume());
1517		origin.publish_broadcast("baz/test", broadcast3.consume());
1518
1519		// Create consumers with different permissions
1520		let mut foo_consumer = origin
1521			.consume()
1522			.scope(&["foo".into()])
1523			.expect("should create foo consumer");
1524
1525		let mut bar_consumer = origin
1526			.consume()
1527			.scope(&["bar".into()])
1528			.expect("should create bar consumer");
1529
1530		let mut foobar_consumer = origin
1531			.consume()
1532			.scope(&["foo".into(), "bar".into()])
1533			.expect("should create foobar consumer");
1534
1535		// Each consumer should only see their allowed paths
1536		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1537		foo_consumer.assert_next_wait();
1538
1539		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1540		bar_consumer.assert_next_wait();
1541
1542		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1543		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1544		foobar_consumer.assert_next_wait();
1545	}
1546
1547	#[tokio::test]
1548	async fn test_select_with_empty_prefix() {
1549		let origin = Origin::random().produce();
1550		let broadcast1 = Broadcast::new().produce();
1551		let broadcast2 = Broadcast::new().produce();
1552
1553		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1554		let demo_producer = origin.with_root("demo").expect("should create demo root");
1555		let limited_producer = demo_producer
1556			.scope(&["worm-node".into(), "foobar".into()])
1557			.expect("should create limited producer");
1558
1559		// Publish some broadcasts
1560		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1561		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1562
1563		// scope with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1564		let mut consumer = limited_producer
1565			.consume()
1566			.scope(&["".into()])
1567			.expect("should create consumer with empty prefix");
1568
1569		// Should see both broadcasts (order depends on PathPrefixes sort)
1570		let a1 = consumer.try_announced().expect("expected first announcement");
1571		let a2 = consumer.try_announced().expect("expected second announcement");
1572		consumer.assert_next_wait();
1573
1574		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1575		paths.sort();
1576		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1577	}
1578
1579	#[tokio::test]
1580	async fn test_select_narrowing_scope() {
1581		let origin = Origin::random().produce();
1582		let broadcast1 = Broadcast::new().produce();
1583		let broadcast2 = Broadcast::new().produce();
1584		let broadcast3 = Broadcast::new().produce();
1585
1586		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1587		let demo_producer = origin.with_root("demo").expect("should create demo root");
1588		let limited_producer = demo_producer
1589			.scope(&["worm-node".into(), "foobar".into()])
1590			.expect("should create limited producer");
1591
1592		// Publish broadcasts at different levels
1593		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1594		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1595		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1596
1597		// Test 1: scope("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1598		let mut worm_consumer = limited_producer
1599			.consume()
1600			.scope(&["worm-node".into()])
1601			.expect("should create worm-node consumer");
1602
1603		// Should see worm-node content with paths stripped to ""
1604		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1605		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1606		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1607
1608		// Test 2: scope("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1609		let mut foo_consumer = limited_producer
1610			.consume()
1611			.scope(&["worm-node/foo".into()])
1612			.expect("should create worm-node/foo consumer");
1613
1614		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1615		foo_consumer.assert_next_wait(); // Should NOT see other content
1616	}
1617
1618	#[tokio::test]
1619	async fn test_select_multiple_roots_with_empty_prefix() {
1620		let origin = Origin::random().produce();
1621		let broadcast1 = Broadcast::new().produce();
1622		let broadcast2 = Broadcast::new().produce();
1623		let broadcast3 = Broadcast::new().produce();
1624
1625		// Producer with multiple allowed roots
1626		let limited_producer = origin
1627			.scope(&["app1".into(), "app2".into(), "shared".into()])
1628			.expect("should create limited producer");
1629
1630		// Publish to each root
1631		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1632		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1633		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1634
1635		// scope with empty prefix should maintain all roots
1636		let mut consumer = limited_producer
1637			.consume()
1638			.scope(&["".into()])
1639			.expect("should create consumer with empty prefix");
1640
1641		// Should see all broadcasts from all roots
1642		consumer.assert_next("app1/data", &broadcast1.consume());
1643		consumer.assert_next("app2/config", &broadcast2.consume());
1644		consumer.assert_next("shared/resource", &broadcast3.consume());
1645		consumer.assert_next_wait();
1646	}
1647
1648	#[tokio::test]
1649	async fn test_publish_scope_with_empty_prefix() {
1650		let origin = Origin::random().produce();
1651		let broadcast = Broadcast::new().produce();
1652
1653		// Producer with specific allowed paths
1654		let limited_producer = origin
1655			.scope(&["services/api".into(), "services/web".into()])
1656			.expect("should create limited producer");
1657
1658		// scope with empty prefix should keep the same restrictions
1659		let same_producer = limited_producer
1660			.scope(&["".into()])
1661			.expect("should create producer with empty prefix");
1662
1663		// Should still have the same publishing restrictions
1664		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1665		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1666		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1667		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1668	}
1669
1670	#[tokio::test]
1671	async fn test_select_narrowing_to_deeper_path() {
1672		let origin = Origin::random().produce();
1673		let broadcast1 = Broadcast::new().produce();
1674		let broadcast2 = Broadcast::new().produce();
1675		let broadcast3 = Broadcast::new().produce();
1676
1677		// Producer with broad permission
1678		let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1679
1680		// Publish at various depths
1681		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1682		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1683		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1684
1685		// Narrow down to team2 only
1686		let mut team2_consumer = limited_producer
1687			.consume()
1688			.scope(&["org/team2".into()])
1689			.expect("should create team2 consumer");
1690
1691		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1692		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1693
1694		// Further narrow down to team1/project1
1695		let mut project1_consumer = limited_producer
1696			.consume()
1697			.scope(&["org/team1/project1".into()])
1698			.expect("should create project1 consumer");
1699
1700		// Should only see project1 content at root
1701		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1702		project1_consumer.assert_next_wait();
1703	}
1704
1705	#[tokio::test]
1706	async fn test_select_with_non_matching_prefix() {
1707		let origin = Origin::random().produce();
1708
1709		// Producer with specific allowed paths
1710		let limited_producer = origin
1711			.scope(&["allowed/path".into()])
1712			.expect("should create limited producer");
1713
1714		// Trying to scope with a completely different prefix should return None
1715		assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1716
1717		// Similarly for scope
1718		assert!(limited_producer.scope(&["other/path".into()]).is_none());
1719	}
1720
1721	// Regression test for https://github.com/moq-dev/moq/issues/910
1722	// with_root panics when String has trailing slash (AsPath for String skips normalization)
1723	#[tokio::test]
1724	async fn test_with_root_trailing_slash_consumer() {
1725		let origin = Origin::random().produce();
1726
1727		// Use an owned String so the trailing slash is NOT normalized away.
1728		let prefix = "some_prefix/".to_string();
1729		let mut consumer = origin.consume().with_root(prefix).unwrap();
1730
1731		let b = origin.create_broadcast("some_prefix/test").unwrap();
1732		consumer.assert_next("test", &b.consume());
1733	}
1734
1735	// Same issue but for the producer side of with_root
1736	#[tokio::test]
1737	async fn test_with_root_trailing_slash_producer() {
1738		let origin = Origin::random().produce();
1739
1740		// Use an owned String so the trailing slash is NOT normalized away.
1741		let prefix = "some_prefix/".to_string();
1742		let rooted = origin.with_root(prefix).unwrap();
1743
1744		let b = rooted.create_broadcast("test").unwrap();
1745
1746		let mut consumer = rooted.consume();
1747		consumer.assert_next("test", &b.consume());
1748	}
1749
1750	// Verify unannounce also doesn't panic with trailing slash
1751	#[tokio::test]
1752	async fn test_with_root_trailing_slash_unannounce() {
1753		tokio::time::pause();
1754
1755		let origin = Origin::random().produce();
1756
1757		let prefix = "some_prefix/".to_string();
1758		let mut consumer = origin.consume().with_root(prefix).unwrap();
1759
1760		let b = origin.create_broadcast("some_prefix/test").unwrap();
1761		consumer.assert_next("test", &b.consume());
1762
1763		// Drop the broadcast producer to trigger unannounce
1764		drop(b);
1765		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1766
1767		// unannounce also calls strip_prefix(&self.root).unwrap()
1768		consumer.assert_next_none("test");
1769	}
1770
1771	#[tokio::test]
1772	async fn test_select_maintains_access_with_wider_prefix() {
1773		let origin = Origin::random().produce();
1774		let broadcast1 = Broadcast::new().produce();
1775		let broadcast2 = Broadcast::new().produce();
1776
1777		// Setup: user with root "demo" allowed to subscribe to specific paths
1778		let demo_producer = origin.with_root("demo").expect("should create demo root");
1779		let user_producer = demo_producer
1780			.scope(&["worm-node".into(), "foobar".into()])
1781			.expect("should create user producer");
1782
1783		// Publish some data
1784		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1785		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1786
1787		// Key test: scope with "" should maintain access to allowed roots
1788		let mut consumer = user_producer
1789			.consume()
1790			.scope(&["".into()])
1791			.expect("scope with empty prefix should not fail when user has specific permissions");
1792
1793		// Should still receive broadcasts from allowed paths (order not guaranteed)
1794		let a1 = consumer.try_announced().expect("expected first announcement");
1795		let a2 = consumer.try_announced().expect("expected second announcement");
1796		consumer.assert_next_wait();
1797
1798		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1799		paths.sort();
1800		assert_eq!(paths, ["foobar", "worm-node/data"]);
1801
1802		// Also test that we can still narrow the scope
1803		let mut narrow_consumer = user_producer
1804			.consume()
1805			.scope(&["worm-node".into()])
1806			.expect("should be able to narrow scope to worm-node");
1807
1808		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1809		narrow_consumer.assert_next_wait(); // Should not see foobar
1810	}
1811
1812	#[tokio::test]
1813	async fn test_duplicate_prefixes_deduped() {
1814		let origin = Origin::random().produce();
1815		let broadcast = Broadcast::new().produce();
1816
1817		// scope with duplicate prefixes should work (deduped internally)
1818		let producer = origin
1819			.scope(&["demo".into(), "demo".into()])
1820			.expect("should create producer");
1821
1822		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1823
1824		let mut consumer = producer.consume();
1825		consumer.assert_next("demo/stream", &broadcast.consume());
1826		consumer.assert_next_wait();
1827	}
1828
1829	#[tokio::test]
1830	async fn test_overlapping_prefixes_deduped() {
1831		let origin = Origin::random().produce();
1832		let broadcast = Broadcast::new().produce();
1833
1834		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
1835		let producer = origin
1836			.scope(&["demo".into(), "demo/foo".into()])
1837			.expect("should create producer");
1838
1839		// Can still publish under "demo/bar" since "demo" covers everything
1840		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1841
1842		let mut consumer = producer.consume();
1843		consumer.assert_next("demo/bar/stream", &broadcast.consume());
1844		consumer.assert_next_wait();
1845	}
1846
1847	#[tokio::test]
1848	async fn test_overlapping_prefixes_no_duplicate_announcements() {
1849		let origin = Origin::random().produce();
1850		let broadcast = Broadcast::new().produce();
1851
1852		// Both "demo" and "demo/foo" are requested — should only have one node
1853		let producer = origin
1854			.scope(&["demo".into(), "demo/foo".into()])
1855			.expect("should create producer");
1856
1857		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1858
1859		let mut consumer = producer.consume();
1860		// Should only get ONE announcement (not two from overlapping nodes)
1861		consumer.assert_next("demo/foo/stream", &broadcast.consume());
1862		consumer.assert_next_wait();
1863	}
1864
1865	#[tokio::test]
1866	async fn test_allowed_returns_deduped_prefixes() {
1867		let origin = Origin::random().produce();
1868
1869		let producer = origin
1870			.scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1871			.expect("should create producer");
1872
1873		let allowed: Vec<_> = producer.allowed().collect();
1874		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1875	}
1876
1877	#[tokio::test]
1878	async fn test_announced_broadcast_already_announced() {
1879		let origin = Origin::random().produce();
1880		let broadcast = Broadcast::new().produce();
1881
1882		origin.publish_broadcast("test", broadcast.consume());
1883
1884		let consumer = origin.consume();
1885		let result = consumer.announced_broadcast("test").await.expect("should find it");
1886		assert!(result.is_clone(&broadcast.consume()));
1887	}
1888
1889	#[tokio::test]
1890	async fn test_announced_broadcast_delayed() {
1891		tokio::time::pause();
1892
1893		let origin = Origin::random().produce();
1894		let broadcast = Broadcast::new().produce();
1895
1896		let consumer = origin.consume();
1897
1898		// Start waiting before it's announced.
1899		let wait = tokio::spawn({
1900			let consumer = consumer.clone();
1901			async move { consumer.announced_broadcast("test").await }
1902		});
1903
1904		// Give the spawned task a chance to subscribe.
1905		tokio::task::yield_now().await;
1906
1907		origin.publish_broadcast("test", broadcast.consume());
1908
1909		let result = wait.await.unwrap().expect("should find it");
1910		assert!(result.is_clone(&broadcast.consume()));
1911	}
1912
1913	#[tokio::test]
1914	async fn test_announced_broadcast_ignores_unrelated_paths() {
1915		tokio::time::pause();
1916
1917		let origin = Origin::random().produce();
1918		let other = Broadcast::new().produce();
1919		let target = Broadcast::new().produce();
1920
1921		let consumer = origin.consume();
1922
1923		let wait = tokio::spawn({
1924			let consumer = consumer.clone();
1925			async move { consumer.announced_broadcast("target").await }
1926		});
1927
1928		tokio::task::yield_now().await;
1929
1930		// Publish an unrelated broadcast first — announced_broadcast should skip it.
1931		origin.publish_broadcast("other", other.consume());
1932		tokio::task::yield_now().await;
1933		assert!(!wait.is_finished(), "must not resolve on unrelated path");
1934
1935		origin.publish_broadcast("target", target.consume());
1936		let result = wait.await.unwrap().expect("should find target");
1937		assert!(result.is_clone(&target.consume()));
1938	}
1939
1940	#[tokio::test]
1941	async fn test_announced_broadcast_skips_nested_paths() {
1942		tokio::time::pause();
1943
1944		let origin = Origin::random().produce();
1945		let nested = Broadcast::new().produce();
1946		let exact = Broadcast::new().produce();
1947
1948		let consumer = origin.consume();
1949
1950		let wait = tokio::spawn({
1951			let consumer = consumer.clone();
1952			async move { consumer.announced_broadcast("foo").await }
1953		});
1954
1955		tokio::task::yield_now().await;
1956
1957		// "foo/bar" is under the prefix scope, but it's not the exact path — skip it.
1958		origin.publish_broadcast("foo/bar", nested.consume());
1959		tokio::task::yield_now().await;
1960		assert!(!wait.is_finished(), "must not resolve on a nested path");
1961
1962		origin.publish_broadcast("foo", exact.consume());
1963		let result = wait.await.unwrap().expect("should find foo exactly");
1964		assert!(result.is_clone(&exact.consume()));
1965	}
1966
1967	#[tokio::test]
1968	async fn test_announced_broadcast_disallowed() {
1969		let origin = Origin::random().produce();
1970		let limited = origin
1971			.consume()
1972			.scope(&["allowed".into()])
1973			.expect("should create limited");
1974
1975		// Path is outside allowed prefixes — should return None immediately.
1976		assert!(limited.announced_broadcast("notallowed").await.is_none());
1977	}
1978
1979	#[tokio::test]
1980	async fn test_announced_broadcast_scope_too_narrow() {
1981		// Consumer's scope is narrower than the requested path: asking for `foo` on a consumer
1982		// limited to `foo/specific` can never resolve. Must return None, not loop forever.
1983		let origin = Origin::random().produce();
1984		let limited = origin
1985			.consume()
1986			.scope(&["foo/specific".into()])
1987			.expect("should create limited");
1988
1989		// now_or_never so we fail fast instead of hanging if the guard regresses.
1990		let result = limited
1991			.announced_broadcast("foo")
1992			.now_or_never()
1993			.expect("must not block");
1994		assert!(result.is_none());
1995	}
1996
1997	// Coalescing tests: a slow consumer that doesn't drain between updates
1998	// should observe a bounded number of deliveries.
1999
2000	#[tokio::test]
2001	async fn test_coalesce_announce_then_unannounce() {
2002		// announce + unannounce that the consumer hasn't observed yet collapses to nothing.
2003		tokio::time::pause();
2004
2005		let origin = Origin::random().produce();
2006		let mut consumer = origin.consume();
2007
2008		let broadcast = Broadcast::new().produce();
2009		origin.publish_broadcast("test", broadcast.consume());
2010		drop(broadcast);
2011
2012		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2013
2014		consumer.assert_next_wait();
2015	}
2016
2017	#[tokio::test]
2018	async fn test_coalesce_announce_unannounce_announce() {
2019		// announce, unannounce, announce that the consumer hasn't drained collapses
2020		// to a single Announce of the latest broadcast.
2021		tokio::time::pause();
2022
2023		let origin = Origin::random().produce();
2024		let mut consumer = origin.consume();
2025
2026		let broadcast1 = Broadcast::new().produce();
2027		let broadcast2 = Broadcast::new().produce();
2028
2029		origin.publish_broadcast("test", broadcast1.consume());
2030		drop(broadcast1);
2031		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2032		origin.publish_broadcast("test", broadcast2.consume());
2033
2034		consumer.assert_next("test", &broadcast2.consume());
2035		consumer.assert_next_wait();
2036	}
2037
2038	#[tokio::test]
2039	async fn test_coalesce_unannounce_announce_preserved() {
2040		// unannounce followed by announce of a different broadcast must be preserved
2041		// as two deliveries so the consumer learns the origin changed.
2042		tokio::time::pause();
2043
2044		let origin = Origin::random().produce();
2045		let broadcast1 = Broadcast::new().produce();
2046		origin.publish_broadcast("test", broadcast1.consume());
2047
2048		let mut consumer = origin.consume();
2049		consumer.assert_next("test", &broadcast1.consume());
2050
2051		// Drop, then publish a fresh broadcast at the same path.
2052		drop(broadcast1);
2053		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2054
2055		let broadcast2 = Broadcast::new().produce();
2056		origin.publish_broadcast("test", broadcast2.consume());
2057
2058		// The consumer must see the unannounce before the new announce.
2059		consumer.assert_next_none("test");
2060		consumer.assert_next("test", &broadcast2.consume());
2061		consumer.assert_next_wait();
2062	}
2063
2064	#[tokio::test]
2065	async fn test_coalesce_unannounce_announce_unannounce() {
2066		// unannounce + announce + unannounce collapses to a single unannounce: the
2067		// embedded announce was never observed.
2068		tokio::time::pause();
2069
2070		let origin = Origin::random().produce();
2071		let broadcast1 = Broadcast::new().produce();
2072		origin.publish_broadcast("test", broadcast1.consume());
2073
2074		let mut consumer = origin.consume();
2075		consumer.assert_next("test", &broadcast1.consume());
2076
2077		drop(broadcast1);
2078		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2079
2080		let broadcast2 = Broadcast::new().produce();
2081		origin.publish_broadcast("test", broadcast2.consume());
2082		drop(broadcast2);
2083		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2084
2085		consumer.assert_next_none("test");
2086		consumer.assert_next_wait();
2087	}
2088
2089	#[tokio::test]
2090	async fn test_coalesce_churn_bounded() {
2091		// A churn loop on a single path should keep the pending set bounded.
2092		// Backup promotion during cleanup can leave the consumer with zero or one
2093		// pending update for "test" depending on the order tasks run; we only
2094		// require that churn doesn't accumulate across iterations.
2095		tokio::time::pause();
2096
2097		let origin = Origin::random().produce();
2098		let mut consumer = origin.consume();
2099
2100		for _ in 0..1000 {
2101			let broadcast = Broadcast::new().produce();
2102			origin.publish_broadcast("test", broadcast.consume());
2103			drop(broadcast);
2104		}
2105		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2106
2107		let mut collected = Vec::new();
2108		while let Some(update) = consumer.try_announced() {
2109			collected.push(update);
2110		}
2111		assert!(
2112			collected.len() <= 1,
2113			"expected at most one pending update, got {}",
2114			collected.len()
2115		);
2116		assert!(
2117			collected.iter().all(|(path, _)| path == &Path::new("test")),
2118			"unexpected path in pending updates",
2119		);
2120	}
2121}