Skip to main content

moq_net/model/
origin.rs

1use std::{
2	collections::{BTreeMap, HashMap, VecDeque},
3	fmt,
4	sync::atomic::{AtomicU64, Ordering},
5	task::{Poll, ready},
6};
7
8use rand::RngExt;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13	AsPath, Broadcast, BroadcastProducer, Error, Path, PathOwned, PathPrefixes,
14	coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17/// A relay origin, identified by a 62-bit varint on the wire.
18///
19/// `id` must be non-zero for a real origin; `id == 0` is reserved as a
20/// placeholder for Lite03-style hops where the actual value isn't carried.
21/// Encoding a value outside the 62-bit range (>= 2^62) will fail at the
22/// varint layer; [`Origin::random`] picks a valid random nonzero id.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26	/// Non-zero 62-bit identifier. Encoded as a QUIC varint on the wire.
27	pub id: u64,
28}
29
30impl Origin {
31	/// Placeholder for hop entries whose actual id is not on the wire (Lite03).
32	/// Never encoded for Lite04+: violates the non-zero invariant and would fail to round-trip.
33	pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35	/// Generate a fresh origin with a random non-zero id. Use this for any
36	/// origin that does not need a stable identity across restarts.
37	///
38	/// TEMPORARY: the wire format allows 62 bits, but older `@moq/lite` JS
39	/// clients decode `AnnounceInterest.exclude_hop` as a u53 (number) and
40	/// throw on anything > 2^53-1. To keep those clients alive against
41	/// fresh relays, we cap the random id at 53 bits. Restore to 62 bits
42	/// once the JS u62 fix has propagated to deployed bundles.
43	pub fn random() -> Self {
44		let mut rng = rand::rng();
45		let id = rng.random_range(1..(1u64 << 53));
46		Self { id }
47	}
48
49	/// Consume this [Origin] to create a producer that carries its id.
50	pub fn produce(self) -> OriginProducer {
51		OriginProducer::new(self)
52	}
53}
54
55impl From<u64> for Origin {
56	fn from(id: u64) -> Self {
57		Self { id }
58	}
59}
60
61impl fmt::Display for Origin {
62	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63		self.id.fmt(f)
64	}
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69	u64: Encode<V>,
70{
71	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72		self.id.encode(w, version)
73	}
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78	u64: Decode<V>,
79{
80	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81		let id = u64::decode(r, version)?;
82		if id >= 1u64 << 62 {
83			return Err(DecodeError::InvalidValue);
84		}
85		Ok(Self { id })
86	}
87}
88
89/// Maximum number of origins (hops) an [`OriginList`] can hold.
90///
91/// Caps pathological or loop-induced announcements at a reasonable cluster
92/// diameter; appending past this limit returns [`TooManyOrigins`] rather than
93/// silently truncating.
94pub(crate) const MAX_HOPS: usize = 32;
95
96/// Bounded list of [`Origin`] entries, typically the hop chain of a broadcast.
97///
98/// Guarantees `len() <= MAX_HOPS`. Construct via [`OriginList::new`] +
99/// [`OriginList::push`], or fall back to the fallible [`TryFrom<Vec<Origin>>`].
100#[derive(Debug, Clone, Default, PartialEq, Eq)]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct OriginList(Vec<Origin>);
103
104/// Returned when an operation would grow an [`OriginList`] past its hop-count cap.
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106#[non_exhaustive]
107pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111		write!(f, "too many origins (max {MAX_HOPS})")
112	}
113}
114
115impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118	fn from(_: TooManyOrigins) -> Self {
119		DecodeError::BoundsExceeded
120	}
121}
122
123impl OriginList {
124	/// Create an empty list.
125	pub fn new() -> Self {
126		Self(Vec::new())
127	}
128
129	/// Append an [`Origin`]. Returns [`TooManyOrigins`] if the list is full.
130	pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131		if self.0.len() >= MAX_HOPS {
132			return Err(TooManyOrigins);
133		}
134		self.0.push(origin);
135		Ok(())
136	}
137
138	/// Replace the first entry equal to `target` with `replacement`, returning
139	/// true if a match was found. The length is unchanged.
140	pub fn replace_first(&mut self, target: Origin, replacement: Origin) -> bool {
141		for entry in &mut self.0 {
142			if *entry == target {
143				*entry = replacement;
144				return true;
145			}
146		}
147		false
148	}
149
150	/// Returns true if any entry matches `origin`.
151	pub fn contains(&self, origin: &Origin) -> bool {
152		self.0.contains(origin)
153	}
154
155	/// Number of entries currently in the list (always `<= MAX_HOPS`).
156	pub fn len(&self) -> usize {
157		self.0.len()
158	}
159
160	/// Whether the list contains no entries.
161	pub fn is_empty(&self) -> bool {
162		self.0.is_empty()
163	}
164
165	/// Iterate over the entries in hop order (oldest first).
166	pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
167		self.0.iter()
168	}
169
170	/// Borrow the entries as a slice.
171	pub fn as_slice(&self) -> &[Origin] {
172		&self.0
173	}
174}
175
176impl TryFrom<Vec<Origin>> for OriginList {
177	type Error = TooManyOrigins;
178
179	fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
180		if v.len() > MAX_HOPS {
181			return Err(TooManyOrigins);
182		}
183		Ok(Self(v))
184	}
185}
186
187impl<'a> IntoIterator for &'a OriginList {
188	type Item = &'a Origin;
189	type IntoIter = std::slice::Iter<'a, Origin>;
190
191	fn into_iter(self) -> Self::IntoIter {
192		self.iter()
193	}
194}
195
196impl<V: Copy> Encode<V> for OriginList
197where
198	u64: Encode<V>,
199	Origin: Encode<V>,
200{
201	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
202		(self.0.len() as u64).encode(w, version)?;
203		for origin in &self.0 {
204			origin.encode(w, version)?;
205		}
206		Ok(())
207	}
208}
209
210impl<V: Copy> Decode<V> for OriginList
211where
212	u64: Decode<V>,
213	Origin: Decode<V>,
214{
215	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
216		let count = u64::decode(r, version)? as usize;
217		if count > MAX_HOPS {
218			return Err(DecodeError::BoundsExceeded);
219		}
220		let mut list = Vec::with_capacity(count);
221		for _ in 0..count {
222			list.push(Origin::decode(r, version)?);
223		}
224		Ok(Self(list))
225	}
226}
227
228static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
229
230#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
231struct ConsumerId(u64);
232
233impl ConsumerId {
234	fn new() -> Self {
235		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
236	}
237}
238
239// If there are multiple broadcasts with the same path, we keep the oldest active and queue the others.
240struct OriginBroadcast {
241	path: PathOwned,
242	active: BroadcastConsumer,
243	backup: VecDeque<BroadcastConsumer>,
244}
245
246/// Ordering key used to pick the active route among broadcasts at the same path.
247///
248/// Lower wins. Shorter hop chains sort first; equal-length chains are broken by a
249/// deterministic hash of the broadcast name and hop chain, so every node in the
250/// cluster, given the same candidate routes, converges on the same winner instead
251/// of relying on arrival order. Mixing the name in spreads equal-length routes
252/// across different upstreams rather than funneling every broadcast onto one.
253fn route_key(name: &Path, hops: &OriginList) -> (usize, u64) {
254	// FNV-1a, not the std hasher: its output is fixed across Rust versions and
255	// builds, which matters when nodes run mismatched binaries during a rolling
256	// deploy and still need to agree on the same route. SEED is a custom basis
257	// (any nonzero u64 works, the textbook one is just as arbitrary); FNV_PRIME is
258	// the standard FNV-64 prime and should stay put.
259	const SEED: u64 = 0x420C0DECB00B; // 420 C0DEC B00B
260	const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
261
262	let mut hash = SEED;
263	for &byte in name.as_str().as_bytes() {
264		hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
265	}
266	for hop in hops {
267		for &byte in &hop.id.to_le_bytes() {
268			hash = (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME);
269		}
270	}
271
272	(hops.len(), hash)
273}
274
275/// One coalesced update queued for an `OriginConsumer`.
276///
277/// At most one entry exists per path, so a slow consumer's pending set is bounded
278/// by the number of distinct paths. `UnannounceAnnounce` preserves the
279/// signal that the broadcast at a path was replaced (the consumer must see
280/// `(path, None)` before `(path, Some(new))`), while a stale
281/// `Announce` cancels with a subsequent `unannounce` because the consumer
282/// has not yet observed it.
283enum PendingUpdate {
284	Announce(BroadcastConsumer),
285	Unannounce,
286	UnannounceAnnounce(BroadcastConsumer),
287}
288
289/// Pending updates keyed by path. `BTreeMap` keeps memory strictly bounded by
290/// the number of distinct paths with outstanding work (collapsed pairs are
291/// fully erased) and gives a deterministic lexicographic delivery order so
292/// tests can predict it.
293#[derive(Default)]
294struct OriginConsumerState {
295	pending: BTreeMap<PathOwned, PendingUpdate>,
296}
297
298impl OriginConsumerState {
299	fn apply_announce(&mut self, path: PathOwned, broadcast: BroadcastConsumer) {
300		let new = match self.pending.remove(&path) {
301			// First announce, or a stale announce being replaced.
302			None | Some(PendingUpdate::Announce(_)) => PendingUpdate::Announce(broadcast),
303			// Consumer needs to observe the unannounce before this announce.
304			Some(PendingUpdate::Unannounce | PendingUpdate::UnannounceAnnounce(_)) => {
305				PendingUpdate::UnannounceAnnounce(broadcast)
306			}
307		};
308		self.pending.insert(path, new);
309	}
310
311	fn apply_unannounce(&mut self, path: PathOwned) {
312		match self.pending.remove(&path) {
313			// Consumer has not seen the pending announce; drop both entirely.
314			Some(PendingUpdate::Announce(_)) => {}
315			None | Some(PendingUpdate::Unannounce) => {
316				self.pending.insert(path, PendingUpdate::Unannounce);
317			}
318			// The embedded announce cancels with this unannounce; the consumer
319			// still needs the leading unannounce.
320			Some(PendingUpdate::UnannounceAnnounce(_)) => {
321				self.pending.insert(path, PendingUpdate::Unannounce);
322			}
323		}
324	}
325
326	/// Take one update to deliver to the consumer, if any.
327	fn take(&mut self) -> Option<OriginAnnounce> {
328		let path = self.pending.keys().next()?.clone();
329		match self.pending.remove(&path).unwrap() {
330			PendingUpdate::Announce(broadcast) => Some((path, Some(broadcast))),
331			PendingUpdate::Unannounce => Some((path, None)),
332			PendingUpdate::UnannounceAnnounce(broadcast) => {
333				// Deliver the unannounce now; leave the trailing announce pending so
334				// the next take returns it for the same path.
335				self.pending.insert(path.clone(), PendingUpdate::Announce(broadcast));
336				Some((path, None))
337			}
338		}
339	}
340}
341
342#[derive(Clone)]
343struct OriginConsumerNotify {
344	root: PathOwned,
345	state: kio::Producer<OriginConsumerState>,
346}
347
348impl OriginConsumerNotify {
349	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
350		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
351		self.state
352			.write()
353			.ok()
354			.expect("consumer closed")
355			.apply_announce(path, broadcast);
356	}
357
358	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
359		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
360		let mut state = self.state.write().ok().expect("consumer closed");
361		state.apply_unannounce(path.clone());
362		state.apply_announce(path, broadcast);
363	}
364
365	fn unannounce(&self, path: impl AsPath) {
366		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
367		self.state.write().ok().expect("consumer closed").apply_unannounce(path);
368	}
369}
370
371struct NotifyNode {
372	parent: Option<Lock<NotifyNode>>,
373
374	// Consumers that are subscribed to this node.
375	// We store a consumer ID so we can remove it easily when it closes.
376	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
377}
378
379impl NotifyNode {
380	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
381		Self {
382			parent,
383			consumers: HashMap::new(),
384		}
385	}
386
387	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
388		for consumer in self.consumers.values() {
389			consumer.announce(path.as_path(), broadcast.clone());
390		}
391
392		if let Some(parent) = &self.parent {
393			parent.lock().announce(path, broadcast);
394		}
395	}
396
397	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
398		for consumer in self.consumers.values() {
399			consumer.reannounce(path.as_path(), broadcast.clone());
400		}
401
402		if let Some(parent) = &self.parent {
403			parent.lock().reannounce(path, broadcast);
404		}
405	}
406
407	fn unannounce(&mut self, path: impl AsPath) {
408		for consumer in self.consumers.values() {
409			consumer.unannounce(path.as_path());
410		}
411
412		if let Some(parent) = &self.parent {
413			parent.lock().unannounce(path);
414		}
415	}
416}
417
418struct OriginNode {
419	// The broadcast that is published to this node.
420	broadcast: Option<OriginBroadcast>,
421
422	// Nested nodes, one level down the tree.
423	nested: HashMap<String, Lock<OriginNode>>,
424
425	// Unfortunately, to notify consumers we need to traverse back up the tree.
426	notify: Lock<NotifyNode>,
427}
428
429impl OriginNode {
430	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
431		Self {
432			broadcast: None,
433			nested: HashMap::new(),
434			notify: Lock::new(NotifyNode::new(parent)),
435		}
436	}
437
438	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
439		let (dir, rest) = path.next_part().expect("leaf called with empty path");
440
441		let next = self.entry(dir);
442		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
443	}
444
445	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
446		match self.nested.get(dir) {
447			Some(next) => next.clone(),
448			None => {
449				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
450				self.nested.insert(dir.to_string(), next.clone());
451				next
452			}
453		}
454	}
455
456	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
457		let full = full.as_path();
458		let rest = relative.as_path();
459
460		// If the path has a directory component, then publish it to the nested node.
461		if let Some((dir, relative)) = rest.next_part() {
462			// Not using entry to avoid allocating a string most of the time.
463			self.entry(dir).lock().publish(&full, broadcast, &relative);
464		} else if let Some(existing) = &mut self.broadcast {
465			// This node is a leaf with an existing broadcast. Prefer the route with the
466			// lower ordering key (shorter hop chain, deterministic hash on ties), so every
467			// node converges on the same route regardless of the order announces arrive.
468			//
469			// Drop duplicates (same underlying broadcast delivered via multiple links) so the
470			// backup queue can't accumulate clones of the active entry and trigger redundant
471			// reannouncements when a peer churns.
472			if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
473				return;
474			}
475
476			if route_key(&full, &broadcast.hops) < route_key(&full, &existing.active.hops) {
477				let old = existing.active.clone();
478				existing.active = broadcast.clone();
479				existing.backup.push_back(old);
480
481				self.notify.lock().reannounce(full, broadcast);
482			} else {
483				// Loses the ordering (longer path, or the tie-break): keep as a backup
484				// in case the active one drops.
485				existing.backup.push_back(broadcast.clone());
486			}
487		} else {
488			// This node is a leaf with no existing broadcast.
489			self.broadcast = Some(OriginBroadcast {
490				path: full.to_owned(),
491				active: broadcast.clone(),
492				backup: VecDeque::new(),
493			});
494			self.notify.lock().announce(full, broadcast);
495		}
496	}
497
498	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
499		self.consume_initial(&mut notify);
500		self.notify.lock().consumers.insert(id, notify);
501	}
502
503	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
504		if let Some(broadcast) = &self.broadcast {
505			notify.announce(&broadcast.path, broadcast.active.clone());
506		}
507
508		// Recursively subscribe to all nested nodes.
509		for nested in self.nested.values() {
510			nested.lock().consume_initial(notify);
511		}
512	}
513
514	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
515		let rest = rest.as_path();
516
517		if let Some((dir, rest)) = rest.next_part() {
518			let node = self.nested.get(dir)?.lock();
519			node.consume_broadcast(&rest)
520		} else {
521			self.broadcast.as_ref().map(|b| b.active.clone())
522		}
523	}
524
525	fn unconsume(&mut self, id: ConsumerId) {
526		self.notify.lock().consumers.remove(&id).expect("consumer not found");
527		if self.is_empty() {
528			//tracing::warn!("TODO: empty node; memory leak");
529			// This happens when consuming a path that is not being broadcasted.
530		}
531	}
532
533	// Returns true if the broadcast should be unannounced.
534	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
535		let full = full.as_path();
536		let relative = relative.as_path();
537
538		if let Some((dir, relative)) = relative.next_part() {
539			let nested = self.entry(dir);
540			let mut locked = nested.lock();
541			locked.remove(&full, broadcast, &relative);
542
543			if locked.is_empty() {
544				drop(locked);
545				self.nested.remove(dir);
546			}
547		} else {
548			let entry = match &mut self.broadcast {
549				Some(existing) => existing,
550				None => return,
551			};
552
553			// See if we can remove the broadcast from the backup list.
554			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
555			if let Some(pos) = pos {
556				entry.backup.remove(pos);
557				// Nothing else to do
558				return;
559			}
560
561			// Okay so it must be the active broadcast or else we fucked up.
562			assert!(entry.active.is_clone(&broadcast));
563
564			// Promote the backup with the lowest ordering key, the same rule used when
565			// publishing, so the route a node heals to still matches its peers.
566			let best = entry
567				.backup
568				.iter()
569				.enumerate()
570				.min_by_key(|(_, b)| route_key(&full, &b.hops))
571				.map(|(i, _)| i);
572			if let Some(idx) = best {
573				let active = entry.backup.remove(idx).expect("index in range");
574				entry.active = active;
575				self.notify.lock().reannounce(full, &entry.active);
576			} else {
577				// No more backups, so remove the entry.
578				self.broadcast = None;
579				self.notify.lock().unannounce(full);
580			}
581		}
582	}
583
584	fn is_empty(&self) -> bool {
585		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
586	}
587}
588
589#[derive(Clone)]
590struct OriginNodes {
591	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
592}
593
594impl OriginNodes {
595	// Returns nested roots that match the prefixes.
596	// PathPrefixes guarantees no duplicates or overlapping prefixes.
597	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
598		let mut roots = Vec::new();
599
600		for (root, state) in &self.nodes {
601			for prefix in prefixes {
602				if root.has_prefix(prefix) {
603					// Keep the existing node if we're allowed to access it.
604					roots.push((root.to_owned(), state.clone()));
605					continue;
606				}
607
608				if let Some(suffix) = prefix.strip_prefix(root) {
609					// If the requested prefix is larger than the allowed prefix, then we further scope it.
610					let nested = state.lock().leaf(&suffix);
611					roots.push((prefix.to_owned(), nested));
612				}
613			}
614		}
615
616		if roots.is_empty() {
617			None
618		} else {
619			Some(Self { nodes: roots })
620		}
621	}
622
623	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
624		let new_root = new_root.as_path();
625		let mut roots = Vec::new();
626
627		if new_root.is_empty() {
628			return Some(self.clone());
629		}
630
631		for (root, state) in &self.nodes {
632			if let Some(suffix) = root.strip_prefix(&new_root) {
633				// If the old root is longer than the new root, shorten the keys.
634				roots.push((suffix.to_owned(), state.clone()));
635			} else if let Some(suffix) = new_root.strip_prefix(root) {
636				// If the new root is longer than the old root, add a new root.
637				// NOTE: suffix can't be empty
638				let nested = state.lock().leaf(&suffix);
639				roots.push(("".into(), nested));
640			}
641		}
642
643		if roots.is_empty() {
644			None
645		} else {
646			Some(Self { nodes: roots })
647		}
648	}
649
650	// Returns the root that has this prefix.
651	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
652		let path = path.as_path();
653
654		for (root, state) in &self.nodes {
655			if let Some(suffix) = path.strip_prefix(root) {
656				return Some((state.clone(), suffix.to_owned()));
657			}
658		}
659
660		None
661	}
662}
663
664impl Default for OriginNodes {
665	fn default() -> Self {
666		Self {
667			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
668		}
669	}
670}
671
672/// A broadcast path and its associated consumer, or None if closed.
673pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
674
675/// Announces broadcasts to consumers over the network.
676#[derive(Clone)]
677pub struct OriginProducer {
678	// Identity for this origin. Appended to broadcast hops when
679	// re-announcing so downstream relays can detect loops and prefer the
680	// shortest path.
681	info: Origin,
682
683	// The roots of the tree that we are allowed to publish.
684	// A path of "" means we can publish anything.
685	nodes: OriginNodes,
686
687	// The prefix that is automatically stripped from all paths.
688	root: PathOwned,
689
690	// Fallback request queue, shared with every derived consumer. Separate from
691	// `nodes` because dynamic broadcasts are never announced: they only resolve a
692	// consumer's `request_broadcast` when no live announcement exists.
693	dynamic: kio::Producer<OriginDynamicState>,
694}
695
696impl std::ops::Deref for OriginProducer {
697	type Target = Origin;
698
699	fn deref(&self) -> &Self::Target {
700		&self.info
701	}
702}
703
704impl OriginProducer {
705	/// Build a producer for the given origin id with no scoped prefix and no
706	/// pre-existing broadcasts. Prefer [`Origin::produce`].
707	pub fn new(info: Origin) -> Self {
708		Self {
709			info,
710			nodes: OriginNodes::default(),
711			root: PathOwned::default(),
712			dynamic: kio::Producer::default(),
713		}
714	}
715
716	/// Create and publish a new broadcast, returning the producer.
717	///
718	/// This is a helper method when you only want to publish a broadcast to a single origin.
719	/// Returns [None] if the broadcast is not allowed to be published.
720	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
721		let broadcast = Broadcast::new().produce();
722		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
723	}
724
725	/// Publish a broadcast, announcing it to all consumers.
726	///
727	/// The broadcast will be unannounced when it is closed.
728	/// If there is already a broadcast with the same path, the new one replaces the active only
729	/// if it has a shorter hop path, or an equal-length path that wins a deterministic tie-break
730	/// (a hash of the broadcast name and hop chain); otherwise it is queued as a backup. The
731	/// tie-break is identical on every node, so a cluster converges on the same route.
732	/// When the active broadcast closes, the backup that wins the same ordering is promoted and
733	/// reannounced. Backups that close before being promoted are silently dropped.
734	///
735	/// Returns false if the broadcast is not allowed to be published.
736	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
737		let path = path.as_path();
738
739		// Loop detection: refuse broadcasts whose hop chain already contains our id.
740		if broadcast.hops.contains(&self.info) {
741			return false;
742		}
743
744		let (root, rest) = match self.nodes.get(&path) {
745			Some(root) => root,
746			None => return false,
747		};
748
749		let full = self.root.join(&path);
750
751		root.lock().publish(&full, &broadcast, &rest);
752		let root = root.clone();
753
754		web_async::spawn(async move {
755			broadcast.closed().await;
756			root.lock().remove(&full, broadcast, &rest);
757		});
758
759		true
760	}
761
762	/// Returns a new OriginProducer restricted to publishing under one of `prefixes`.
763	///
764	/// Returns None if there are no legal prefixes (the requested prefixes are
765	/// disjoint from this producer's current scope).
766	// TODO accept PathPrefixes instead of &[Path]
767	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
768		let prefixes = PathPrefixes::new(prefixes);
769		Some(OriginProducer {
770			info: self.info,
771			nodes: self.nodes.select(&prefixes)?,
772			root: self.root.clone(),
773			dynamic: self.dynamic.clone(),
774		})
775	}
776
777	/// Create a dynamic handler that picks up [`OriginConsumer::request_broadcast`]
778	/// calls for paths that are not announced.
779	///
780	/// This is the origin-level analogue of [`BroadcastProducer::dynamic`]: it serves
781	/// broadcasts on demand rather than tracks. Crucially the served broadcasts are
782	/// *not* announced, so [`OriginConsumer::announced`] never sees them; they exist
783	/// only as a fallback for a consumer that asks for an exact path with no live
784	/// announcement. Drop the handler (and every clone) to reject pending requests.
785	pub fn dynamic(&self) -> OriginDynamic {
786		OriginDynamic::new(self.info, self.root.clone(), self.dynamic.clone())
787	}
788
789	/// Subscribe to all announced broadcasts.
790	pub fn consume(&self) -> OriginConsumer {
791		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone(), self.dynamic.consume())
792	}
793
794	/// Get a broadcast by path if it has *already* been published.
795	///
796	/// Equivalent to `self.consume().get_broadcast(path)` but skips the
797	/// announcement-cursor allocation, which is currently relatively expensive.
798	#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
799	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
800		let path = path.as_path();
801		let (root, rest) = self.nodes.get(&path)?;
802		let state = root.lock();
803		state.consume_broadcast(&rest)
804	}
805
806	/// Returns a new OriginProducer that automatically strips out the provided prefix.
807	///
808	/// Returns None if the provided root is not authorized; when [`Self::scope`]
809	/// was already used without a wildcard.
810	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
811		let prefix = prefix.as_path();
812
813		Some(Self {
814			info: self.info,
815			root: self.root.join(&prefix).to_owned(),
816			nodes: self.nodes.root(&prefix)?,
817			dynamic: self.dynamic.clone(),
818		})
819	}
820
821	/// Returns the root that is automatically stripped from all paths.
822	pub fn root(&self) -> &Path<'_> {
823		&self.root
824	}
825
826	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
827	// TODO return PathPrefixes
828	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
829		self.nodes.nodes.iter().map(|(root, _)| root)
830	}
831
832	/// Converts a relative path to an absolute path.
833	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
834		self.root.join(path)
835	}
836}
837
838/// Consumes announced broadcasts matching against an optional prefix.
839///
840/// NOTE: Clone is expensive, try to avoid it.
841pub struct OriginConsumer {
842	id: ConsumerId,
843	// Identity of the origin this consumer was derived from.
844	info: Origin,
845	nodes: OriginNodes,
846
847	// Pending updates queued for this consumer. Coalesced so a slow consumer
848	// can't accumulate redundant announce/unannounce pairs.
849	state: kio::Producer<OriginConsumerState>,
850
851	// A prefix that is automatically stripped from all paths.
852	root: PathOwned,
853
854	// Shared fallback request queue, fed to any `OriginDynamic` handler on the
855	// producer side. Used only by `request_broadcast`; announced lookups ignore it.
856	dynamic: kio::Consumer<OriginDynamicState>,
857}
858
859impl std::ops::Deref for OriginConsumer {
860	type Target = Origin;
861
862	fn deref(&self) -> &Self::Target {
863		&self.info
864	}
865}
866
867impl OriginConsumer {
868	fn new(info: Origin, root: PathOwned, nodes: OriginNodes, dynamic: kio::Consumer<OriginDynamicState>) -> Self {
869		let state = kio::Producer::<OriginConsumerState>::default();
870		let id = ConsumerId::new();
871
872		for (_, node) in &nodes.nodes {
873			let notify = OriginConsumerNotify {
874				root: root.clone(),
875				state: state.clone(),
876			};
877			node.lock().consume(id, notify);
878		}
879
880		Self {
881			id,
882			info,
883			nodes,
884			state,
885			root,
886			dynamic,
887		}
888	}
889
890	/// Returns the next (un)announced broadcast and the absolute path.
891	///
892	/// The broadcast will only be announced if it was previously unannounced.
893	/// The same path won't be announced/unannounced twice, instead it will toggle.
894	/// Returns None if the consumer is closed.
895	///
896	/// Note: The returned path is absolute and will always match this consumer's prefix.
897	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
898		kio::wait(|waiter| self.poll_announced(waiter)).await
899	}
900
901	/// Poll for the next (un)announced broadcast, without blocking.
902	///
903	/// Returns `Poll::Ready(Some(_))` for an update, `Poll::Ready(None)` if the
904	/// consumer is closed, or `Poll::Pending` after registering `waiter` to be
905	/// notified when the next update arrives.
906	pub fn poll_announced(&mut self, waiter: &kio::Waiter) -> Poll<Option<OriginAnnounce>> {
907		let mut state = match ready!(self.state.poll(waiter, |state| {
908			if state.pending.is_empty() {
909				Poll::Pending
910			} else {
911				Poll::Ready(())
912			}
913		})) {
914			Ok(state) => state,
915			// Closed: discard the Ref so its MutexGuard doesn't escape this call.
916			Err(_) => return Poll::Ready(None),
917		};
918		Poll::Ready(Some(state.take().expect("predicate guaranteed an update")))
919	}
920
921	/// Returns the next (un)announced broadcast and the absolute path without blocking.
922	///
923	/// Returns None if there is no update available; NOT because the consumer is closed.
924	/// You have to use `is_closed` to check if the consumer is closed.
925	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
926		self.state.write().ok()?.take()
927	}
928
929	/// Create another consumer with its own announcement cursor over the same origin.
930	pub fn consume(&self) -> Self {
931		self.clone()
932	}
933
934	/// Get a broadcast by path if it has *already* been announced.
935	///
936	/// Returns `None` when the path is unknown to this consumer right now. Synchronous
937	/// lookup races announcement gossip — a freshly-connected consumer will see `None`
938	/// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`]
939	/// (blocks until announced) unless you can guarantee the announcement has already
940	/// landed (e.g. you're responding to an `announced()` callback).
941	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
942		let path = path.as_path();
943		let (root, rest) = self.nodes.get(&path)?;
944		let state = root.lock();
945		state.consume_broadcast(&rest)
946	}
947
948	/// Block until a broadcast with the given path is announced and return it.
949	///
950	/// Returns `None` if the path is outside this consumer's allowed prefixes or if the consumer
951	/// is closed before the broadcast is announced. The returned broadcast may itself be closed
952	/// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that.
953	///
954	/// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but
955	/// cannot guarantee the announcement has already been received.
956	pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
957		let path = path.as_path();
958
959		// Scope a fresh consumer down to this path so we only wake up for relevant announcements.
960		let mut consumer = self.scope(std::slice::from_ref(&path))?;
961
962		// `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited
963		// to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no
964		// announcement at the exact path `foo` can ever arrive. Bail rather than loop forever.
965		if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
966			return None;
967		}
968
969		loop {
970			let (announced, broadcast) = consumer.announced().await?;
971			// `scope` narrows by prefix, but we only want an exact-path match.
972			if announced.as_path() == path {
973				if let Some(broadcast) = broadcast {
974					return Some(broadcast);
975				}
976			}
977		}
978	}
979
980	/// Returns a new OriginConsumer restricted to broadcasts under one of `prefixes`.
981	///
982	/// Returns None if there are no legal prefixes (the requested prefixes are
983	/// disjoint from this consumer's current scope, so it would always return None).
984	// TODO accept PathPrefixes instead of &[Path]
985	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
986		let prefixes = PathPrefixes::new(prefixes);
987		Some(OriginConsumer::new(
988			self.info,
989			self.root.clone(),
990			self.nodes.select(&prefixes)?,
991			self.dynamic.clone(),
992		))
993	}
994
995	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
996	///
997	/// Returns None if the provided root is not authorized; when [`Self::scope`] was
998	/// already used without a wildcard.
999	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
1000		let prefix = prefix.as_path();
1001
1002		Some(Self::new(
1003			self.info,
1004			self.root.join(&prefix).to_owned(),
1005			self.nodes.root(&prefix)?,
1006			self.dynamic.clone(),
1007		))
1008	}
1009
1010	/// Get a broadcast by path, falling back to a dynamic request when it is not announced.
1011	///
1012	/// Returns a [`kio::Pending`] future (resolved synchronously for an announced broadcast,
1013	/// otherwise once a handler serves it). The lookup order is: an already-announced broadcast
1014	/// resolves immediately; otherwise, if an [`OriginDynamic`] handler is live (see
1015	/// [`OriginProducer::dynamic`]), a fallback request is registered and the future resolves
1016	/// when the handler [`accept`](BroadcastRequest::accept)s it (or errors if it
1017	/// [`reject`](BroadcastRequest::reject)s or every handler drops). Concurrent requests for
1018	/// the same unannounced path coalesce onto one handler request.
1019	///
1020	/// The returned future resolves to [`Error::Unroutable`] when the path is not announced and no
1021	/// dynamic handler exists, or [`Error::Dropped`] once the origin is gone. A request that is
1022	/// registered while a handler is live but then loses every handler before being served also
1023	/// resolves to [`Error::Unroutable`]. Unlike an announced broadcast, a dynamically served one
1024	/// is never visible to [`Self::announced`].
1025	pub fn request_broadcast(&self, path: impl AsPath) -> kio::Pending<BroadcastRequested> {
1026		let path = path.as_path();
1027
1028		// Prefer a live announcement when one is present; the dynamic queue is only a fallback.
1029		if let Some(broadcast) = self.get_broadcast(&path) {
1030			return kio::Pending::new(BroadcastRequested::ready(broadcast));
1031		}
1032
1033		// Key requests by absolute path so a scoped/rooted consumer and the handler
1034		// (which may have a different root) agree on the same entry.
1035		let absolute = self.root.join(&path).to_owned();
1036
1037		let Ok(mut state) = self.dynamic.write() else {
1038			return kio::Pending::new(BroadcastRequested::failed(Error::Dropped));
1039		};
1040
1041		// Coalesce onto a queued request for the same path; otherwise register a new one.
1042		let consumer = if let Some(producer) = state.requests.get(&absolute) {
1043			producer.consume()
1044		} else {
1045			if state.dynamic == 0 {
1046				return kio::Pending::new(BroadcastRequested::failed(Error::Unroutable));
1047			}
1048
1049			let producer = kio::Producer::<PendingBroadcast>::default();
1050			let consumer = producer.consume();
1051			state.requests.insert(absolute.clone(), producer);
1052			state.request_order.push_back(absolute);
1053			consumer
1054		};
1055
1056		kio::Pending::new(BroadcastRequested::pending(consumer))
1057	}
1058
1059	/// Returns the prefix that is automatically stripped from all paths.
1060	pub fn root(&self) -> &Path<'_> {
1061		&self.root
1062	}
1063
1064	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
1065	// TODO return PathPrefixes
1066	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
1067		self.nodes.nodes.iter().map(|(root, _)| root)
1068	}
1069
1070	/// Converts a relative path to an absolute path.
1071	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
1072		self.root.join(path)
1073	}
1074}
1075
1076impl Drop for OriginConsumer {
1077	fn drop(&mut self) {
1078		for (_, root) in &self.nodes.nodes {
1079			root.lock().unconsume(self.id);
1080		}
1081	}
1082}
1083
1084impl Clone for OriginConsumer {
1085	fn clone(&self) -> Self {
1086		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone(), self.dynamic.clone())
1087	}
1088}
1089
1090/// Shared fallback request queue for an origin.
1091///
1092/// Lives off to the side of the announce tree because dynamically served broadcasts
1093/// are never announced. Mirrors the `dynamic`/`requests`/`request_order` fields of the
1094/// broadcast and track models.
1095#[derive(Default)]
1096struct OriginDynamicState {
1097	// Result channels for queued requests, keyed by absolute path. Concurrent
1098	// `request_broadcast` calls for the same path coalesce onto the same channel while
1099	// it is queued. The producer is moved out (and the entry removed) when the handler
1100	// picks the request up via [`OriginDynamic::requested_broadcast`].
1101	requests: HashMap<PathOwned, kio::Producer<PendingBroadcast>>,
1102
1103	// Requested paths in FIFO order for the handler to drain.
1104	request_order: VecDeque<PathOwned>,
1105
1106	// The number of live `OriginDynamic` handlers. While zero, `request_broadcast`
1107	// fails fast with `Unroutable` rather than queueing a request nobody will serve.
1108	dynamic: usize,
1109}
1110
1111impl OriginDynamicState {
1112	/// Drop every queued request, closing its result channel so awaiting requesters
1113	/// resolve to an error. Called when the last handler goes away.
1114	fn reject_requests(&mut self) {
1115		self.requests.clear();
1116		self.request_order.clear();
1117	}
1118}
1119
1120/// One-shot result of a dynamic broadcast request.
1121///
1122/// Stays `None` until a handler [`accept`](BroadcastRequest::accept)s (yielding the served
1123/// broadcast) or [`reject`](BroadcastRequest::reject)s (yielding an error). The producer is
1124/// dropped right after writing, closing the channel; kio checks the value before the closed
1125/// flag, so an awaiting requester still observes the final result.
1126#[derive(Default)]
1127struct PendingBroadcast {
1128	resolved: Option<Result<BroadcastConsumer, Error>>,
1129}
1130
1131/// Picks up [`OriginConsumer::request_broadcast`] calls for paths that are not announced.
1132///
1133/// The origin-level analogue of [`crate::BroadcastDynamic`]: where that serves tracks on
1134/// demand within a broadcast, this serves whole broadcasts on demand within an origin. A
1135/// relay uses it as a fallback router, fetching a broadcast from upstream only when a
1136/// downstream consumer asks for an exact path that nobody announced.
1137///
1138/// Served broadcasts are deliberately *not* announced, so they never appear in
1139/// [`OriginConsumer::announced`]. Drop this handle (and every clone) to reject the
1140/// requests still waiting to be served.
1141pub struct OriginDynamic {
1142	info: Origin,
1143	root: PathOwned,
1144	state: kio::Producer<OriginDynamicState>,
1145}
1146
1147impl Clone for OriginDynamic {
1148	fn clone(&self) -> Self {
1149		// Mirror `new`: bump `dynamic` so each live handle is counted. Without this,
1150		// dropping a clone would decrement past `new`'s increment and prematurely flip
1151		// `dynamic` to zero, making future `request_broadcast` calls return `Unroutable`.
1152		if let Ok(mut state) = self.state.write() {
1153			state.dynamic += 1;
1154		}
1155
1156		Self {
1157			info: self.info,
1158			root: self.root.clone(),
1159			state: self.state.clone(),
1160		}
1161	}
1162}
1163
1164impl OriginDynamic {
1165	fn new(info: Origin, root: PathOwned, state: kio::Producer<OriginDynamicState>) -> Self {
1166		if let Ok(mut state) = state.write() {
1167			state.dynamic += 1;
1168		}
1169
1170		Self { info, root, state }
1171	}
1172
1173	/// The origin this handler belongs to.
1174	pub fn info(&self) -> &Origin {
1175		&self.info
1176	}
1177
1178	// Gate readiness on a queued request; mutate through the returned `Mut`.
1179	fn poll<F>(&self, waiter: &kio::Waiter, f: F) -> Poll<Result<kio::Mut<'_, OriginDynamicState>, Error>>
1180	where
1181		F: FnMut(&kio::Ref<'_, OriginDynamicState>) -> Poll<()>,
1182	{
1183		Poll::Ready(match ready!(self.state.poll(waiter, f)) {
1184			Ok(state) => Ok(state),
1185			Err(_) => Err(Error::Dropped),
1186		})
1187	}
1188
1189	/// Poll for the next requested broadcast, without blocking.
1190	pub fn poll_requested_broadcast(&mut self, waiter: &kio::Waiter) -> Poll<Result<BroadcastRequest, Error>> {
1191		let mut state = ready!(self.poll(waiter, |state| {
1192			if state.request_order.is_empty() {
1193				Poll::Pending
1194			} else {
1195				Poll::Ready(())
1196			}
1197		}))?;
1198
1199		let path = state.request_order.pop_front().expect("predicate guaranteed a request");
1200		let producer = state.requests.remove(&path).expect("request_order out of sync");
1201		Poll::Ready(Ok(BroadcastRequest { path, producer }))
1202	}
1203
1204	/// Block until a consumer requests an unannounced broadcast, returning a
1205	/// [`BroadcastRequest`] to serve.
1206	pub async fn requested_broadcast(&mut self) -> Result<BroadcastRequest, Error> {
1207		kio::wait(|waiter| self.poll_requested_broadcast(waiter)).await
1208	}
1209
1210	/// Returns the prefix that is automatically stripped from requested paths.
1211	pub fn root(&self) -> &Path<'_> {
1212		&self.root
1213	}
1214}
1215
1216impl Drop for OriginDynamic {
1217	fn drop(&mut self) {
1218		if let Ok(mut state) = self.state.write() {
1219			// Saturating sub so `OriginProducer::dynamic` can stay infallible.
1220			state.dynamic = state.dynamic.saturating_sub(1);
1221			if state.dynamic == 0 {
1222				// No handlers left to fulfill queued requests; close them.
1223				state.reject_requests();
1224			}
1225		}
1226	}
1227}
1228
1229/// A pending request for a broadcast that was not announced.
1230///
1231/// Yielded by [`OriginDynamic::requested_broadcast`]. The requester is awaiting inside
1232/// [`OriginConsumer::request_broadcast`]; [`accept`](Self::accept) resolves it with a live
1233/// broadcast (which the handler keeps producing into) and [`reject`](Self::reject) resolves
1234/// it with an error. Dropping the request without either rejects it.
1235pub struct BroadcastRequest {
1236	// Absolute path that was requested.
1237	path: PathOwned,
1238
1239	// Result channel back to the awaiting requester(s). Writing `resolved` and dropping
1240	// this wakes them with the outcome.
1241	producer: kio::Producer<PendingBroadcast>,
1242}
1243
1244impl BroadcastRequest {
1245	/// The absolute path that was requested.
1246	pub fn path(&self) -> &Path<'_> {
1247		&self.path
1248	}
1249
1250	/// Accept the request, resolving every awaiting requester with `broadcast`.
1251	///
1252	/// The caller keeps producing into `broadcast` (e.g. a relay proxying tracks from
1253	/// upstream); the requesters receive a consumer for it. The broadcast is *not*
1254	/// announced.
1255	pub fn accept(self, broadcast: BroadcastConsumer) {
1256		if let Ok(mut state) = self.producer.write() {
1257			state.resolved = Some(Ok(broadcast));
1258		}
1259		// `self.producer` drops here, closing the channel; the value is still observable.
1260	}
1261
1262	/// Reject the request, resolving every awaiting requester with `err`.
1263	pub fn reject(self, err: Error) {
1264		if let Ok(mut state) = self.producer.write() {
1265			state.resolved = Some(Err(err));
1266		}
1267	}
1268}
1269
1270/// The pollable result of [`OriginConsumer::request_broadcast`].
1271///
1272/// Awaited via the [`kio::Pending`] wrapper; resolves to the [`BroadcastConsumer`]
1273/// immediately when the broadcast was already announced, or once an [`OriginDynamic`]
1274/// handler serves the request. Resolves to an error if the request is rejected or every
1275/// handler drops before serving it.
1276pub struct BroadcastRequested {
1277	inner: Requested,
1278}
1279
1280enum Requested {
1281	// Already announced: resolves immediately with a clone of this broadcast.
1282	Ready(BroadcastConsumer),
1283	// Unroutable at request time, or the origin was already dropped: resolves immediately
1284	// with this error. Baked in so `request_broadcast` itself stays infallible.
1285	Failed(Error),
1286	// Awaiting a handler: resolves when the request's result channel is written.
1287	Pending(kio::Consumer<PendingBroadcast>),
1288}
1289
1290impl BroadcastRequested {
1291	fn ready(broadcast: BroadcastConsumer) -> Self {
1292		Self {
1293			inner: Requested::Ready(broadcast),
1294		}
1295	}
1296
1297	fn failed(error: Error) -> Self {
1298		Self {
1299			inner: Requested::Failed(error),
1300		}
1301	}
1302
1303	fn pending(consumer: kio::Consumer<PendingBroadcast>) -> Self {
1304		Self {
1305			inner: Requested::Pending(consumer),
1306		}
1307	}
1308
1309	/// Poll for the requested broadcast without blocking.
1310	pub fn poll_ok(&self, waiter: &kio::Waiter) -> Poll<Result<BroadcastConsumer, Error>> {
1311		match &self.inner {
1312			Requested::Ready(broadcast) => Poll::Ready(Ok(broadcast.clone())),
1313			Requested::Failed(error) => Poll::Ready(Err(error.clone())),
1314			Requested::Pending(consumer) => Poll::Ready(
1315				match ready!(consumer.poll(waiter, |state| match &state.resolved {
1316					Some(result) => Poll::Ready(result.clone()),
1317					None => Poll::Pending,
1318				})) {
1319					Ok(result) => result,
1320					// Every handler dropped without resolving: nobody could route it.
1321					Err(_closed) => Err(Error::Unroutable),
1322				},
1323			),
1324		}
1325	}
1326}
1327
1328impl kio::Future for BroadcastRequested {
1329	type Output = Result<BroadcastConsumer, Error>;
1330
1331	fn poll(&self, waiter: &kio::Waiter) -> Poll<Self::Output> {
1332		self.poll_ok(waiter)
1333	}
1334}
1335
1336#[cfg(test)]
1337use futures::FutureExt;
1338
1339#[cfg(test)]
1340impl OriginConsumer {
1341	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
1342		let expected = expected.as_path();
1343		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
1344		assert_eq!(path, expected, "wrong path");
1345		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
1346	}
1347
1348	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
1349		let expected = expected.as_path();
1350		let (path, active) = self.try_announced().expect("no next");
1351		assert_eq!(path, expected, "wrong path");
1352		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
1353	}
1354
1355	pub fn assert_next_none(&mut self, expected: impl AsPath) {
1356		let expected = expected.as_path();
1357		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
1358		assert_eq!(path, expected, "wrong path");
1359		assert!(active.is_none(), "should be unannounced");
1360	}
1361
1362	pub fn assert_next_wait(&mut self) {
1363		if let Some(res) = self.announced().now_or_never() {
1364			panic!("next should block: got {:?}", res.map(|(path, _)| path));
1365		}
1366	}
1367
1368	/*
1369	pub fn assert_next_closed(&mut self) {
1370		assert!(
1371			self.announced().now_or_never().expect("next blocked").is_none(),
1372			"next should be closed"
1373		);
1374	}
1375	*/
1376}
1377
1378#[cfg(test)]
1379mod tests {
1380	use futures::FutureExt;
1381
1382	use crate::Broadcast;
1383
1384	use super::*;
1385
1386	#[test]
1387	fn origin_list_push_fails_at_limit() {
1388		let mut list = OriginList::new();
1389		for _ in 0..MAX_HOPS {
1390			list.push(Origin::random()).unwrap();
1391		}
1392		assert_eq!(list.len(), MAX_HOPS);
1393		assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
1394	}
1395
1396	#[test]
1397	fn origin_list_replace_first() {
1398		let mut list = OriginList::new();
1399		for _ in 0..3 {
1400			list.push(Origin::UNKNOWN).unwrap();
1401		}
1402
1403		// Rewrites only the first placeholder, keeping the length the same.
1404		assert!(list.replace_first(Origin::UNKNOWN, Origin::from(7)));
1405		assert_eq!(list.as_slice(), &[Origin::from(7), Origin::UNKNOWN, Origin::UNKNOWN]);
1406
1407		// No match leaves the list untouched.
1408		assert!(!list.replace_first(Origin::from(99), Origin::from(8)));
1409		assert_eq!(list.len(), 3);
1410	}
1411
1412	#[test]
1413	fn origin_list_try_from_vec_enforces_limit() {
1414		let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
1415		assert!(OriginList::try_from(under).is_ok());
1416
1417		let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
1418		assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
1419	}
1420
1421	#[tokio::test]
1422	async fn test_announce() {
1423		tokio::time::pause();
1424
1425		let origin = Origin::random().produce();
1426		let broadcast1 = Broadcast::new().produce();
1427		let broadcast2 = Broadcast::new().produce();
1428
1429		let mut consumer1 = origin.consume();
1430		// Make a new consumer that should get it.
1431		consumer1.assert_next_wait();
1432
1433		// Publish the first broadcast.
1434		origin.publish_broadcast("test1", broadcast1.consume());
1435
1436		consumer1.assert_next("test1", &broadcast1.consume());
1437		consumer1.assert_next_wait();
1438
1439		// Make a new consumer that should get the existing broadcast.
1440		// But we don't consume it yet.
1441		let mut consumer2 = origin.consume();
1442
1443		// Publish the second broadcast.
1444		origin.publish_broadcast("test2", broadcast2.consume());
1445
1446		consumer1.assert_next("test2", &broadcast2.consume());
1447		consumer1.assert_next_wait();
1448
1449		consumer2.assert_next("test1", &broadcast1.consume());
1450		consumer2.assert_next("test2", &broadcast2.consume());
1451		consumer2.assert_next_wait();
1452
1453		// Close the first broadcast.
1454		drop(broadcast1);
1455
1456		// Wait for the async task to run.
1457		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1458
1459		// All consumers should get a None now.
1460		consumer1.assert_next_none("test1");
1461		consumer2.assert_next_none("test1");
1462		consumer1.assert_next_wait();
1463		consumer2.assert_next_wait();
1464
1465		// And a new consumer only gets the last broadcast.
1466		let mut consumer3 = origin.consume();
1467		consumer3.assert_next("test2", &broadcast2.consume());
1468		consumer3.assert_next_wait();
1469
1470		// Close the other producer and make sure it cleans up
1471		drop(broadcast2);
1472
1473		// Wait for the async task to run.
1474		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1475
1476		consumer1.assert_next_none("test2");
1477		consumer2.assert_next_none("test2");
1478		consumer3.assert_next_none("test2");
1479
1480		/* TODO close the origin consumer when the producer is dropped
1481		consumer1.assert_next_closed();
1482		consumer2.assert_next_closed();
1483		consumer3.assert_next_closed();
1484		*/
1485	}
1486
1487	#[tokio::test]
1488	async fn test_duplicate() {
1489		tokio::time::pause();
1490
1491		let origin = Origin::random().produce();
1492
1493		let broadcast1 = Broadcast::new().produce();
1494		let broadcast2 = Broadcast::new().produce();
1495		let broadcast3 = Broadcast::new().produce();
1496
1497		let consumer1 = broadcast1.consume();
1498		let consumer2 = broadcast2.consume();
1499		let consumer3 = broadcast3.consume();
1500
1501		let mut consumer = origin.consume();
1502
1503		origin.publish_broadcast("test", consumer1.clone());
1504		origin.publish_broadcast("test", consumer2.clone());
1505		origin.publish_broadcast("test", consumer3.clone());
1506		assert!(consumer.get_broadcast("test").is_some());
1507
1508		// Identical (empty) hop chains tie on the deterministic key, so the first publish
1509		// stays active and the rest queue as backups. No churn, no reannounce.
1510		consumer.assert_next("test", &consumer1);
1511		consumer.assert_next_wait();
1512
1513		// Drop a backup, nothing should change.
1514		drop(broadcast2);
1515
1516		// Wait for the async task to run.
1517		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1518
1519		assert!(consumer.get_broadcast("test").is_some());
1520		consumer.assert_next_wait();
1521
1522		// Drop the active, we should reannounce with the remaining backup.
1523		drop(broadcast1);
1524
1525		// Wait for the async task to run.
1526		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1527
1528		assert!(consumer.get_broadcast("test").is_some());
1529		consumer.assert_next_none("test");
1530		consumer.assert_next("test", &consumer3);
1531
1532		// Drop the final broadcast, we should unannounce.
1533		drop(broadcast3);
1534
1535		// Wait for the async task to run.
1536		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1537		assert!(consumer.get_broadcast("test").is_none());
1538
1539		consumer.assert_next_none("test");
1540		consumer.assert_next_wait();
1541	}
1542
1543	#[tokio::test]
1544	async fn test_duplicate_reverse() {
1545		tokio::time::pause();
1546
1547		let origin = Origin::random().produce();
1548		let broadcast1 = Broadcast::new().produce();
1549		let broadcast2 = Broadcast::new().produce();
1550
1551		origin.publish_broadcast("test", broadcast1.consume());
1552		origin.publish_broadcast("test", broadcast2.consume());
1553		assert!(origin.consume().get_broadcast("test").is_some());
1554
1555		// This is harder, dropping the new broadcast first.
1556		drop(broadcast2);
1557
1558		// Wait for the cleanup async task to run.
1559		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1560		assert!(origin.consume().get_broadcast("test").is_some());
1561
1562		drop(broadcast1);
1563
1564		// Wait for the cleanup async task to run.
1565		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1566		assert!(origin.consume().get_broadcast("test").is_none());
1567	}
1568
1569	#[tokio::test]
1570	async fn test_deterministic_tiebreak() {
1571		tokio::time::pause();
1572
1573		// Build a broadcast carrying a specific hop chain.
1574		fn route(ids: &[u64]) -> BroadcastProducer {
1575			let hops = OriginList::try_from(ids.iter().copied().map(Origin::from).collect::<Vec<_>>()).unwrap();
1576			Broadcast { hops }.produce()
1577		}
1578
1579		// Resolve the active route for "test" after publishing both routes in the given order.
1580		fn winner(first: &[u64], second: &[u64]) -> OriginList {
1581			let origin = Origin::random().produce();
1582			let a = route(first);
1583			let b = route(second);
1584			origin.publish_broadcast("test", a.consume());
1585			origin.publish_broadcast("test", b.consume());
1586			let hops = origin.consume().get_broadcast("test").unwrap().hops.clone();
1587			// Keep the producers alive until after we read the active route.
1588			drop((a, b));
1589			hops
1590		}
1591
1592		// Two routes with equal hop counts but distinct chains. The winner is decided by
1593		// the deterministic key, not arrival order, so both publish orders converge.
1594		let forward = winner(&[10, 20], &[30, 40]);
1595		let reverse = winner(&[30, 40], &[10, 20]);
1596		assert_eq!(forward, reverse, "tie-break must not depend on publish order");
1597
1598		// A strictly shorter chain always wins regardless of the hash.
1599		assert_eq!(winner(&[10, 20], &[30]).len(), 1);
1600		assert_eq!(winner(&[30], &[10, 20]).len(), 1);
1601	}
1602
1603	#[tokio::test]
1604	async fn test_double_publish() {
1605		tokio::time::pause();
1606
1607		let origin = Origin::random().produce();
1608		let broadcast = Broadcast::new().produce();
1609
1610		// Ensure it doesn't crash.
1611		origin.publish_broadcast("test", broadcast.consume());
1612		origin.publish_broadcast("test", broadcast.consume());
1613
1614		assert!(origin.consume().get_broadcast("test").is_some());
1615
1616		drop(broadcast);
1617
1618		// Wait for the async task to run.
1619		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1620		assert!(origin.consume().get_broadcast("test").is_none());
1621	}
1622	// A previous mpsc-based implementation could only deliver the first 127 broadcasts
1623	// instantly via `assert_next` (which uses `now_or_never`). The kio-backed
1624	// implementation polls synchronously and can deliver all of them without yielding.
1625	// Names are zero-padded so lexicographic delivery order matches the loop index.
1626	#[tokio::test]
1627	async fn test_many_announces() {
1628		let origin = Origin::random().produce();
1629		let broadcast = Broadcast::new().produce();
1630
1631		let mut consumer = origin.consume();
1632		for i in 0..256 {
1633			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1634		}
1635
1636		for i in 0..256 {
1637			consumer.assert_next(format!("test{i:03}"), &broadcast.consume());
1638		}
1639		consumer.assert_next_wait();
1640	}
1641
1642	#[tokio::test]
1643	async fn test_many_announces_try() {
1644		let origin = Origin::random().produce();
1645		let broadcast = Broadcast::new().produce();
1646
1647		let mut consumer = origin.consume();
1648		for i in 0..256 {
1649			origin.publish_broadcast(format!("test{i:03}"), broadcast.consume());
1650		}
1651
1652		for i in 0..256 {
1653			consumer.assert_try_next(format!("test{i:03}"), &broadcast.consume());
1654		}
1655	}
1656
1657	#[tokio::test]
1658	async fn test_with_root_basic() {
1659		let origin = Origin::random().produce();
1660		let broadcast = Broadcast::new().produce();
1661
1662		// Create a producer with root "/foo"
1663		let foo_producer = origin.with_root("foo").expect("should create root");
1664		assert_eq!(foo_producer.root().as_str(), "foo");
1665
1666		let mut consumer = origin.consume();
1667
1668		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
1669		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1670		// The original consumer should see the full path
1671		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1672
1673		// A consumer created from the rooted producer should see the stripped path
1674		let mut foo_consumer = foo_producer.consume();
1675		foo_consumer.assert_next("bar/baz", &broadcast.consume());
1676	}
1677
1678	#[tokio::test]
1679	async fn test_with_root_nested() {
1680		let origin = Origin::random().produce();
1681		let broadcast = Broadcast::new().produce();
1682
1683		// Create nested roots
1684		let foo_producer = origin.with_root("foo").expect("should create foo root");
1685		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1686		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1687
1688		let mut consumer = origin.consume();
1689
1690		// Publishing to "baz" should actually publish to "foo/bar/baz"
1691		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1692		// The original consumer sees the full path
1693		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1694
1695		// Consumer from foo_bar_producer sees just "baz"
1696		let mut foo_bar_consumer = foo_bar_producer.consume();
1697		foo_bar_consumer.assert_next("baz", &broadcast.consume());
1698	}
1699
1700	#[tokio::test]
1701	async fn test_publish_scope_allows() {
1702		let origin = Origin::random().produce();
1703		let broadcast = Broadcast::new().produce();
1704
1705		// Create a producer that can only publish to "allowed" paths
1706		let limited_producer = origin
1707			.scope(&["allowed/path1".into(), "allowed/path2".into()])
1708			.expect("should create limited producer");
1709
1710		// Should be able to publish to allowed paths
1711		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1712		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1713		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1714
1715		// Should not be able to publish to disallowed paths
1716		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1717		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
1718		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1719	}
1720
1721	#[tokio::test]
1722	async fn test_publish_scope_empty() {
1723		let origin = Origin::random().produce();
1724
1725		// Creating a producer with no allowed paths should return None
1726		assert!(origin.scope(&[]).is_none());
1727	}
1728
1729	#[tokio::test]
1730	async fn test_consume_scope_filters() {
1731		let origin = Origin::random().produce();
1732		let broadcast1 = Broadcast::new().produce();
1733		let broadcast2 = Broadcast::new().produce();
1734		let broadcast3 = Broadcast::new().produce();
1735
1736		let mut consumer = origin.consume();
1737
1738		// Publish to different paths
1739		origin.publish_broadcast("allowed", broadcast1.consume());
1740		origin.publish_broadcast("allowed/nested", broadcast2.consume());
1741		origin.publish_broadcast("notallowed", broadcast3.consume());
1742
1743		// Create a consumer that only sees "allowed" paths
1744		let mut limited_consumer = origin
1745			.consume()
1746			.scope(&["allowed".into()])
1747			.expect("should create limited consumer");
1748
1749		// Should only receive broadcasts under "allowed"
1750		limited_consumer.assert_next("allowed", &broadcast1.consume());
1751		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1752		limited_consumer.assert_next_wait(); // Should not see "notallowed"
1753
1754		// Unscoped consumer should see all
1755		consumer.assert_next("allowed", &broadcast1.consume());
1756		consumer.assert_next("allowed/nested", &broadcast2.consume());
1757		consumer.assert_next("notallowed", &broadcast3.consume());
1758	}
1759
1760	#[tokio::test]
1761	async fn test_consume_scope_multiple_prefixes() {
1762		let origin = Origin::random().produce();
1763		let broadcast1 = Broadcast::new().produce();
1764		let broadcast2 = Broadcast::new().produce();
1765		let broadcast3 = Broadcast::new().produce();
1766
1767		origin.publish_broadcast("foo/test", broadcast1.consume());
1768		origin.publish_broadcast("bar/test", broadcast2.consume());
1769		origin.publish_broadcast("baz/test", broadcast3.consume());
1770
1771		// Consumer that only sees "foo" and "bar" paths
1772		let mut limited_consumer = origin
1773			.consume()
1774			.scope(&["foo".into(), "bar".into()])
1775			.expect("should create limited consumer");
1776
1777		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
1778		limited_consumer.assert_next("bar/test", &broadcast2.consume());
1779		limited_consumer.assert_next("foo/test", &broadcast1.consume());
1780		limited_consumer.assert_next_wait(); // Should not see "baz/test"
1781	}
1782
1783	#[tokio::test]
1784	async fn test_with_root_and_publish_scope() {
1785		let origin = Origin::random().produce();
1786		let broadcast = Broadcast::new().produce();
1787
1788		// User connects to /foo root
1789		let foo_producer = origin.with_root("foo").expect("should create foo root");
1790
1791		// Limit them to publish only to "bar" and "goop/pee" within /foo
1792		let limited_producer = foo_producer
1793			.scope(&["bar".into(), "goop/pee".into()])
1794			.expect("should create limited producer");
1795
1796		let mut consumer = origin.consume();
1797
1798		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
1799		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1800		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1801		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1802		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1803
1804		// Should not be able to publish outside allowed paths
1805		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1806		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
1807		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1808
1809		// Original consumer sees full paths
1810		consumer.assert_next("foo/bar", &broadcast.consume());
1811		consumer.assert_next("foo/bar/nested", &broadcast.consume());
1812		consumer.assert_next("foo/goop/pee", &broadcast.consume());
1813		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1814	}
1815
1816	#[tokio::test]
1817	async fn test_with_root_and_consume_scope() {
1818		let origin = Origin::random().produce();
1819		let broadcast1 = Broadcast::new().produce();
1820		let broadcast2 = Broadcast::new().produce();
1821		let broadcast3 = Broadcast::new().produce();
1822
1823		// Publish broadcasts
1824		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1825		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1826		origin.publish_broadcast("foo/other/test", broadcast3.consume());
1827
1828		// User connects to /foo root
1829		let foo_producer = origin.with_root("foo").expect("should create foo root");
1830
1831		// Create consumer limited to "bar" and "goop/pee" within /foo
1832		let mut limited_consumer = foo_producer
1833			.consume()
1834			.scope(&["bar".into(), "goop/pee".into()])
1835			.expect("should create limited consumer");
1836
1837		// Should only see allowed paths (without foo prefix)
1838		limited_consumer.assert_next("bar/test", &broadcast1.consume());
1839		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1840		limited_consumer.assert_next_wait(); // Should not see "other/test"
1841	}
1842
1843	#[tokio::test]
1844	async fn test_with_root_unauthorized() {
1845		let origin = Origin::random().produce();
1846
1847		// First limit the producer to specific paths
1848		let limited_producer = origin
1849			.scope(&["allowed".into()])
1850			.expect("should create limited producer");
1851
1852		// Trying to create a root outside allowed paths should fail
1853		assert!(limited_producer.with_root("notallowed").is_none());
1854
1855		// But creating a root within allowed paths should work
1856		let allowed_root = limited_producer
1857			.with_root("allowed")
1858			.expect("should create allowed root");
1859		assert_eq!(allowed_root.root().as_str(), "allowed");
1860	}
1861
1862	#[tokio::test]
1863	async fn test_wildcard_permission() {
1864		let origin = Origin::random().produce();
1865		let broadcast = Broadcast::new().produce();
1866
1867		// Producer with root access (empty string means wildcard)
1868		let root_producer = origin.clone();
1869
1870		// Should be able to publish anywhere
1871		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1872		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1873
1874		// Can create any root
1875		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1876		assert_eq!(foo_producer.root().as_str(), "foo");
1877	}
1878
1879	#[tokio::test]
1880	async fn test_consume_broadcast_with_permissions() {
1881		let origin = Origin::random().produce();
1882		let broadcast1 = Broadcast::new().produce();
1883		let broadcast2 = Broadcast::new().produce();
1884
1885		origin.publish_broadcast("allowed/test", broadcast1.consume());
1886		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1887
1888		// Create limited consumer
1889		let limited_consumer = origin
1890			.consume()
1891			.scope(&["allowed".into()])
1892			.expect("should create limited consumer");
1893
1894		// Should be able to get allowed broadcast
1895		let result = limited_consumer.get_broadcast("allowed/test");
1896		assert!(result.is_some());
1897		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1898
1899		// Should not be able to get disallowed broadcast
1900		assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1901
1902		// Original consumer can get both
1903		let consumer = origin.consume();
1904		assert!(consumer.get_broadcast("allowed/test").is_some());
1905		assert!(consumer.get_broadcast("notallowed/test").is_some());
1906	}
1907
1908	#[tokio::test]
1909	async fn test_nested_paths_with_permissions() {
1910		let origin = Origin::random().produce();
1911		let broadcast = Broadcast::new().produce();
1912
1913		// Create producer limited to "a/b/c"
1914		let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1915
1916		// Should be able to publish to exact path and nested paths
1917		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1918		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1919		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1920
1921		// Should not be able to publish to parent or sibling paths
1922		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1923		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1924		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1925	}
1926
1927	#[tokio::test]
1928	async fn test_multiple_consumers_with_different_permissions() {
1929		let origin = Origin::random().produce();
1930		let broadcast1 = Broadcast::new().produce();
1931		let broadcast2 = Broadcast::new().produce();
1932		let broadcast3 = Broadcast::new().produce();
1933
1934		// Publish to different paths
1935		origin.publish_broadcast("foo/test", broadcast1.consume());
1936		origin.publish_broadcast("bar/test", broadcast2.consume());
1937		origin.publish_broadcast("baz/test", broadcast3.consume());
1938
1939		// Create consumers with different permissions
1940		let mut foo_consumer = origin
1941			.consume()
1942			.scope(&["foo".into()])
1943			.expect("should create foo consumer");
1944
1945		let mut bar_consumer = origin
1946			.consume()
1947			.scope(&["bar".into()])
1948			.expect("should create bar consumer");
1949
1950		let mut foobar_consumer = origin
1951			.consume()
1952			.scope(&["foo".into(), "bar".into()])
1953			.expect("should create foobar consumer");
1954
1955		// Each consumer should only see their allowed paths
1956		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1957		foo_consumer.assert_next_wait();
1958
1959		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1960		bar_consumer.assert_next_wait();
1961
1962		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1963		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1964		foobar_consumer.assert_next_wait();
1965	}
1966
1967	#[tokio::test]
1968	async fn test_select_with_empty_prefix() {
1969		let origin = Origin::random().produce();
1970		let broadcast1 = Broadcast::new().produce();
1971		let broadcast2 = Broadcast::new().produce();
1972
1973		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1974		let demo_producer = origin.with_root("demo").expect("should create demo root");
1975		let limited_producer = demo_producer
1976			.scope(&["worm-node".into(), "foobar".into()])
1977			.expect("should create limited producer");
1978
1979		// Publish some broadcasts
1980		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1981		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1982
1983		// scope with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1984		let mut consumer = limited_producer
1985			.consume()
1986			.scope(&["".into()])
1987			.expect("should create consumer with empty prefix");
1988
1989		// Should see both broadcasts (order depends on PathPrefixes sort)
1990		let a1 = consumer.try_announced().expect("expected first announcement");
1991		let a2 = consumer.try_announced().expect("expected second announcement");
1992		consumer.assert_next_wait();
1993
1994		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1995		paths.sort();
1996		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1997	}
1998
1999	#[tokio::test]
2000	async fn test_select_narrowing_scope() {
2001		let origin = Origin::random().produce();
2002		let broadcast1 = Broadcast::new().produce();
2003		let broadcast2 = Broadcast::new().produce();
2004		let broadcast3 = Broadcast::new().produce();
2005
2006		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
2007		let demo_producer = origin.with_root("demo").expect("should create demo root");
2008		let limited_producer = demo_producer
2009			.scope(&["worm-node".into(), "foobar".into()])
2010			.expect("should create limited producer");
2011
2012		// Publish broadcasts at different levels
2013		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
2014		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
2015		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
2016
2017		// Test 1: scope("worm-node") should result in a single "" node with contents of "worm-node" ONLY
2018		let mut worm_consumer = limited_producer
2019			.consume()
2020			.scope(&["worm-node".into()])
2021			.expect("should create worm-node consumer");
2022
2023		// Should see worm-node content with paths stripped to ""
2024		worm_consumer.assert_next("worm-node", &broadcast1.consume());
2025		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
2026		worm_consumer.assert_next_wait(); // Should NOT see foobar content
2027
2028		// Test 2: scope("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
2029		let mut foo_consumer = limited_producer
2030			.consume()
2031			.scope(&["worm-node/foo".into()])
2032			.expect("should create worm-node/foo consumer");
2033
2034		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
2035		foo_consumer.assert_next_wait(); // Should NOT see other content
2036	}
2037
2038	#[tokio::test]
2039	async fn test_select_multiple_roots_with_empty_prefix() {
2040		let origin = Origin::random().produce();
2041		let broadcast1 = Broadcast::new().produce();
2042		let broadcast2 = Broadcast::new().produce();
2043		let broadcast3 = Broadcast::new().produce();
2044
2045		// Producer with multiple allowed roots
2046		let limited_producer = origin
2047			.scope(&["app1".into(), "app2".into(), "shared".into()])
2048			.expect("should create limited producer");
2049
2050		// Publish to each root
2051		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
2052		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
2053		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
2054
2055		// scope with empty prefix should maintain all roots
2056		let mut consumer = limited_producer
2057			.consume()
2058			.scope(&["".into()])
2059			.expect("should create consumer with empty prefix");
2060
2061		// Should see all broadcasts from all roots
2062		consumer.assert_next("app1/data", &broadcast1.consume());
2063		consumer.assert_next("app2/config", &broadcast2.consume());
2064		consumer.assert_next("shared/resource", &broadcast3.consume());
2065		consumer.assert_next_wait();
2066	}
2067
2068	#[tokio::test]
2069	async fn test_publish_scope_with_empty_prefix() {
2070		let origin = Origin::random().produce();
2071		let broadcast = Broadcast::new().produce();
2072
2073		// Producer with specific allowed paths
2074		let limited_producer = origin
2075			.scope(&["services/api".into(), "services/web".into()])
2076			.expect("should create limited producer");
2077
2078		// scope with empty prefix should keep the same restrictions
2079		let same_producer = limited_producer
2080			.scope(&["".into()])
2081			.expect("should create producer with empty prefix");
2082
2083		// Should still have the same publishing restrictions
2084		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
2085		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
2086		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
2087		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
2088	}
2089
2090	#[tokio::test]
2091	async fn test_select_narrowing_to_deeper_path() {
2092		let origin = Origin::random().produce();
2093		let broadcast1 = Broadcast::new().produce();
2094		let broadcast2 = Broadcast::new().produce();
2095		let broadcast3 = Broadcast::new().produce();
2096
2097		// Producer with broad permission
2098		let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
2099
2100		// Publish at various depths
2101		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
2102		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
2103		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
2104
2105		// Narrow down to team2 only
2106		let mut team2_consumer = limited_producer
2107			.consume()
2108			.scope(&["org/team2".into()])
2109			.expect("should create team2 consumer");
2110
2111		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
2112		team2_consumer.assert_next_wait(); // Should NOT see team1 content
2113
2114		// Further narrow down to team1/project1
2115		let mut project1_consumer = limited_producer
2116			.consume()
2117			.scope(&["org/team1/project1".into()])
2118			.expect("should create project1 consumer");
2119
2120		// Should only see project1 content at root
2121		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
2122		project1_consumer.assert_next_wait();
2123	}
2124
2125	#[tokio::test]
2126	async fn test_select_with_non_matching_prefix() {
2127		let origin = Origin::random().produce();
2128
2129		// Producer with specific allowed paths
2130		let limited_producer = origin
2131			.scope(&["allowed/path".into()])
2132			.expect("should create limited producer");
2133
2134		// Trying to scope with a completely different prefix should return None
2135		assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
2136
2137		// Similarly for scope
2138		assert!(limited_producer.scope(&["other/path".into()]).is_none());
2139	}
2140
2141	// Regression test for https://github.com/moq-dev/moq/issues/910
2142	// with_root panics when String has trailing slash (AsPath for String skips normalization)
2143	#[tokio::test]
2144	async fn test_with_root_trailing_slash_consumer() {
2145		let origin = Origin::random().produce();
2146
2147		// Use an owned String so the trailing slash is NOT normalized away.
2148		let prefix = "some_prefix/".to_string();
2149		let mut consumer = origin.consume().with_root(prefix).unwrap();
2150
2151		let b = origin.create_broadcast("some_prefix/test").unwrap();
2152		consumer.assert_next("test", &b.consume());
2153	}
2154
2155	// Same issue but for the producer side of with_root
2156	#[tokio::test]
2157	async fn test_with_root_trailing_slash_producer() {
2158		let origin = Origin::random().produce();
2159
2160		// Use an owned String so the trailing slash is NOT normalized away.
2161		let prefix = "some_prefix/".to_string();
2162		let rooted = origin.with_root(prefix).unwrap();
2163
2164		let b = rooted.create_broadcast("test").unwrap();
2165
2166		let mut consumer = rooted.consume();
2167		consumer.assert_next("test", &b.consume());
2168	}
2169
2170	// Verify unannounce also doesn't panic with trailing slash
2171	#[tokio::test]
2172	async fn test_with_root_trailing_slash_unannounce() {
2173		tokio::time::pause();
2174
2175		let origin = Origin::random().produce();
2176
2177		let prefix = "some_prefix/".to_string();
2178		let mut consumer = origin.consume().with_root(prefix).unwrap();
2179
2180		let b = origin.create_broadcast("some_prefix/test").unwrap();
2181		consumer.assert_next("test", &b.consume());
2182
2183		// Drop the broadcast producer to trigger unannounce
2184		drop(b);
2185		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2186
2187		// unannounce also calls strip_prefix(&self.root).unwrap()
2188		consumer.assert_next_none("test");
2189	}
2190
2191	#[tokio::test]
2192	async fn test_select_maintains_access_with_wider_prefix() {
2193		let origin = Origin::random().produce();
2194		let broadcast1 = Broadcast::new().produce();
2195		let broadcast2 = Broadcast::new().produce();
2196
2197		// Setup: user with root "demo" allowed to subscribe to specific paths
2198		let demo_producer = origin.with_root("demo").expect("should create demo root");
2199		let user_producer = demo_producer
2200			.scope(&["worm-node".into(), "foobar".into()])
2201			.expect("should create user producer");
2202
2203		// Publish some data
2204		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
2205		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
2206
2207		// Key test: scope with "" should maintain access to allowed roots
2208		let mut consumer = user_producer
2209			.consume()
2210			.scope(&["".into()])
2211			.expect("scope with empty prefix should not fail when user has specific permissions");
2212
2213		// Should still receive broadcasts from allowed paths (order not guaranteed)
2214		let a1 = consumer.try_announced().expect("expected first announcement");
2215		let a2 = consumer.try_announced().expect("expected second announcement");
2216		consumer.assert_next_wait();
2217
2218		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
2219		paths.sort();
2220		assert_eq!(paths, ["foobar", "worm-node/data"]);
2221
2222		// Also test that we can still narrow the scope
2223		let mut narrow_consumer = user_producer
2224			.consume()
2225			.scope(&["worm-node".into()])
2226			.expect("should be able to narrow scope to worm-node");
2227
2228		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
2229		narrow_consumer.assert_next_wait(); // Should not see foobar
2230	}
2231
2232	#[tokio::test]
2233	async fn test_duplicate_prefixes_deduped() {
2234		let origin = Origin::random().produce();
2235		let broadcast = Broadcast::new().produce();
2236
2237		// scope with duplicate prefixes should work (deduped internally)
2238		let producer = origin
2239			.scope(&["demo".into(), "demo".into()])
2240			.expect("should create producer");
2241
2242		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
2243
2244		let mut consumer = producer.consume();
2245		consumer.assert_next("demo/stream", &broadcast.consume());
2246		consumer.assert_next_wait();
2247	}
2248
2249	#[tokio::test]
2250	async fn test_overlapping_prefixes_deduped() {
2251		let origin = Origin::random().produce();
2252		let broadcast = Broadcast::new().produce();
2253
2254		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
2255		let producer = origin
2256			.scope(&["demo".into(), "demo/foo".into()])
2257			.expect("should create producer");
2258
2259		// Can still publish under "demo/bar" since "demo" covers everything
2260		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
2261
2262		let mut consumer = producer.consume();
2263		consumer.assert_next("demo/bar/stream", &broadcast.consume());
2264		consumer.assert_next_wait();
2265	}
2266
2267	#[tokio::test]
2268	async fn test_overlapping_prefixes_no_duplicate_announcements() {
2269		let origin = Origin::random().produce();
2270		let broadcast = Broadcast::new().produce();
2271
2272		// Both "demo" and "demo/foo" are requested — should only have one node
2273		let producer = origin
2274			.scope(&["demo".into(), "demo/foo".into()])
2275			.expect("should create producer");
2276
2277		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
2278
2279		let mut consumer = producer.consume();
2280		// Should only get ONE announcement (not two from overlapping nodes)
2281		consumer.assert_next("demo/foo/stream", &broadcast.consume());
2282		consumer.assert_next_wait();
2283	}
2284
2285	#[tokio::test]
2286	async fn test_allowed_returns_deduped_prefixes() {
2287		let origin = Origin::random().produce();
2288
2289		let producer = origin
2290			.scope(&["demo".into(), "demo/foo".into(), "anon".into()])
2291			.expect("should create producer");
2292
2293		let allowed: Vec<_> = producer.allowed().collect();
2294		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
2295	}
2296
2297	#[tokio::test]
2298	async fn test_announced_broadcast_already_announced() {
2299		let origin = Origin::random().produce();
2300		let broadcast = Broadcast::new().produce();
2301
2302		origin.publish_broadcast("test", broadcast.consume());
2303
2304		let consumer = origin.consume();
2305		let result = consumer.announced_broadcast("test").await.expect("should find it");
2306		assert!(result.is_clone(&broadcast.consume()));
2307	}
2308
2309	#[tokio::test]
2310	async fn test_announced_broadcast_delayed() {
2311		tokio::time::pause();
2312
2313		let origin = Origin::random().produce();
2314		let broadcast = Broadcast::new().produce();
2315
2316		let consumer = origin.consume();
2317
2318		// Start waiting before it's announced.
2319		let wait = tokio::spawn({
2320			let consumer = consumer.clone();
2321			async move { consumer.announced_broadcast("test").await }
2322		});
2323
2324		// Give the spawned task a chance to subscribe.
2325		tokio::task::yield_now().await;
2326
2327		origin.publish_broadcast("test", broadcast.consume());
2328
2329		let result = wait.await.unwrap().expect("should find it");
2330		assert!(result.is_clone(&broadcast.consume()));
2331	}
2332
2333	#[tokio::test]
2334	async fn test_announced_broadcast_ignores_unrelated_paths() {
2335		tokio::time::pause();
2336
2337		let origin = Origin::random().produce();
2338		let other = Broadcast::new().produce();
2339		let target = Broadcast::new().produce();
2340
2341		let consumer = origin.consume();
2342
2343		let wait = tokio::spawn({
2344			let consumer = consumer.clone();
2345			async move { consumer.announced_broadcast("target").await }
2346		});
2347
2348		tokio::task::yield_now().await;
2349
2350		// Publish an unrelated broadcast first — announced_broadcast should skip it.
2351		origin.publish_broadcast("other", other.consume());
2352		tokio::task::yield_now().await;
2353		assert!(!wait.is_finished(), "must not resolve on unrelated path");
2354
2355		origin.publish_broadcast("target", target.consume());
2356		let result = wait.await.unwrap().expect("should find target");
2357		assert!(result.is_clone(&target.consume()));
2358	}
2359
2360	#[tokio::test]
2361	async fn test_announced_broadcast_skips_nested_paths() {
2362		tokio::time::pause();
2363
2364		let origin = Origin::random().produce();
2365		let nested = Broadcast::new().produce();
2366		let exact = Broadcast::new().produce();
2367
2368		let consumer = origin.consume();
2369
2370		let wait = tokio::spawn({
2371			let consumer = consumer.clone();
2372			async move { consumer.announced_broadcast("foo").await }
2373		});
2374
2375		tokio::task::yield_now().await;
2376
2377		// "foo/bar" is under the prefix scope, but it's not the exact path — skip it.
2378		origin.publish_broadcast("foo/bar", nested.consume());
2379		tokio::task::yield_now().await;
2380		assert!(!wait.is_finished(), "must not resolve on a nested path");
2381
2382		origin.publish_broadcast("foo", exact.consume());
2383		let result = wait.await.unwrap().expect("should find foo exactly");
2384		assert!(result.is_clone(&exact.consume()));
2385	}
2386
2387	#[tokio::test]
2388	async fn test_announced_broadcast_disallowed() {
2389		let origin = Origin::random().produce();
2390		let limited = origin
2391			.consume()
2392			.scope(&["allowed".into()])
2393			.expect("should create limited");
2394
2395		// Path is outside allowed prefixes — should return None immediately.
2396		assert!(limited.announced_broadcast("notallowed").await.is_none());
2397	}
2398
2399	#[tokio::test]
2400	async fn test_announced_broadcast_scope_too_narrow() {
2401		// Consumer's scope is narrower than the requested path: asking for `foo` on a consumer
2402		// limited to `foo/specific` can never resolve. Must return None, not loop forever.
2403		let origin = Origin::random().produce();
2404		let limited = origin
2405			.consume()
2406			.scope(&["foo/specific".into()])
2407			.expect("should create limited");
2408
2409		// now_or_never so we fail fast instead of hanging if the guard regresses.
2410		let result = limited
2411			.announced_broadcast("foo")
2412			.now_or_never()
2413			.expect("must not block");
2414		assert!(result.is_none());
2415	}
2416
2417	// Coalescing tests: a slow consumer that doesn't drain between updates
2418	// should observe a bounded number of deliveries.
2419
2420	#[tokio::test]
2421	async fn test_coalesce_announce_then_unannounce() {
2422		// announce + unannounce that the consumer hasn't observed yet collapses to nothing.
2423		tokio::time::pause();
2424
2425		let origin = Origin::random().produce();
2426		let mut consumer = origin.consume();
2427
2428		let broadcast = Broadcast::new().produce();
2429		origin.publish_broadcast("test", broadcast.consume());
2430		drop(broadcast);
2431
2432		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2433
2434		consumer.assert_next_wait();
2435	}
2436
2437	#[tokio::test]
2438	async fn test_coalesce_announce_unannounce_announce() {
2439		// announce, unannounce, announce that the consumer hasn't drained collapses
2440		// to a single Announce of the latest broadcast.
2441		tokio::time::pause();
2442
2443		let origin = Origin::random().produce();
2444		let mut consumer = origin.consume();
2445
2446		let broadcast1 = Broadcast::new().produce();
2447		let broadcast2 = Broadcast::new().produce();
2448
2449		origin.publish_broadcast("test", broadcast1.consume());
2450		drop(broadcast1);
2451		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2452		origin.publish_broadcast("test", broadcast2.consume());
2453
2454		consumer.assert_next("test", &broadcast2.consume());
2455		consumer.assert_next_wait();
2456	}
2457
2458	#[tokio::test]
2459	async fn test_coalesce_unannounce_announce_preserved() {
2460		// unannounce followed by announce of a different broadcast must be preserved
2461		// as two deliveries so the consumer learns the origin changed.
2462		tokio::time::pause();
2463
2464		let origin = Origin::random().produce();
2465		let broadcast1 = Broadcast::new().produce();
2466		origin.publish_broadcast("test", broadcast1.consume());
2467
2468		let mut consumer = origin.consume();
2469		consumer.assert_next("test", &broadcast1.consume());
2470
2471		// Drop, then publish a fresh broadcast at the same path.
2472		drop(broadcast1);
2473		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2474
2475		let broadcast2 = Broadcast::new().produce();
2476		origin.publish_broadcast("test", broadcast2.consume());
2477
2478		// The consumer must see the unannounce before the new announce.
2479		consumer.assert_next_none("test");
2480		consumer.assert_next("test", &broadcast2.consume());
2481		consumer.assert_next_wait();
2482	}
2483
2484	#[tokio::test]
2485	async fn test_coalesce_unannounce_announce_unannounce() {
2486		// unannounce + announce + unannounce collapses to a single unannounce: the
2487		// embedded announce was never observed.
2488		tokio::time::pause();
2489
2490		let origin = Origin::random().produce();
2491		let broadcast1 = Broadcast::new().produce();
2492		origin.publish_broadcast("test", broadcast1.consume());
2493
2494		let mut consumer = origin.consume();
2495		consumer.assert_next("test", &broadcast1.consume());
2496
2497		drop(broadcast1);
2498		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2499
2500		let broadcast2 = Broadcast::new().produce();
2501		origin.publish_broadcast("test", broadcast2.consume());
2502		drop(broadcast2);
2503		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2504
2505		consumer.assert_next_none("test");
2506		consumer.assert_next_wait();
2507	}
2508
2509	#[tokio::test]
2510	async fn test_coalesce_churn_bounded() {
2511		// A churn loop on a single path should keep the pending set bounded.
2512		// Backup promotion during cleanup can leave the consumer with zero or one
2513		// pending update for "test" depending on the order tasks run; we only
2514		// require that churn doesn't accumulate across iterations.
2515		tokio::time::pause();
2516
2517		let origin = Origin::random().produce();
2518		let mut consumer = origin.consume();
2519
2520		for _ in 0..1000 {
2521			let broadcast = Broadcast::new().produce();
2522			origin.publish_broadcast("test", broadcast.consume());
2523			drop(broadcast);
2524		}
2525		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
2526
2527		let mut collected = Vec::new();
2528		while let Some(update) = consumer.try_announced() {
2529			collected.push(update);
2530		}
2531		assert!(
2532			collected.len() <= 1,
2533			"expected at most one pending update, got {}",
2534			collected.len()
2535		);
2536		assert!(
2537			collected.iter().all(|(path, _)| path == &Path::new("test")),
2538			"unexpected path in pending updates",
2539		);
2540	}
2541
2542	// With no OriginDynamic handler, an unannounced path resolves to Unroutable.
2543	#[tokio::test]
2544	async fn dynamic_request_unroutable_without_handler() {
2545		let origin = Origin::random().produce();
2546		let consumer = origin.consume();
2547		assert!(matches!(
2548			consumer.request_broadcast("missing").await,
2549			Err(Error::Unroutable)
2550		));
2551	}
2552
2553	// A dynamically served broadcast resolves the requester, but is never announced.
2554	#[tokio::test(start_paused = true)]
2555	async fn dynamic_request_served_not_announced() {
2556		let origin = Origin::random().produce();
2557		let mut dynamic = origin.dynamic();
2558		let consumer = origin.consume();
2559
2560		// A separate announce cursor must never observe the dynamic broadcast.
2561		let mut announced = origin.consume();
2562		announced.assert_next_wait();
2563
2564		// Request a path that nobody announced; the future stays pending until served.
2565		// Registration happens up front, so the handler sees the request immediately.
2566		let request_fut = consumer.request_broadcast("fallback");
2567
2568		// The handler serves it with a live broadcast it keeps producing into.
2569		let served = Broadcast::new().produce();
2570
2571		let request = dynamic.requested_broadcast().await.unwrap();
2572		assert_eq!(request.path(), &Path::new("fallback"));
2573		request.accept(served.consume());
2574
2575		let broadcast = request_fut.await.unwrap();
2576		assert!(broadcast.is_clone(&served.consume()));
2577
2578		// Still nothing announced.
2579		announced.assert_next_wait();
2580	}
2581
2582	// Concurrent requests for the same queued path coalesce onto one handler request.
2583	#[tokio::test(start_paused = true)]
2584	async fn dynamic_request_coalesces() {
2585		let origin = Origin::random().produce();
2586		let mut dynamic = origin.dynamic();
2587		let consumer = origin.consume();
2588
2589		// Both register before the handler drains either.
2590		let f1 = consumer.request_broadcast("dup");
2591		let f2 = consumer.request_broadcast("dup");
2592
2593		// Exactly one request reaches the handler.
2594		let request = dynamic.requested_broadcast().await.unwrap();
2595		assert_eq!(request.path(), &Path::new("dup"));
2596		assert!(
2597			dynamic.requested_broadcast().now_or_never().is_none(),
2598			"a coalesced request must not be served twice"
2599		);
2600
2601		// Accepting resolves both awaiting requesters with the same broadcast.
2602		let served = Broadcast::new().produce();
2603		request.accept(served.consume());
2604		assert!(f1.await.unwrap().is_clone(&served.consume()));
2605		assert!(f2.await.unwrap().is_clone(&served.consume()));
2606	}
2607
2608	// Rejecting a request resolves the requester with the error.
2609	#[tokio::test(start_paused = true)]
2610	async fn dynamic_request_rejected() {
2611		let origin = Origin::random().produce();
2612		let mut dynamic = origin.dynamic();
2613		let consumer = origin.consume();
2614
2615		let request_fut = consumer.request_broadcast("fallback");
2616
2617		let request = dynamic.requested_broadcast().await.unwrap();
2618		request.reject(Error::Cancel);
2619
2620		assert!(matches!(request_fut.await, Err(Error::Cancel)));
2621	}
2622
2623	// Dropping the last handler resolves queued requests with an error and reverts to
2624	// resolving Unroutable.
2625	#[tokio::test(start_paused = true)]
2626	async fn dynamic_request_handler_dropped() {
2627		let origin = Origin::random().produce();
2628		let dynamic = origin.dynamic();
2629		let consumer = origin.consume();
2630
2631		let request_fut = consumer.request_broadcast("fallback");
2632		drop(dynamic);
2633		assert!(matches!(request_fut.await, Err(Error::Unroutable)));
2634
2635		// With no handler left, a fresh request resolves Unroutable.
2636		assert!(matches!(
2637			consumer.request_broadcast("again").await,
2638			Err(Error::Unroutable)
2639		));
2640	}
2641
2642	// `accept` is decoupled from the dynamic count: once a handler has picked a request up,
2643	// it can still serve it even if every handler (including itself) drops first, flipping the
2644	// count to zero. The in-flight request must not be rejected as `Unroutable`.
2645	#[tokio::test(start_paused = true)]
2646	async fn dynamic_request_accept_after_handler_dropped() {
2647		let origin = Origin::random().produce();
2648		let mut dynamic = origin.dynamic();
2649		let consumer = origin.consume();
2650
2651		let request_fut = consumer.request_broadcast("fallback");
2652
2653		// The handler picks the request up, then every handler drops (count -> 0).
2654		let request = dynamic.requested_broadcast().await.unwrap();
2655		drop(dynamic);
2656
2657		// Accept still resolves the awaiting requester with the served broadcast.
2658		let served = Broadcast::new().produce();
2659		request.accept(served.consume());
2660		assert!(request_fut.await.unwrap().is_clone(&served.consume()));
2661	}
2662
2663	// A live announcement wins over the dynamic fallback; no request is queued.
2664	#[tokio::test(start_paused = true)]
2665	async fn dynamic_request_prefers_announced() {
2666		let origin = Origin::random().produce();
2667		let mut dynamic = origin.dynamic();
2668		let consumer = origin.consume();
2669
2670		let broadcast = Broadcast::new().produce();
2671		assert!(origin.publish_broadcast("live", broadcast.consume()));
2672
2673		let got = consumer.request_broadcast("live").await.unwrap();
2674		assert!(
2675			got.is_clone(&broadcast.consume()),
2676			"should return the announced broadcast"
2677		);
2678		assert!(
2679			dynamic.requested_broadcast().now_or_never().is_none(),
2680			"an announced path must not queue a fallback request"
2681		);
2682	}
2683
2684	// Cloning a handler and dropping the clone must not flip the count to zero.
2685	#[tokio::test(start_paused = true)]
2686	async fn dynamic_clone_keeps_alive() {
2687		let origin = Origin::random().produce();
2688		let dynamic = origin.dynamic();
2689		let consumer = origin.consume();
2690
2691		drop(dynamic.clone());
2692
2693		// The original handle is still live, so the request registers (stays pending)
2694		// instead of resolving Unroutable.
2695		let request_fut = consumer.request_broadcast("fallback");
2696		assert!(
2697			request_fut.now_or_never().is_none(),
2698			"request should stay pending until served"
2699		);
2700	}
2701}