Skip to main content

moq_net/model/
origin.rs

1use std::{
2	collections::{BTreeMap, HashMap, VecDeque},
3	fmt,
4	sync::atomic::{AtomicU64, Ordering},
5	task::{Poll, ready},
6};
7
8use rand::RngExt;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13	AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14	coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
/// Identity of a relay origin.
///
/// On the wire the id travels as a varint limited to 62 bits, so a value of
/// 2^62 or above fails at the varint layer when encoded. A real origin always
/// has a non-zero `id`; zero is set aside as a stand-in for Lite03-style hops,
/// where the actual value is not transmitted. [`Origin::random`] yields a
/// valid random non-zero id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Origin {
	/// The identifier itself: non-zero, at most 62 bits, sent as a QUIC varint.
	pub id: u64,
}
29
30impl Origin {
31	/// Placeholder for hop entries whose actual id is not on the wire (Lite03).
32	/// Never encoded for Lite04+: violates the non-zero invariant and would fail to round-trip.
33	pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35	/// Generate a fresh origin with a random non-zero id. Use this for any
36	/// origin that does not need a stable identity across restarts.
37	///
38	/// TEMPORARY: the wire format allows 62 bits, but older `@moq/lite` JS
39	/// clients decode `AnnounceInterest.exclude_hop` as a u53 (number) and
40	/// throw on anything > 2^53-1. To keep those clients alive against
41	/// fresh relays, we cap the random id at 53 bits. Restore to 62 bits
42	/// once the JS u62 fix has propagated to deployed bundles.
43	pub fn random() -> Self {
44		let mut rng = rand::rng();
45		let id = rng.random_range(1..(1u64 << 53));
46		Self { id }
47	}
48
49	/// Consume this [Origin] to create a producer that carries its id.
50	pub fn produce(self) -> OriginProducer {
51		OriginProducer::new(self)
52	}
53}
54
55impl From<u64> for Origin {
56	fn from(id: u64) -> Self {
57		Self { id }
58	}
59}
60
61impl fmt::Display for Origin {
62	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63		self.id.fmt(f)
64	}
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69	u64: Encode<V>,
70{
71	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72		self.id.encode(w, version)
73	}
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78	u64: Decode<V>,
79{
80	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81		let id = u64::decode(r, version)?;
82		if id >= 1u64 << 62 {
83			return Err(DecodeError::InvalidValue);
84		}
85		Ok(Self { id })
86	}
87}
88
/// Upper bound on how many origins (hops) a single [`OriginList`] may contain.
///
/// This keeps pathological or loop-induced announcements within a reasonable
/// cluster diameter. Growing a list beyond the bound yields [`TooManyOrigins`];
/// entries are never silently truncated.
pub(crate) const MAX_HOPS: usize = 32;
95
96/// Bounded list of [`Origin`] entries, typically the hop chain of a broadcast.
97///
98/// Guarantees `len() <= MAX_HOPS`. Construct via [`OriginList::new`] +
99/// [`OriginList::push`], or fall back to the fallible [`TryFrom<Vec<Origin>>`].
100#[derive(Debug, Clone, Default, PartialEq, Eq)]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct OriginList(Vec<Origin>);
103
/// Error produced when an operation would push an [`OriginList`] beyond its hop-count cap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111		write!(f, "too many origins (max {MAX_HOPS})")
112	}
113}
114
115impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118	fn from(_: TooManyOrigins) -> Self {
119		DecodeError::BoundsExceeded
120	}
121}
122
123impl OriginList {
124	/// Create an empty list.
125	pub fn new() -> Self {
126		Self(Vec::new())
127	}
128
129	/// Append an [`Origin`]. Returns [`TooManyOrigins`] if the list is full.
130	pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131		if self.0.len() >= MAX_HOPS {
132			return Err(TooManyOrigins);
133		}
134		self.0.push(origin);
135		Ok(())
136	}
137
138	/// Replace the first entry equal to `target` with `replacement`, returning
139	/// true if a match was found. The length is unchanged.
140	pub fn replace_first(&mut self, target: Origin, replacement: Origin) -> bool {
141		for entry in &mut self.0 {
142			if *entry == target {
143				*entry = replacement;
144				return true;
145			}
146		}
147		false
148	}
149
150	/// Returns true if any entry matches `origin`.
151	pub fn contains(&self, origin: &Origin) -> bool {
152		self.0.contains(origin)
153	}
154
155	/// Number of entries currently in the list (always `<= MAX_HOPS`).
156	pub fn len(&self) -> usize {
157		self.0.len()
158	}
159
160	/// Whether the list contains no entries.
161	pub fn is_empty(&self) -> bool {
162		self.0.is_empty()
163	}
164
165	/// Iterate over the entries in hop order (oldest first).
166	pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
167		self.0.iter()
168	}
169
170	/// Borrow the entries as a slice.
171	pub fn as_slice(&self) -> &[Origin] {
172		&self.0
173	}
174}
175
176impl TryFrom<Vec<Origin>> for OriginList {
177	type Error = TooManyOrigins;
178
179	fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
180		if v.len() > MAX_HOPS {
181			return Err(TooManyOrigins);
182		}
183		Ok(Self(v))
184	}
185}
186
187impl<'a> IntoIterator for &'a OriginList {
188	type Item = &'a Origin;
189	type IntoIter = std::slice::Iter<'a, Origin>;
190
191	fn into_iter(self) -> Self::IntoIter {
192		self.iter()
193	}
194}
195
196impl<V: Copy> Encode<V> for OriginList
197where
198	u64: Encode<V>,
199	Origin: Encode<V>,
200{
201	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
202		(self.0.len() as u64).encode(w, version)?;
203		for origin in &self.0 {
204			origin.encode(w, version)?;
205		}
206		Ok(())
207	}
208}
209
210impl<V: Copy> Decode<V> for OriginList
211where
212	u64: Decode<V>,
213	Origin: Decode<V>,
214{
215	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
216		let count = u64::decode(r, version)? as usize;
217		if count > MAX_HOPS {
218			return Err(DecodeError::BoundsExceeded);
219		}
220		let mut list = Vec::with_capacity(count);
221		for _ in 0..count {
222			list.push(Origin::decode(r, version)?);
223		}
224		Ok(Self(list))
225	}
226}
227
/// Process-wide counter that hands out the next [`ConsumerId`] value.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);

/// Identifier distinguishing one consumer registration from another.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);

impl ConsumerId {
	/// Allocate the next id by bumping the global counter.
	fn new() -> Self {
		let raw = NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed);
		Self(raw)
	}
}
238
239// If there are multiple broadcasts with the same path, we keep the oldest active and queue the others.
240struct OriginBroadcast {
241	path: PathOwned,
242	active: BroadcastConsumer,
243	backup: VecDeque<BroadcastConsumer>,
244}
245
246/// Ordering key used to pick the active route among broadcasts at the same path.
247///
248/// Lower wins. Shorter hop chains sort first; equal-length chains are broken by a
249/// deterministic hash of the broadcast name and hop chain, so every node in the
250/// cluster, given the same candidate routes, converges on the same winner instead
251/// of relying on arrival order. Mixing the name in spreads equal-length routes
252/// across different upstreams rather than funneling every broadcast onto one.
253fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) {
254	// FNV-1a, not the std hasher: its output is fixed across Rust versions and
255	// builds, which matters when nodes run mismatched binaries during a rolling
256	// deploy and still need to agree on the same route. SEED is a custom basis
257	// (any nonzero u64 works, the textbook one is just as arbitrary); FNV_PRIME is
258	// the standard FNV-64 prime and should stay put.
259	const SEED: u64 = 0x420C0DECB00B; // 420 C0DEC B00B
260	const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
261
262	let mut hash = SEED;
263	for &byte in name.as_str().as_bytes() {
264		hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
265	}
266	for hop in hops {
267		for &byte in &hop.id.to_le_bytes() {
268			hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
269		}
270	}
271
272	(hops.len(), hash)
273}
274
275/// One coalesced update queued for an `OriginConsumer`.
276///
277/// At most one entry exists per path, so a slow consumer's pending set is bounded
278/// by the number of distinct paths. `UnannounceAnnounce` preserves the
279/// signal that the broadcast at a path was replaced (the consumer must see
280/// `(path, None)` before `(path, Some(new))`), while a stale
281/// `Announce` cancels with a subsequent `unannounce` because the consumer
282/// has not yet observed it.
283enum PendingUpdate {
284	Announce(BroadcastConsumer),
285	Unannounce,
286	UnannounceAnnounce(BroadcastConsumer),
287}
288
289/// Pending updates keyed by path. `BTreeMap` keeps memory strictly bounded by
290/// the number of distinct paths with outstanding work (collapsed pairs are
291/// fully erased) and gives a deterministic lexicographic delivery order so
292/// tests can predict it.
293#[derive(Default)]
294struct OriginConsumerState {
295	pending: BTreeMap<PathOwned, PendingUpdate>,
296}
297
298impl OriginConsumerState {
299	fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
300		let new = match self.pending.remove(&path) {
301			// First announce, or a stale announce being replaced.
302			None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
303			// Consumer needs to observe the unannounce before this announce.
304			Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
305				PendingUpdate::UnannounceAnnounce(broadcast)
306			}
307		};
308		self.pending.insert(path, new);
309	}
310
311	fn apply_unannounce(&mut self, path: PathOwned) {
312		match self.pending.remove(&path) {
313			// Consumer has not seen the pending announce; drop both entirely.
314			Some(PendingUpdate::Announce(_)) => {}
315			None | Some(PendingUpdate::Unannounce) => {
316				self.pending.insert(path, PendingUpdate::Unannounce);
317			}
318			// The embedded announce cancels with this unannounce; the consumer
319			// still needs the leading unannounce.
320			Some(PendingUpdate::UnannounceAnnounce(_)) => {
321				self.pending.insert(path, PendingUpdate::Unannounce);
322			}
323		}
324	}
325
326	/// Take one update to deliver to the consumer, if any.
327	fn take(&mut self) -> Option<OriginAnnounce> {
328		let path = self.pending.keys().next()?.clone();
329		match self.pending.remove(&path).unwrap() {
330			PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
331			PendingUpdate::Unannounce => Some((path, None)),
332			PendingUpdate::UnannounceAnnounce(broadcast) => {
333				// Deliver the unannounce now; leave the trailing announce pending so
334				// the next take returns it for the same path.
335				self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
336				Some((path, None))
337			}
338		}
339	}
340}
341
342#[derive(Clone)]
343struct OriginConsumerNotify {
344	root: PathOwned,
345	state: kio::Producer<OriginConsumerState>,
346}
347
348impl OriginConsumerNotify {
349	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
350		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
351		self.state
352			.write()
353			.ok()
354			.expect("consumer closed")
355			.apply_announce(path, broadcast);
356	}
357
358	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
359		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
360		let mut state = self.state.write().ok().expect("consumer closed");
361		state.apply_unannounce(path.clone());
362		state.apply_announce(path, broadcast);
363	}
364
365	fn unannounce(&self, path: impl AsPath) {
366		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
367		self.state.write().ok().expect("consumer closed").apply_unannounce(path);
368	}
369}
370
371struct NotifyNode {
372	parent: Option<Lock<NotifyNode>>,
373
374	// Consumers that are subscribed to this node.
375	// We store a consumer ID so we can remove it easily when it closes.
376	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
377}
378
379impl NotifyNode {
380	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
381		Self {
382			parent,
383			consumers: HashMap::new(),
384		}
385	}
386
387	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
388		for consumer in self.consumers.values() {
389			consumer.announce(path.as_path(), broadcast.clone());
390		}
391
392		if let Some(parent) = &self.parent {
393			parent.lock().announce(path, broadcast);
394		}
395	}
396
397	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
398		for consumer in self.consumers.values() {
399			consumer.reannounce(path.as_path(), broadcast.clone());
400		}
401
402		if let Some(parent) = &self.parent {
403			parent.lock().reannounce(path, broadcast);
404		}
405	}
406
407	fn unannounce(&mut self, path: impl AsPath) {
408		for consumer in self.consumers.values() {
409			consumer.unannounce(path.as_path());
410		}
411
412		if let Some(parent) = &self.parent {
413			parent.lock().unannounce(path);
414		}
415	}
416}
417
418struct OriginNode {
419	// The broadcast that is published to this node.
420	broadcast: Option<OriginBroadcast>,
421
422	// Nested nodes, one level down the tree.
423	nested: HashMap<String, Lock<OriginNode>>,
424
425	// Unfortunately, to notify consumers we need to traverse back up the tree.
426	notify: Lock<NotifyNode>,
427}
428
429impl OriginNode {
430	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
431		Self {
432			broadcast: None,
433			nested: HashMap::new(),
434			notify: Lock::new(NotifyNode::new(parent)),
435		}
436	}
437
438	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
439		let (dir, rest) = path.next_part().expect("leaf called with empty path");
440
441		let next = self.entry(dir);
442		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
443	}
444
445	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
446		match self.nested.get(dir) {
447			Some(next) => next.clone(),
448			None => {
449				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
450				self.nested.insert(dir.to_string(), next.clone());
451				next
452			}
453		}
454	}
455
456	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
457		let full = full.as_path();
458		let rest = relative.as_path();
459
460		// If the path has a directory component, then publish it to the nested node.
461		if let Some((dir, relative)) = rest.next_part() {
462			// Not using entry to avoid allocating a string most of the time.
463			self.entry(dir).lock().publish(&full, broadcast, &relative);
464		} else if let Some(existing) = &mut self.broadcast {
465			// This node is a leaf with an existing broadcast. Prefer the route with the
466			// lower ordering key (shorter hop chain, deterministic hash on ties), so every
467			// node converges on the same route regardless of the order announces arrive.
468			//
469			// Drop duplicates (same underlying broadcast delivered via multiple links) so the
470			// backup queue can't accumulate clones of the active entry and trigger redundant
471			// reannouncements when a peer churns.
472			if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
473				return;
474			}
475
476			if route_key(&full, &broadcast.hops) < route_key(&full, &existing.active.hops) {
477				let old = existing.active.clone();
478				existing.active = broadcast.clone();
479				existing.backup.push_back(old);
480
481				self.notify.lock().reannounce(full, broadcast);
482			} else {
483				// Loses the ordering (longer path, or the tie-break): keep as a backup
484				// in case the active one drops.
485				existing.backup.push_back(broadcast.clone());
486			}
487		} else {
488			// This node is a leaf with no existing broadcast.
489			self.broadcast = Some(OriginBroadcast {
490				path: full.to_owned(),
491				active: broadcast.clone(),
492				backup: VecDeque::new(),
493			});
494			self.notify.lock().announce(full, broadcast);
495		}
496	}
497
498	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
499		self.consume_initial(&mut notify);
500		self.notify.lock().consumers.insert(id, notify);
501	}
502
503	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
504		if let Some(broadcast) = &self.broadcast {
505			notify.announce(&broadcast.path, broadcast.active.clone());
506		}
507
508		// Recursively subscribe to all nested nodes.
509		for nested in self.nested.values() {
510			nested.lock().consume_initial(notify);
511		}
512	}
513
514	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
515		let rest = rest.as_path();
516
517		if let Some((dir, rest)) = rest.next_part() {
518			let node = self.nested.get(dir)?.lock();
519			node.consume_broadcast(&rest)
520		} else {
521			self.broadcast.as_ref().map(|b| b.active.clone())
522		}
523	}
524
525	fn unconsume(&mut self, id: ConsumerId) {
526		self.notify.lock().consumers.remove(&id).expect("consumer not found");
527		if self.is_empty() {
528			//tracing::warn!("TODO: empty node; memory leak");
529			// This happens when consuming a path that is not being broadcasted.
530		}
531	}
532
533	// Returns true if the broadcast should be unannounced.
534	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
535		let full = full.as_path();
536		let relative = relative.as_path();
537
538		if let Some((dir, relative)) = relative.next_part() {
539			let nested = self.entry(dir);
540			let mut locked = nested.lock();
541			locked.remove(&full, broadcast, &relative);
542
543			if locked.is_empty() {
544				drop(locked);
545				self.nested.remove(dir);
546			}
547		} else {
548			let entry = match &mut self.broadcast {
549				Some(existing) => existing,
550				None => return,
551			};
552
553			// See if we can remove the broadcast from the backup list.
554			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
555			if let Some(pos) = pos {
556				entry.backup.remove(pos);
557				// Nothing else to do
558				return;
559			}
560
561			// Okay so it must be the active broadcast or else we fucked up.
562			assert!(entry.active.is_clone(&broadcast));
563
564			// Promote the backup with the lowest ordering key, the same rule used when
565			// publishing, so the route a node heals to still matches its peers.
566			let best = entry
567				.backup
568				.iter()
569				.enumerate()
570				.min_by_key(|(_, b)| route_key(&full, &b.hops))
571				.map(|(i, _)| i);
572			if let Some(idx) = best {
573				let active = entry.backup.remove(idx).expect("index in range");
574				entry.active = active;
575				self.notify.lock().reannounce(full, &entry.active);
576			} else {
577				// No more backups, so remove the entry.
578				self.broadcast = None;
579				self.notify.lock().unannounce(full);
580			}
581		}
582	}
583
584	fn is_empty(&self) -> bool {
585		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
586	}
587}
588
589#[derive(Clone)]
590struct OriginNodes {
591	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
592}
593
594impl OriginNodes {
595	// Returns nested roots that match the prefixes.
596	// PathPrefixes guarantees no duplicates or overlapping prefixes.
597	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
598		let mut roots = Vec::new();
599
600		for (root, state) in &self.nodes {
601			for prefix in prefixes {
602				if root.has_prefix(prefix) {
603					// Keep the existing node if we're allowed to access it.
604					roots.push((root.to_owned(), state.clone()));
605					continue;
606				}
607
608				if let Some(suffix) = prefix.strip_prefix(root) {
609					// If the requested prefix is larger than the allowed prefix, then we further scope it.
610					let nested = state.lock().leaf(&suffix);
611					roots.push((prefix.to_owned(), nested));
612				}
613			}
614		}
615
616		if roots.is_empty() {
617			None
618		} else {
619			Some(Self { nodes: roots })
620		}
621	}
622
623	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
624		let new_root = new_root.as_path();
625		let mut roots = Vec::new();
626
627		if new_root.is_empty() {
628			return Some(self.clone());
629		}
630
631		for (root, state) in &self.nodes {
632			if let Some(suffix) = root.strip_prefix(&new_root) {
633				// If the old root is longer than the new root, shorten the keys.
634				roots.push((suffix.to_owned(), state.clone()));
635			} else if let Some(suffix) = new_root.strip_prefix(root) {
636				// If the new root is longer than the old root, add a new root.
637				// NOTE: suffix can't be empty
638				let nested = state.lock().leaf(&suffix);
639				roots.push(("".into(), nested));
640			}
641		}
642
643		if roots.is_empty() {
644			None
645		} else {
646			Some(Self { nodes: roots })
647		}
648	}
649
650	// Returns the root that has this prefix.
651	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
652		let path = path.as_path();
653
654		for (root, state) in &self.nodes {
655			if let Some(suffix) = path.strip_prefix(root) {
656				return Some((state.clone(), suffix.to_owned()));
657			}
658		}
659
660		None
661	}
662}
663
664impl Default for OriginNodes {
665	fn default() -> Self {
666		Self {
667			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
668		}
669	}
670}
671
672/// A broadcast path and its associated consumer, or None if closed.
673pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
674
675/// Announces broadcasts to consumers over the network.
676#[derive(Clone)]
677pub struct OriginProducer {
678	// Identity for this origin. Appended to broadcast hops when
679	// re-announcing so downstream relays can detect loops and prefer the
680	// shortest path.
681	info: Origin,
682
683	// The roots of the tree that we are allowed to publish.
684	// A path of "" means we can publish anything.
685	nodes: OriginNodes,
686
687	// The prefix that is automatically stripped from all paths.
688	root: PathOwned,
689}
690
691impl std::ops::Deref for OriginProducer {
692	type Target = Origin;
693
694	fn deref(&self) -> &Self::Target {
695		&self.info
696	}
697}
698
699impl OriginProducer {
700	/// Build a producer for the given origin id with no scoped prefix and no
701	/// pre-existing broadcasts. Prefer [`Origin::produce`].
702	pub fn new(info: Origin) -> Self {
703		Self {
704			info,
705			nodes: OriginNodes::default(),
706			root: PathOwned::default(),
707		}
708	}
709
710	/// Create and publish a new broadcast, returning the producer.
711	///
712	/// This is a helper method when you only want to publish a broadcast to a single origin.
713	/// Returns [None] if the broadcast is not allowed to be published.
714	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
715		let broadcast = Broadcast::new().produce();
716		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
717	}
718
719	/// Publish a broadcast, announcing it to all consumers.
720	///
721	/// The broadcast will be unannounced when it is closed.
722	/// If there is already a broadcast with the same path, the new one replaces the active only
723	/// if it has a shorter hop path, or an equal-length path that wins a deterministic tie-break
724	/// (a hash of the broadcast name and hop chain); otherwise it is queued as a backup. The
725	/// tie-break is identical on every node, so a cluster converges on the same route.
726	/// When the active broadcast closes, the backup that wins the same ordering is promoted and
727	/// reannounced. Backups that close before being promoted are silently dropped.
728	///
729	/// Returns false if the broadcast is not allowed to be published.
730	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
731		let path = path.as_path();
732
733		// Loop detection: refuse broadcasts whose hop chain already contains our id.
734		if broadcast.hops.contains(&self.info) {
735			return false;
736		}
737
738		let (root, rest) = match self.nodes.get(&path) {
739			Some(root) => root,
740			None => return false,
741		};
742
743		let full = self.root.join(&path);
744
745		root.lock().publish(&full, &broadcast, &rest);
746		let root = root.clone();
747
748		web_async::spawn(async move {
749			broadcast.closed().await;
750			root.lock().remove(&full, broadcast, &rest);
751		});
752
753		true
754	}
755
756	/// Returns a new OriginProducer restricted to publishing under one of `prefixes`.
757	///
758	/// Returns None if there are no legal prefixes (the requested prefixes are
759	/// disjoint from this producer's current scope).
760	// TODO accept PathPrefixes instead of &[Path]
761	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
762		let prefixes = PathPrefixes::new(prefixes);
763		Some(OriginProducer {
764			info: self.info,
765			nodes: self.nodes.select(&prefixes)?,
766			root: self.root.clone(),
767		})
768	}
769
770	/// Subscribe to all announced broadcasts.
771	pub fn consume(&self) -> OriginConsumer {
772		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
773	}
774
775	/// Get a broadcast by path if it has *already* been published.
776	///
777	/// Equivalent to `self.consume().get_broadcast(path)` but skips the
778	/// announcement-cursor allocation, which is currently relatively expensive.
779	#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
780	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
781		let path = path.as_path();
782		let (root, rest) = self.nodes.get(&path)?;
783		let state = root.lock();
784		state.consume_broadcast(&rest)
785	}
786
787	/// Returns a new OriginProducer that automatically strips out the provided prefix.
788	///
789	/// Returns None if the provided root is not authorized; when [`Self::scope`]
790	/// was already used without a wildcard.
791	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
792		let prefix = prefix.as_path();
793
794		Some(Self {
795			info: self.info,
796			root: self.root.join(&prefix).to_owned(),
797			nodes: self.nodes.root(&prefix)?,
798		})
799	}
800
801	/// Returns the root that is automatically stripped from all paths.
802	pub fn root(&self) -> &Path<'_> {
803		&self.root
804	}
805
806	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
807	// TODO return PathPrefixes
808	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
809		self.nodes.nodes.iter().map(|(root, _)| root)
810	}
811
812	/// Converts a relative path to an absolute path.
813	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
814		self.root.join(path)
815	}
816}
817
818/// Consumes announced broadcasts matching against an optional prefix.
819///
820/// NOTE: Clone is expensive, try to avoid it.
821pub struct OriginConsumer {
822	id: ConsumerId,
823	// Identity of the origin this consumer was derived from.
824	info: Origin,
825	nodes: OriginNodes,
826
827	// Pending updates queued for this consumer. Coalesced so a slow consumer
828	// can't accumulate redundant announce/unannounce pairs.
829	state: kio::Producer<OriginConsumerState>,
830
831	// A prefix that is automatically stripped from all paths.
832	root: PathOwned,
833}
834
835impl std::ops::Deref for OriginConsumer {
836	type Target = Origin;
837
838	fn deref(&self) -> &Self::Target {
839		&self.info
840	}
841}
842
843impl OriginConsumer {
844	fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
845		let state = kio::Producer::<OriginConsumerState>::default();
846		let id = ConsumerId::new();
847
848		for (_, node) in &nodes.nodes {
849			let notify = OriginConsumerNotify {
850				root: root.clone(),
851				state: state.clone(),
852			};
853			node.lock().consume(id, notify);
854		}
855
856		Self {
857			id,
858			info,
859			nodes,
860			state,
861			root,
862		}
863	}
864
865	/// Returns the next (un)announced broadcast and the absolute path.
866	///
867	/// The broadcast will only be announced if it was previously unannounced.
868	/// The same path won't be announced/unannounced twice, instead it will toggle.
869	/// Returns None if the consumer is closed.
870	///
871	/// Note: The returned path is absolute and will always match this consumer's prefix.
872	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
873		kio::wait(|waiter| self.poll_announced(waiter)).await
874	}
875
876	/// Poll for the next (un)announced broadcast, without blocking.
877	///
878	/// Returns `Poll::Ready(Some(_))` for an update, `Poll::Ready(None)` if the
879	/// consumer is closed, or `Poll::Pending` after registering `waiter` to be
880	/// notified when the next update arrives.
881	pub fn poll_announced(&mut self, waiter: &kio::Waiter) -> Poll<Option<OriginAnnounce>> {
882		let mut state = match ready!(self.state.poll(waiter, |state| {
883			if state.pending.is_empty() {
884				Poll::Pending
885			} else {
886				Poll::Ready(())
887			}
888		})) {
889			Ok(state) => state,
890			// Closed: discard the Ref so its MutexGuard doesn't escape this call.
891			Err(_) => return Poll::Ready(None),
892		};
893		Poll::Ready(Some(state.take().expect("predicate guaranteed an update")))
894	}
895
896	/// Returns the next (un)announced broadcast and the absolute path without blocking.
897	///
898	/// Returns None if there is no update available; NOT because the consumer is closed.
899	/// You have to use `is_closed` to check if the consumer is closed.
900	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
901		self.state.write().ok()?.take()
902	}
903
904	/// Create another consumer with its own announcement cursor over the same origin.
905	pub fn consume(&self) -> Self {
906		self.clone()
907	}
908
909	/// Get a broadcast by path if it has *already* been announced.
910	///
911	/// Returns `None` when the path is unknown to this consumer right now. Synchronous
912	/// lookup races announcement gossip — a freshly-connected consumer will see `None`
913	/// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`]
914	/// (blocks until announced) unless you can guarantee the announcement has already
915	/// landed (e.g. you're responding to an `announced()` callback).
916	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
917		let path = path.as_path();
918		let (root, rest) = self.nodes.get(&path)?;
919		let state = root.lock();
920		state.consume_broadcast(&rest)
921	}
922
923	/// Block until a broadcast with the given path is announced and return it.
924	///
925	/// Returns `None` if the path is outside this consumer's allowed prefixes or if the consumer
926	/// is closed before the broadcast is announced. The returned broadcast may itself be closed
927	/// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that.
928	///
929	/// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but
930	/// cannot guarantee the announcement has already been received.
931	pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
932		let path = path.as_path();
933
934		// Scope a fresh consumer down to this path so we only wake up for relevant announcements.
935		let mut consumer = self.scope(std::slice::from_ref(&path))?;
936
937		// `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited
938		// to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no
939		// announcement at the exact path `foo` can ever arrive. Bail rather than loop forever.
940		if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
941			return None;
942		}
943
944		loop {
945			let (announced, broadcast) = consumer.announced().await?;
946			// `scope` narrows by prefix, but we only want an exact-path match.
947			if announced.as_path() == path {
948				if let Some(broadcast) = broadcast {
949					return Some(broadcast);
950				}
951			}
952		}
953	}
954
955	/// Returns a new OriginConsumer restricted to broadcasts under one of `prefixes`.
956	///
957	/// Returns None if there are no legal prefixes (the requested prefixes are
958	/// disjoint from this consumer's current scope, so it would always return None).
959	// TODO accept PathPrefixes instead of &[Path]
960	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
961		let prefixes = PathPrefixes::new(prefixes);
962		Some(OriginConsumer::new(
963			self.info,
964			self.root.clone(),
965			self.nodes.select(&prefixes)?,
966		))
967	}
968
969	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
970	///
971	/// Returns None if the provided root is not authorized; when [`Self::scope`] was
972	/// already used without a wildcard.
973	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
974		let prefix = prefix.as_path();
975
976		Some(Self::new(
977			self.info,
978			self.root.join(&prefix).to_owned(),
979			self.nodes.root(&prefix)?,
980		))
981	}
982
983	/// Returns the prefix that is automatically stripped from all paths.
984	pub fn root(&self) -> &Path<'_> {
985		&self.root
986	}
987
988	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
989	// TODO return PathPrefixes
990	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
991		self.nodes.nodes.iter().map(|(root, _)| root)
992	}
993
994	/// Converts a relative path to an absolute path.
995	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
996		self.root.join(path)
997	}
998}
999
1000impl Drop for OriginConsumer {
1001	fn drop(&mut self) {
1002		for (_, root) in &self.nodes.nodes {
1003			root.lock().unconsume(self.id);
1004		}
1005	}
1006}
1007
1008impl Clone for OriginConsumer {
1009	fn clone(&self) -> Self {
1010		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
1011	}
1012}
1013
1014#[cfg(test)]
1015use futures::FutureExt;
1016
#[cfg(test)]
impl OriginConsumer {
	/// Expect an announcement for `expected` to be ready right now, carrying `broadcast`.
	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let ready = self.announced().now_or_never().expect("next blocked");
		let (path, active) = ready.expect("no next");
		assert_eq!(path, expected, "wrong path");
		let active = active.unwrap();
		assert!(active.is_clone(broadcast), "should be the same broadcast");
	}

	/// Same as [`Self::assert_next`], but goes through the synchronous `try_announced`.
	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
		let expected = expected.as_path();
		let (path, active) = self.try_announced().expect("no next");
		assert_eq!(path, expected, "wrong path");
		let active = active.unwrap();
		assert!(active.is_clone(broadcast), "should be the same broadcast");
	}

	/// Expect an unannounce (no active broadcast) for `expected` to be ready right now.
	pub fn assert_next_none(&mut self, expected: impl AsPath) {
		let expected = expected.as_path();
		let ready = self.announced().now_or_never().expect("next blocked");
		let (path, active) = ready.expect("no next");
		assert_eq!(path, expected, "wrong path");
		assert!(active.is_none(), "should be unannounced");
	}

	/// Expect that no announcement is ready, i.e. `announced()` would have to wait.
	pub fn assert_next_wait(&mut self) {
		match self.announced().now_or_never() {
			None => {}
			Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
		}
	}

	/*
	pub fn assert_next_closed(&mut self) {
		assert!(
			self.announced().now_or_never().expect("next blocked").is_none(),
			"next should be closed"
		);
	}
	*/
}
1055
1056#[cfg(test)]
1057mod tests {
1058	use crate::Broadcast;
1059
1060	use super::*;
1061
1062	#[test]
1063	fn origin_list_push_fails_at_limit() {
1064		let mut list = OriginList::new();
1065		for _ in 0..MAX_HOPS {
1066			list.push(Origin::random()).unwrap();
1067		}
1068		assert_eq!(list.len(), MAX_HOPS);
1069		assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
1070	}
1071
1072	#[test]
1073	fn origin_list_replace_first() {
1074		let mut list = OriginList::new();
1075		for _ in 0..3 {
1076			list.push(Origin::UNKNOWN).unwrap();
1077		}
1078
1079		// Rewrites only the first placeholder, keeping the length the same.
1080		assert!(list.replace_first(Origin::UNKNOWN, Origin::from(7)));
1081		assert_eq!(list.as_slice(), &[Origin::from(7), Origin::UNKNOWN, Origin::UNKNOWN]);
1082
1083		// No match leaves the list untouched.
1084		assert!(!list.replace_first(Origin::from(99), Origin::from(8)));
1085		assert_eq!(list.len(), 3);
1086	}
1087
1088	#[test]
1089	fn origin_list_try_from_vec_enforces_limit() {
1090		let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
1091		assert!(OriginList::try_from(under).is_ok());
1092
1093		let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
1094		assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
1095	}
1096
1097	#[tokio::test]
1098	async fn test_announce() {
1099		tokio::time::pause();
1100
1101		let origin = Origin::random().produce();
1102		let broadcast1 = Broadcast::new().produce();
1103		let broadcast2 = Broadcast::new().produce();
1104
1105		let mut consumer1 = origin.consume();
1106		// Make a new consumer that should get it.
1107		consumer1.assert_next_wait();
1108
1109		// Publish the first broadcast.
1110		origin.publish_broadcast("test1", broadcast1.consume());
1111
1112		consumer1.assert_next("test1", &broadcast1.consume());
1113		consumer1.assert_next_wait();
1114
1115		// Make a new consumer that should get the existing broadcast.
1116		// But we don't consume it yet.
1117		let mut consumer2 = origin.consume();
1118
1119		// Publish the second broadcast.
1120		origin.publish_broadcast("test2", broadcast2.consume());
1121
1122		consumer1.assert_next("test2", &broadcast2.consume());
1123		consumer1.assert_next_wait();
1124
1125		consumer2.assert_next("test1", &broadcast1.consume());
1126		consumer2.assert_next("test2", &broadcast2.consume());
1127		consumer2.assert_next_wait();
1128
1129		// Close the first broadcast.
1130		drop(broadcast1);
1131
1132		// Wait for the async task to run.
1133		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1134
1135		// All consumers should get a None now.
1136		consumer1.assert_next_none("test1");
1137		consumer2.assert_next_none("test1");
1138		consumer1.assert_next_wait();
1139		consumer2.assert_next_wait();
1140
1141		// And a new consumer only gets the last broadcast.
1142		let mut consumer3 = origin.consume();
1143		consumer3.assert_next("test2", &broadcast2.consume());
1144		consumer3.assert_next_wait();
1145
1146		// Close the other producer and make sure it cleans up
1147		drop(broadcast2);
1148
1149		// Wait for the async task to run.
1150		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1151
1152		consumer1.assert_next_none("test2");
1153		consumer2.assert_next_none("test2");
1154		consumer3.assert_next_none("test2");
1155
1156		/* TODO close the origin consumer when the producer is dropped
1157		consumer1.assert_next_closed();
1158		consumer2.assert_next_closed();
1159		consumer3.assert_next_closed();
1160		*/
1161	}
1162
1163	#[tokio::test]
1164	async fn test_duplicate() {
1165		tokio::time::pause();
1166
1167		let origin = Origin::random().produce();
1168
1169		let broadcast1 = Broadcast::new().produce();
1170		let broadcast2 = Broadcast::new().produce();
1171		let broadcast3 = Broadcast::new().produce();
1172
1173		let consumer1 = broadcast1.consume();
1174		let consumer2 = broadcast2.consume();
1175		let consumer3 = broadcast3.consume();
1176
1177		let mut consumer = origin.consume();
1178
1179		origin.publish_broadcast("test", consumer1.clone());
1180		origin.publish_broadcast("test", consumer2.clone());
1181		origin.publish_broadcast("test", consumer3.clone());
1182		assert!(consumer.get_broadcast("test").is_some());
1183
1184		// Identical (empty) hop chains tie on the deterministic key, so the first publish
1185		// stays active and the rest queue as backups. No churn, no reannounce.
1186		consumer.assert_next("test", &consumer1);
1187		consumer.assert_next_wait();
1188
1189		// Drop a backup, nothing should change.
1190		drop(broadcast2);
1191
1192		// Wait for the async task to run.
1193		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1194
1195		assert!(consumer.get_broadcast("test").is_some());
1196		consumer.assert_next_wait();
1197
1198		// Drop the active, we should reannounce with the remaining backup.
1199		drop(broadcast1);
1200
1201		// Wait for the async task to run.
1202		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1203
1204		assert!(consumer.get_broadcast("test").is_some());
1205		consumer.assert_next_none("test");
1206		consumer.assert_next("test", &consumer3);
1207
1208		// Drop the final broadcast, we should unannounce.
1209		drop(broadcast3);
1210
1211		// Wait for the async task to run.
1212		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1213		assert!(consumer.get_broadcast("test").is_none());
1214
1215		consumer.assert_next_none("test");
1216		consumer.assert_next_wait();
1217	}
1218
1219	#[tokio::test]
1220	async fn test_duplicate_reverse() {
1221		tokio::time::pause();
1222
1223		let origin = Origin::random().produce();
1224		let broadcast1 = Broadcast::new().produce();
1225		let broadcast2 = Broadcast::new().produce();
1226
1227		origin.publish_broadcast("test", broadcast1.consume());
1228		origin.publish_broadcast("test", broadcast2.consume());
1229		assert!(origin.consume().get_broadcast("test").is_some());
1230
1231		// This is harder, dropping the new broadcast first.
1232		drop(broadcast2);
1233
1234		// Wait for the cleanup async task to run.
1235		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1236		assert!(origin.consume().get_broadcast("test").is_some());
1237
1238		drop(broadcast1);
1239
1240		// Wait for the cleanup async task to run.
1241		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1242		assert!(origin.consume().get_broadcast("test").is_none());
1243	}
1244
1245	#[tokio::test]
1246	async fn test_deterministic_tiebreak() {
1247		tokio::time::pause();
1248
1249		// Build a broadcast carrying a specific hop chain.
1250		fn route(ids: &[u64]) -> BroadcastProducer {
1251			let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::<Vec<_>>()).unwrap();
1252			Broadcast { hops }.produce()
1253		}
1254
1255		// Resolve the active route for "test" after publishing both routes in the given order.
1256		fn winner(first: &[u64], second: &[u64]) -> OriginList {
1257			let origin = Origin::random().produce();
1258			let a = route(first);
1259			let b = route(second);
1260			origin.publish_broadcast("test", a.consume());
1261			origin.publish_broadcast("test", b.consume());
1262			let hops = origin.consume().get_broadcast("test").unwrap().hops.clone();
1263			// Keep the producers alive until after we read the active route.
1264			drop((a, b));
1265			hops
1266		}
1267
1268		// Two routes with equal hop counts but distinct chains. The winner is decided by
1269		// the deterministic key, not arrival order, so both publish orders converge.
1270		let forward = winner(&[10, 20], &[30, 40]);
1271		let reverse = winner(&[30, 40], &[10, 20]);
1272		assert_eq!(forward, reverse, "tie-break must not depend on publish order");
1273
1274		// A strictly shorter chain always wins regardless of the hash.
1275		assert_eq!(winner(&[10, 20], &[30]).len(), 1);
1276		assert_eq!(winner(&[30], &[10, 20]).len(), 1);
1277	}
1278
1279	#[tokio::test]
1280	async fn test_double_publish() {
1281		tokio::time::pause();
1282
1283		let origin = Origin::random().produce();
1284		let broadcast = Broadcast::new().produce();
1285
1286		// Ensure it doesn't crash.
1287		origin.publish_broadcast("test", broadcast.consume());
1288		origin.publish_broadcast("test", broadcast.consume());
1289
1290		assert!(origin.consume().get_broadcast("test").is_some());
1291
1292		drop(broadcast);
1293
1294		// Wait for the async task to run.
1295		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1296		assert!(origin.consume().get_broadcast("test").is_none());
1297	}
1298	// A previous mpsc-based implementation could only deliver the first 127 broadcasts
1299	// instantly via `assert_next` (which uses `now_or_never`). The kio-backed
1300	// implementation polls synchronously and can deliver all of them without yielding.
1301	// Names are zero-padded so lexicographic delivery order matches the loop index.
1302	#[tokio::test]
1303	async fn test_many_announces() {
1304		let origin = Origin::random().produce();
1305		let broadcast = Broadcast::new().produce();
1306
1307		let mut consumer = origin.consume();
1308		for i in 0..256 {
1309			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1310		}
1311
1312		for i in 0..256 {
1313			consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
1314		}
1315		consumer.assert_next_wait();
1316	}
1317
1318	#[tokio::test]
1319	async fn test_many_announces_try() {
1320		let origin = Origin::random().produce();
1321		let broadcast = Broadcast::new().produce();
1322
1323		let mut consumer = origin.consume();
1324		for i in 0..256 {
1325			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1326		}
1327
1328		for i in 0..256 {
1329			consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
1330		}
1331	}
1332
1333	#[tokio::test]
1334	async fn test_with_root_basic() {
1335		let origin = Origin::random().produce();
1336		let broadcast = Broadcast::new().produce();
1337
1338		// Create a producer with root "/foo"
1339		let foo_producer = origin.with_root("foo").expect("should create root");
1340		assert_eq!(foo_producer.root().as_str(), "foo");
1341
1342		let mut consumer = origin.consume();
1343
1344		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
1345		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1346		// The original consumer should see the full path
1347		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1348
1349		// A consumer created from the rooted producer should see the stripped path
1350		let mut foo_consumer = foo_producer.consume();
1351		foo_consumer.assert_next("bar/baz", &broadcast.consume());
1352	}
1353
1354	#[tokio::test]
1355	async fn test_with_root_nested() {
1356		let origin = Origin::random().produce();
1357		let broadcast = Broadcast::new().produce();
1358
1359		// Create nested roots
1360		let foo_producer = origin.with_root("foo").expect("should create foo root");
1361		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1362		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1363
1364		let mut consumer = origin.consume();
1365
1366		// Publishing to "baz" should actually publish to "foo/bar/baz"
1367		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1368		// The original consumer sees the full path
1369		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1370
1371		// Consumer from foo_bar_producer sees just "baz"
1372		let mut foo_bar_consumer = foo_bar_producer.consume();
1373		foo_bar_consumer.assert_next("baz", &broadcast.consume());
1374	}
1375
1376	#[tokio::test]
1377	async fn test_publish_scope_allows() {
1378		let origin = Origin::random().produce();
1379		let broadcast = Broadcast::new().produce();
1380
1381		// Create a producer that can only publish to "allowed" paths
1382		let limited_producer = origin
1383			.scope(&["allowed/path1".into(), "allowed/path2".into()])
1384			.expect("should create limited producer");
1385
1386		// Should be able to publish to allowed paths
1387		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1388		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1389		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1390
1391		// Should not be able to publish to disallowed paths
1392		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1393		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
1394		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1395	}
1396
1397	#[tokio::test]
1398	async fn test_publish_scope_empty() {
1399		let origin = Origin::random().produce();
1400
1401		// Creating a producer with no allowed paths should return None
1402		assert!(origin.scope(&[]).is_none());
1403	}
1404
1405	#[tokio::test]
1406	async fn test_consume_scope_filters() {
1407		let origin = Origin::random().produce();
1408		let broadcast1 = Broadcast::new().produce();
1409		let broadcast2 = Broadcast::new().produce();
1410		let broadcast3 = Broadcast::new().produce();
1411
1412		let mut consumer = origin.consume();
1413
1414		// Publish to different paths
1415		origin.publish_broadcast("allowed", broadcast1.consume());
1416		origin.publish_broadcast("allowed/nested", broadcast2.consume());
1417		origin.publish_broadcast("notallowed", broadcast3.consume());
1418
1419		// Create a consumer that only sees "allowed" paths
1420		let mut limited_consumer = origin
1421			.consume()
1422			.scope(&["allowed".into()])
1423			.expect("should create limited consumer");
1424
1425		// Should only receive broadcasts under "allowed"
1426		limited_consumer.assert_next("allowed", &broadcast1.consume());
1427		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1428		limited_consumer.assert_next_wait(); // Should not see "notallowed"
1429
1430		// Unscoped consumer should see all
1431		consumer.assert_next("allowed", &broadcast1.consume());
1432		consumer.assert_next("allowed/nested", &broadcast2.consume());
1433		consumer.assert_next("notallowed", &broadcast3.consume());
1434	}
1435
1436	#[tokio::test]
1437	async fn test_consume_scope_multiple_prefixes() {
1438		let origin = Origin::random().produce();
1439		let broadcast1 = Broadcast::new().produce();
1440		let broadcast2 = Broadcast::new().produce();
1441		let broadcast3 = Broadcast::new().produce();
1442
1443		origin.publish_broadcast("foo/test", broadcast1.consume());
1444		origin.publish_broadcast("bar/test", broadcast2.consume());
1445		origin.publish_broadcast("baz/test", broadcast3.consume());
1446
1447		// Consumer that only sees "foo" and "bar" paths
1448		let mut limited_consumer = origin
1449			.consume()
1450			.scope(&["foo".into(), "bar".into()])
1451			.expect("should create limited consumer");
1452
1453		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
1454		limited_consumer.assert_next("bar/test", &broadcast2.consume());
1455		limited_consumer.assert_next("foo/test", &broadcast1.consume());
1456		limited_consumer.assert_next_wait(); // Should not see "baz/test"
1457	}
1458
1459	#[tokio::test]
1460	async fn test_with_root_and_publish_scope() {
1461		let origin = Origin::random().produce();
1462		let broadcast = Broadcast::new().produce();
1463
1464		// User connects to /foo root
1465		let foo_producer = origin.with_root("foo").expect("should create foo root");
1466
1467		// Limit them to publish only to "bar" and "goop/pee" within /foo
1468		let limited_producer = foo_producer
1469			.scope(&["bar".into(), "goop/pee".into()])
1470			.expect("should create limited producer");
1471
1472		let mut consumer = origin.consume();
1473
1474		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
1475		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1476		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1477		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1478		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1479
1480		// Should not be able to publish outside allowed paths
1481		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1482		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
1483		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1484
1485		// Original consumer sees full paths
1486		consumer.assert_next("foo/bar", &broadcast.consume());
1487		consumer.assert_next("foo/bar/nested", &broadcast.consume());
1488		consumer.assert_next("foo/goop/pee", &broadcast.consume());
1489		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1490	}
1491
1492	#[tokio::test]
1493	async fn test_with_root_and_consume_scope() {
1494		let origin = Origin::random().produce();
1495		let broadcast1 = Broadcast::new().produce();
1496		let broadcast2 = Broadcast::new().produce();
1497		let broadcast3 = Broadcast::new().produce();
1498
1499		// Publish broadcasts
1500		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1501		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1502		origin.publish_broadcast("foo/other/test", broadcast3.consume());
1503
1504		// User connects to /foo root
1505		let foo_producer = origin.with_root("foo").expect("should create foo root");
1506
1507		// Create consumer limited to "bar" and "goop/pee" within /foo
1508		let mut limited_consumer = foo_producer
1509			.consume()
1510			.scope(&["bar".into(), "goop/pee".into()])
1511			.expect("should create limited consumer");
1512
1513		// Should only see allowed paths (without foo prefix)
1514		limited_consumer.assert_next("bar/test", &broadcast1.consume());
1515		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1516		limited_consumer.assert_next_wait(); // Should not see "other/test"
1517	}
1518
1519	#[tokio::test]
1520	async fn test_with_root_unauthorized() {
1521		let origin = Origin::random().produce();
1522
1523		// First limit the producer to specific paths
1524		let limited_producer = origin
1525			.scope(&["allowed".into()])
1526			.expect("should create limited producer");
1527
1528		// Trying to create a root outside allowed paths should fail
1529		assert!(limited_producer.with_root("notallowed").is_none());
1530
1531		// But creating a root within allowed paths should work
1532		let allowed_root = limited_producer
1533			.with_root("allowed")
1534			.expect("should create allowed root");
1535		assert_eq!(allowed_root.root().as_str(), "allowed");
1536	}
1537
1538	#[tokio::test]
1539	async fn test_wildcard_permission() {
1540		let origin = Origin::random().produce();
1541		let broadcast = Broadcast::new().produce();
1542
1543		// Producer with root access (empty string means wildcard)
1544		let root_producer = origin.clone();
1545
1546		// Should be able to publish anywhere
1547		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1548		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1549
1550		// Can create any root
1551		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1552		assert_eq!(foo_producer.root().as_str(), "foo");
1553	}
1554
1555	#[tokio::test]
1556	async fn test_consume_broadcast_with_permissions() {
1557		let origin = Origin::random().produce();
1558		let broadcast1 = Broadcast::new().produce();
1559		let broadcast2 = Broadcast::new().produce();
1560
1561		origin.publish_broadcast("allowed/test", broadcast1.consume());
1562		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1563
1564		// Create limited consumer
1565		let limited_consumer = origin
1566			.consume()
1567			.scope(&["allowed".into()])
1568			.expect("should create limited consumer");
1569
1570		// Should be able to get allowed broadcast
1571		let result = limited_consumer.get_broadcast("allowed/test");
1572		assert!(result.is_some());
1573		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1574
1575		// Should not be able to get disallowed broadcast
1576		assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1577
1578		// Original consumer can get both
1579		let consumer = origin.consume();
1580		assert!(consumer.get_broadcast("allowed/test").is_some());
1581		assert!(consumer.get_broadcast("notallowed/test").is_some());
1582	}
1583
1584	#[tokio::test]
1585	async fn test_nested_paths_with_permissions() {
1586		let origin = Origin::random().produce();
1587		let broadcast = Broadcast::new().produce();
1588
1589		// Create producer limited to "a/b/c"
1590		let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1591
1592		// Should be able to publish to exact path and nested paths
1593		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1594		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1595		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1596
1597		// Should not be able to publish to parent or sibling paths
1598		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1599		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1600		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1601	}
1602
1603	#[tokio::test]
1604	async fn test_multiple_consumers_with_different_permissions() {
1605		let origin = Origin::random().produce();
1606		let broadcast1 = Broadcast::new().produce();
1607		let broadcast2 = Broadcast::new().produce();
1608		let broadcast3 = Broadcast::new().produce();
1609
1610		// Publish to different paths
1611		origin.publish_broadcast("foo/test", broadcast1.consume());
1612		origin.publish_broadcast("bar/test", broadcast2.consume());
1613		origin.publish_broadcast("baz/test", broadcast3.consume());
1614
1615		// Create consumers with different permissions
1616		let mut foo_consumer = origin
1617			.consume()
1618			.scope(&["foo".into()])
1619			.expect("should create foo consumer");
1620
1621		let mut bar_consumer = origin
1622			.consume()
1623			.scope(&["bar".into()])
1624			.expect("should create bar consumer");
1625
1626		let mut foobar_consumer = origin
1627			.consume()
1628			.scope(&["foo".into(), "bar".into()])
1629			.expect("should create foobar consumer");
1630
1631		// Each consumer should only see their allowed paths
1632		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1633		foo_consumer.assert_next_wait();
1634
1635		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1636		bar_consumer.assert_next_wait();
1637
1638		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1639		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1640		foobar_consumer.assert_next_wait();
1641	}
1642
1643	#[tokio::test]
1644	async fn test_select_with_empty_prefix() {
1645		let origin = Origin::random().produce();
1646		let broadcast1 = Broadcast::new().produce();
1647		let broadcast2 = Broadcast::new().produce();
1648
1649		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1650		let demo_producer = origin.with_root("demo").expect("should create demo root");
1651		let limited_producer = demo_producer
1652			.scope(&["worm-node".into(), "foobar".into()])
1653			.expect("should create limited producer");
1654
1655		// Publish some broadcasts
1656		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1657		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1658
1659		// scope with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1660		let mut consumer = limited_producer
1661			.consume()
1662			.scope(&["".into()])
1663			.expect("should create consumer with empty prefix");
1664
1665		// Should see both broadcasts (order depends on PathPrefixes sort)
1666		let a1 = consumer.try_announced().expect("expected first announcement");
1667		let a2 = consumer.try_announced().expect("expected second announcement");
1668		consumer.assert_next_wait();
1669
1670		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1671		paths.sort();
1672		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1673	}
1674
1675	#[tokio::test]
1676	async fn test_select_narrowing_scope() {
1677		let origin = Origin::random().produce();
1678		let broadcast1 = Broadcast::new().produce();
1679		let broadcast2 = Broadcast::new().produce();
1680		let broadcast3 = Broadcast::new().produce();
1681
1682		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1683		let demo_producer = origin.with_root("demo").expect("should create demo root");
1684		let limited_producer = demo_producer
1685			.scope(&["worm-node".into(), "foobar".into()])
1686			.expect("should create limited producer");
1687
1688		// Publish broadcasts at different levels
1689		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1690		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1691		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1692
1693		// Test 1: scope("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1694		let mut worm_consumer = limited_producer
1695			.consume()
1696			.scope(&["worm-node".into()])
1697			.expect("should create worm-node consumer");
1698
1699		// Should see worm-node content with paths stripped to ""
1700		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1701		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1702		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1703
1704		// Test 2: scope("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1705		let mut foo_consumer = limited_producer
1706			.consume()
1707			.scope(&["worm-node/foo".into()])
1708			.expect("should create worm-node/foo consumer");
1709
1710		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1711		foo_consumer.assert_next_wait(); // Should NOT see other content
1712	}
1713
1714	#[tokio::test]
1715	async fn test_select_multiple_roots_with_empty_prefix() {
1716		let origin = Origin::random().produce();
1717		let broadcast1 = Broadcast::new().produce();
1718		let broadcast2 = Broadcast::new().produce();
1719		let broadcast3 = Broadcast::new().produce();
1720
1721		// Producer with multiple allowed roots
1722		let limited_producer = origin
1723			.scope(&["app1".into(), "app2".into(), "shared".into()])
1724			.expect("should create limited producer");
1725
1726		// Publish to each root
1727		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1728		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1729		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1730
1731		// scope with empty prefix should maintain all roots
1732		let mut consumer = limited_producer
1733			.consume()
1734			.scope(&["".into()])
1735			.expect("should create consumer with empty prefix");
1736
1737		// Should see all broadcasts from all roots
1738		consumer.assert_next("app1/data", &broadcast1.consume());
1739		consumer.assert_next("app2/config", &broadcast2.consume());
1740		consumer.assert_next("shared/resource", &broadcast3.consume());
1741		consumer.assert_next_wait();
1742	}
1743
1744	#[tokio::test]
1745	async fn test_publish_scope_with_empty_prefix() {
1746		let origin = Origin::random().produce();
1747		let broadcast = Broadcast::new().produce();
1748
1749		// Producer with specific allowed paths
1750		let limited_producer = origin
1751			.scope(&["services/api".into(), "services/web".into()])
1752			.expect("should create limited producer");
1753
1754		// scope with empty prefix should keep the same restrictions
1755		let same_producer = limited_producer
1756			.scope(&["".into()])
1757			.expect("should create producer with empty prefix");
1758
1759		// Should still have the same publishing restrictions
1760		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1761		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1762		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1763		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1764	}
1765
1766	#[tokio::test]
1767	async fn test_select_narrowing_to_deeper_path() {
1768		let origin = Origin::random().produce();
1769		let broadcast1 = Broadcast::new().produce();
1770		let broadcast2 = Broadcast::new().produce();
1771		let broadcast3 = Broadcast::new().produce();
1772
1773		// Producer with broad permission
1774		let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1775
1776		// Publish at various depths
1777		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1778		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1779		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1780
1781		// Narrow down to team2 only
1782		let mut team2_consumer = limited_producer
1783			.consume()
1784			.scope(&["org/team2".into()])
1785			.expect("should create team2 consumer");
1786
1787		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1788		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1789
1790		// Further narrow down to team1/project1
1791		let mut project1_consumer = limited_producer
1792			.consume()
1793			.scope(&["org/team1/project1".into()])
1794			.expect("should create project1 consumer");
1795
1796		// Should only see project1 content at root
1797		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1798		project1_consumer.assert_next_wait();
1799	}
1800
1801	#[tokio::test]
1802	async fn test_select_with_non_matching_prefix() {
1803		let origin = Origin::random().produce();
1804
1805		// Producer with specific allowed paths
1806		let limited_producer = origin
1807			.scope(&["allowed/path".into()])
1808			.expect("should create limited producer");
1809
1810		// Trying to scope with a completely different prefix should return None
1811		assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1812
1813		// Similarly for scope
1814		assert!(limited_producer.scope(&["other/path".into()]).is_none());
1815	}
1816
1817	// Regression test for https://github.com/moq-dev/moq/issues/910
1818	// with_root panics when String has trailing slash (AsPath for String skips normalization)
1819	#[tokio::test]
1820	async fn test_with_root_trailing_slash_consumer() {
1821		let origin = Origin::random().produce();
1822
1823		// Use an owned String so the trailing slash is NOT normalized away.
1824		let prefix = "some_prefix/".to_string();
1825		let mut consumer = origin.consume().with_root(prefix).unwrap();
1826
1827		let b = origin.create_broadcast("some_prefix/test").unwrap();
1828		consumer.assert_next("test", &b.consume());
1829	}
1830
1831	// Same issue but for the producer side of with_root
1832	#[tokio::test]
1833	async fn test_with_root_trailing_slash_producer() {
1834		let origin = Origin::random().produce();
1835
1836		// Use an owned String so the trailing slash is NOT normalized away.
1837		let prefix = "some_prefix/".to_string();
1838		let rooted = origin.with_root(prefix).unwrap();
1839
1840		let b = rooted.create_broadcast("test").unwrap();
1841
1842		let mut consumer = rooted.consume();
1843		consumer.assert_next("test", &b.consume());
1844	}
1845
1846	// Verify unannounce also doesn't panic with trailing slash
1847	#[tokio::test]
1848	async fn test_with_root_trailing_slash_unannounce() {
1849		tokio::time::pause();
1850
1851		let origin = Origin::random().produce();
1852
1853		let prefix = "some_prefix/".to_string();
1854		let mut consumer = origin.consume().with_root(prefix).unwrap();
1855
1856		let b = origin.create_broadcast("some_prefix/test").unwrap();
1857		consumer.assert_next("test", &b.consume());
1858
1859		// Drop the broadcast producer to trigger unannounce
1860		drop(b);
1861		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1862
1863		// unannounce also calls strip_prefix(&self.root).unwrap()
1864		consumer.assert_next_none("test");
1865	}
1866
1867	#[tokio::test]
1868	async fn test_select_maintains_access_with_wider_prefix() {
1869		let origin = Origin::random().produce();
1870		let broadcast1 = Broadcast::new().produce();
1871		let broadcast2 = Broadcast::new().produce();
1872
1873		// Setup: user with root "demo" allowed to subscribe to specific paths
1874		let demo_producer = origin.with_root("demo").expect("should create demo root");
1875		let user_producer = demo_producer
1876			.scope(&["worm-node".into(), "foobar".into()])
1877			.expect("should create user producer");
1878
1879		// Publish some data
1880		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1881		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1882
1883		// Key test: scope with "" should maintain access to allowed roots
1884		let mut consumer = user_producer
1885			.consume()
1886			.scope(&["".into()])
1887			.expect("scope with empty prefix should not fail when user has specific permissions");
1888
1889		// Should still receive broadcasts from allowed paths (order not guaranteed)
1890		let a1 = consumer.try_announced().expect("expected first announcement");
1891		let a2 = consumer.try_announced().expect("expected second announcement");
1892		consumer.assert_next_wait();
1893
1894		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1895		paths.sort();
1896		assert_eq!(paths, ["foobar", "worm-node/data"]);
1897
1898		// Also test that we can still narrow the scope
1899		let mut narrow_consumer = user_producer
1900			.consume()
1901			.scope(&["worm-node".into()])
1902			.expect("should be able to narrow scope to worm-node");
1903
1904		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1905		narrow_consumer.assert_next_wait(); // Should not see foobar
1906	}
1907
1908	#[tokio::test]
1909	async fn test_duplicate_prefixes_deduped() {
1910		let origin = Origin::random().produce();
1911		let broadcast = Broadcast::new().produce();
1912
1913		// scope with duplicate prefixes should work (deduped internally)
1914		let producer = origin
1915			.scope(&["demo".into(), "demo".into()])
1916			.expect("should create producer");
1917
1918		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1919
1920		let mut consumer = producer.consume();
1921		consumer.assert_next("demo/stream", &broadcast.consume());
1922		consumer.assert_next_wait();
1923	}
1924
1925	#[tokio::test]
1926	async fn test_overlapping_prefixes_deduped() {
1927		let origin = Origin::random().produce();
1928		let broadcast = Broadcast::new().produce();
1929
1930		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
1931		let producer = origin
1932			.scope(&["demo".into(), "demo/foo".into()])
1933			.expect("should create producer");
1934
1935		// Can still publish under "demo/bar" since "demo" covers everything
1936		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1937
1938		let mut consumer = producer.consume();
1939		consumer.assert_next("demo/bar/stream", &broadcast.consume());
1940		consumer.assert_next_wait();
1941	}
1942
1943	#[tokio::test]
1944	async fn test_overlapping_prefixes_no_duplicate_announcements() {
1945		let origin = Origin::random().produce();
1946		let broadcast = Broadcast::new().produce();
1947
1948		// Both "demo" and "demo/foo" are requested — should only have one node
1949		let producer = origin
1950			.scope(&["demo".into(), "demo/foo".into()])
1951			.expect("should create producer");
1952
1953		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1954
1955		let mut consumer = producer.consume();
1956		// Should only get ONE announcement (not two from overlapping nodes)
1957		consumer.assert_next("demo/foo/stream", &broadcast.consume());
1958		consumer.assert_next_wait();
1959	}
1960
1961	#[tokio::test]
1962	async fn test_allowed_returns_deduped_prefixes() {
1963		let origin = Origin::random().produce();
1964
1965		let producer = origin
1966			.scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1967			.expect("should create producer");
1968
1969		let allowed: Vec<_> = producer.allowed().collect();
1970		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1971	}
1972
1973	#[tokio::test]
1974	async fn test_announced_broadcast_already_announced() {
1975		let origin = Origin::random().produce();
1976		let broadcast = Broadcast::new().produce();
1977
1978		origin.publish_broadcast("test", broadcast.consume());
1979
1980		let consumer = origin.consume();
1981		let result = consumer.announced_broadcast("test").await.expect("should find it");
1982		assert!(result.is_clone(&broadcast.consume()));
1983	}
1984
1985	#[tokio::test]
1986	async fn test_announced_broadcast_delayed() {
1987		tokio::time::pause();
1988
1989		let origin = Origin::random().produce();
1990		let broadcast = Broadcast::new().produce();
1991
1992		let consumer = origin.consume();
1993
1994		// Start waiting before it's announced.
1995		let wait = tokio::spawn({
1996			let consumer = consumer.clone();
1997			async move { consumer.announced_broadcast("test").await }
1998		});
1999
2000		// Give the spawned task a chance to subscribe.
2001		tokio::task::yield_now().await;
2002
2003		origin.publish_broadcast("test", broadcast.consume());
2004
2005		let result = wait.await.unwrap().expect("should find it");
2006		assert!(result.is_clone(&broadcast.consume()));
2007	}
2008
2009	#[tokio::test]
2010	async fn test_announced_broadcast_ignores_unrelated_paths() {
2011		tokio::time::pause();
2012
2013		let origin = Origin::random().produce();
2014		let other = Broadcast::new().produce();
2015		let target = Broadcast::new().produce();
2016
2017		let consumer = origin.consume();
2018
2019		let wait = tokio::spawn({
2020			let consumer = consumer.clone();
2021			async move { consumer.announced_broadcast("target").await }
2022		});
2023
2024		tokio::task::yield_now().await;
2025
2026		// Publish an unrelated broadcast first — announced_broadcast should skip it.
2027		origin.publish_broadcast("other", other.consume());
2028		tokio::task::yield_now().await;
2029		assert!(!wait.is_finished(), "must not resolve on unrelated path");
2030
2031		origin.publish_broadcast("target", target.consume());
2032		let result = wait.await.unwrap().expect("should find target");
2033		assert!(result.is_clone(&target.consume()));
2034	}
2035
2036	#[tokio::test]
2037	async fn test_announced_broadcast_skips_nested_paths() {
2038		tokio::time::pause();
2039
2040		let origin = Origin::random().produce();
2041		let nested = Broadcast::new().produce();
2042		let exact = Broadcast::new().produce();
2043
2044		let consumer = origin.consume();
2045
2046		let wait = tokio::spawn({
2047			let consumer = consumer.clone();
2048			async move { consumer.announced_broadcast("foo").await }
2049		});
2050
2051		tokio::task::yield_now().await;
2052
2053		// "foo/bar" is under the prefix scope, but it's not the exact path — skip it.
2054		origin.publish_broadcast("foo/bar", nested.consume());
2055		tokio::task::yield_now().await;
2056		assert!(!wait.is_finished(), "must not resolve on a nested path");
2057
2058		origin.publish_broadcast("foo", exact.consume());
2059		let result = wait.await.unwrap().expect("should find foo exactly");
2060		assert!(result.is_clone(&exact.consume()));
2061	}
2062
2063	#[tokio::test]
2064	async fn test_announced_broadcast_disallowed() {
2065		let origin = Origin::random().produce();
2066		let limited = origin
2067			.consume()
2068			.scope(&["allowed".into()])
2069			.expect("should create limited");
2070
2071		// Path is outside allowed prefixes — should return None immediately.
2072		assert!(limited.announced_broadcast("notallowed").await.is_none());
2073	}
2074
2075	#[tokio::test]
2076	async fn test_announced_broadcast_scope_too_narrow() {
2077		// Consumer's scope is narrower than the requested path: asking for `foo` on a consumer
2078		// limited to `foo/specific` can never resolve. Must return None, not loop forever.
2079		let origin = Origin::random().produce();
2080		let limited = origin
2081			.consume()
2082			.scope(&["foo/specific".into()])
2083			.expect("should create limited");
2084
2085		// now_or_never so we fail fast instead of hanging if the guard regresses.
2086		let result = limited
2087			.announced_broadcast("foo")
2088			.now_or_never()
2089			.expect("must not block");
2090		assert!(result.is_none());
2091	}
2092
2093	// Coalescing tests: a slow consumer that doesn't drain between updates
2094	// should observe a bounded number of deliveries.
2095
2096	#[tokio::test]
2097	async fn test_coalesce_announce_then_unannounce() {
2098		// announce + unannounce that the consumer hasn't observed yet collapses to nothing.
2099		tokio::time::pause();
2100
2101		let origin = Origin::random().produce();
2102		let mut consumer = origin.consume();
2103
2104		let broadcast = Broadcast::new().produce();
2105		origin.publish_broadcast("test", broadcast.consume());
2106		drop(broadcast);
2107
2108		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2109
2110		consumer.assert_next_wait();
2111	}
2112
2113	#[tokio::test]
2114	async fn test_coalesce_announce_unannounce_announce() {
2115		// announce, unannounce, announce that the consumer hasn't drained collapses
2116		// to a single Announce of the latest broadcast.
2117		tokio::time::pause();
2118
2119		let origin = Origin::random().produce();
2120		let mut consumer = origin.consume();
2121
2122		let broadcast1 = Broadcast::new().produce();
2123		let broadcast2 = Broadcast::new().produce();
2124
2125		origin.publish_broadcast("test", broadcast1.consume());
2126		drop(broadcast1);
2127		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2128		origin.publish_broadcast("test", broadcast2.consume());
2129
2130		consumer.assert_next("test", &broadcast2.consume());
2131		consumer.assert_next_wait();
2132	}
2133
2134	#[tokio::test]
2135	async fn test_coalesce_unannounce_announce_preserved() {
2136		// unannounce followed by announce of a different broadcast must be preserved
2137		// as two deliveries so the consumer learns the origin changed.
2138		tokio::time::pause();
2139
2140		let origin = Origin::random().produce();
2141		let broadcast1 = Broadcast::new().produce();
2142		origin.publish_broadcast("test", broadcast1.consume());
2143
2144		let mut consumer = origin.consume();
2145		consumer.assert_next("test", &broadcast1.consume());
2146
2147		// Drop, then publish a fresh broadcast at the same path.
2148		drop(broadcast1);
2149		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2150
2151		let broadcast2 = Broadcast::new().produce();
2152		origin.publish_broadcast("test", broadcast2.consume());
2153
2154		// The consumer must see the unannounce before the new announce.
2155		consumer.assert_next_none("test");
2156		consumer.assert_next("test", &broadcast2.consume());
2157		consumer.assert_next_wait();
2158	}
2159
2160	#[tokio::test]
2161	async fn test_coalesce_unannounce_announce_unannounce() {
2162		// unannounce + announce + unannounce collapses to a single unannounce: the
2163		// embedded announce was never observed.
2164		tokio::time::pause();
2165
2166		let origin = Origin::random().produce();
2167		let broadcast1 = Broadcast::new().produce();
2168		origin.publish_broadcast("test", broadcast1.consume());
2169
2170		let mut consumer = origin.consume();
2171		consumer.assert_next("test", &broadcast1.consume());
2172
2173		drop(broadcast1);
2174		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2175
2176		let broadcast2 = Broadcast::new().produce();
2177		origin.publish_broadcast("test", broadcast2.consume());
2178		drop(broadcast2);
2179		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2180
2181		consumer.assert_next_none("test");
2182		consumer.assert_next_wait();
2183	}
2184
2185	#[tokio::test]
2186	async fn test_coalesce_churn_bounded() {
2187		// A churn loop on a single path should keep the pending set bounded.
2188		// Backup promotion during cleanup can leave the consumer with zero or one
2189		// pending update for "test" depending on the order tasks run; we only
2190		// require that churn doesn't accumulate across iterations.
2191		tokio::time::pause();
2192
2193		let origin = Origin::random().produce();
2194		let mut consumer = origin.consume();
2195
2196		for _ in 0..1000 {
2197			let broadcast = Broadcast::new().produce();
2198			origin.publish_broadcast("test", broadcast.consume());
2199			drop(broadcast);
2200		}
2201		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2202
2203		let mut collected = Vec::new();
2204		while let Some(update) = consumer.try_announced() {
2205			collected.push(update);
2206		}
2207		assert!(
2208			collected.len() <= 1,
2209			"expected at most one pending update, got {}",
2210			collected.len()
2211		);
2212		assert!(
2213			collected.iter().all(|(path, _)| path == &Path::new("test")),
2214			"unexpected path in pending updates",
2215		);
2216	}
2217}