Skip to main content

moq_net/model/
origin.rs

1use std::{
2	collections::{HashMap, VecDeque},
3	fmt,
4	sync::atomic::{AtomicU64, Ordering},
5};
6
7use rand::Rng;
8use tokio::sync::mpsc;
9use web_async::Lock;
10
11use super::BroadcastConsumer;
12use crate::{
13	AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
14	coding::{Decode, DecodeError, Encode, EncodeError},
15};
16
17/// A relay origin, identified by a 62-bit varint on the wire.
18///
19/// `id` must be non-zero for a real origin; `id == 0` is reserved as a
20/// placeholder for Lite03-style hops where the actual value isn't carried.
21/// Encoding a value outside the 62-bit range (>= 2^62) will fail at the
22/// varint layer; [`Origin::random`] picks a valid random nonzero id.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct Origin {
26	/// Non-zero 62-bit identifier. Encoded as a QUIC varint on the wire.
27	pub id: u64,
28}
29
30impl Origin {
31	/// Placeholder for hop entries whose actual id is not on the wire (Lite03).
32	/// Never encoded for Lite04+: violates the non-zero invariant and would fail to round-trip.
33	pub(crate) const UNKNOWN: Self = Self { id: 0 };
34
35	/// Generate a fresh origin with a random non-zero id. Use this for any
36	/// origin that does not need a stable identity across restarts.
37	///
38	/// TEMPORARY: the wire format allows 62 bits, but older `@moq/lite` JS
39	/// clients decode `AnnounceInterest.exclude_hop` as a u53 (number) and
40	/// throw on anything > 2^53-1. To keep those clients alive against
41	/// fresh relays, we cap the random id at 53 bits. Restore to 62 bits
42	/// once the JS u62 fix has propagated to deployed bundles.
43	pub fn random() -> Self {
44		let mut rng = rand::rng();
45		let id = rng.random_range(1..(1u64 << 53));
46		Self { id }
47	}
48
49	/// Consume this [Origin] to create a producer that carries its id.
50	pub fn produce(self) -> OriginProducer {
51		OriginProducer::new(self)
52	}
53}
54
55impl From<u64> for Origin {
56	fn from(id: u64) -> Self {
57		Self { id }
58	}
59}
60
61impl fmt::Display for Origin {
62	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63		self.id.fmt(f)
64	}
65}
66
67impl<V: Copy> Encode<V> for Origin
68where
69	u64: Encode<V>,
70{
71	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
72		self.id.encode(w, version)
73	}
74}
75
76impl<V: Copy> Decode<V> for Origin
77where
78	u64: Decode<V>,
79{
80	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
81		let id = u64::decode(r, version)?;
82		if id >= 1u64 << 62 {
83			return Err(DecodeError::InvalidValue);
84		}
85		Ok(Self { id })
86	}
87}
88
89/// Maximum number of origins (hops) an [`OriginList`] can hold.
90///
91/// Caps pathological or loop-induced announcements at a reasonable cluster
92/// diameter; appending past this limit returns [`TooManyOrigins`] rather than
93/// silently truncating.
94pub(crate) const MAX_HOPS: usize = 32;
95
96/// Bounded list of [`Origin`] entries, typically the hop chain of a broadcast.
97///
98/// Guarantees `len() <= MAX_HOPS`. Construct via [`OriginList::new`] +
99/// [`OriginList::push`], or fall back to the fallible [`TryFrom<Vec<Origin>>`].
100#[derive(Debug, Clone, Default, PartialEq, Eq)]
101#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
102pub struct OriginList(Vec<Origin>);
103
104/// Returned when an operation would grow an [`OriginList`] past its hop-count cap.
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106#[non_exhaustive]
107pub struct TooManyOrigins;
108
109impl fmt::Display for TooManyOrigins {
110	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111		write!(f, "too many origins (max {MAX_HOPS})")
112	}
113}
114
115impl std::error::Error for TooManyOrigins {}
116
117impl From<TooManyOrigins> for DecodeError {
118	fn from(_: TooManyOrigins) -> Self {
119		DecodeError::BoundsExceeded
120	}
121}
122
123impl OriginList {
124	/// Create an empty list.
125	pub fn new() -> Self {
126		Self(Vec::new())
127	}
128
129	/// Append an [`Origin`]. Returns [`TooManyOrigins`] if the list is full.
130	pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
131		if self.0.len() >= MAX_HOPS {
132			return Err(TooManyOrigins);
133		}
134		self.0.push(origin);
135		Ok(())
136	}
137
138	/// Returns true if any entry matches `origin`.
139	pub fn contains(&self, origin: &Origin) -> bool {
140		self.0.contains(origin)
141	}
142
143	/// Number of entries currently in the list (always `<= MAX_HOPS`).
144	pub fn len(&self) -> usize {
145		self.0.len()
146	}
147
148	/// Whether the list contains no entries.
149	pub fn is_empty(&self) -> bool {
150		self.0.is_empty()
151	}
152
153	/// Iterate over the entries in hop order (oldest first).
154	pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
155		self.0.iter()
156	}
157
158	/// Borrow the entries as a slice.
159	pub fn as_slice(&self) -> &[Origin] {
160		&self.0
161	}
162}
163
164impl TryFrom<Vec<Origin>> for OriginList {
165	type Error = TooManyOrigins;
166
167	fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
168		if v.len() > MAX_HOPS {
169			return Err(TooManyOrigins);
170		}
171		Ok(Self(v))
172	}
173}
174
175impl<'a> IntoIterator for &'a OriginList {
176	type Item = &'a Origin;
177	type IntoIter = std::slice::Iter<'a, Origin>;
178
179	fn into_iter(self) -> Self::IntoIter {
180		self.iter()
181	}
182}
183
184impl<V: Copy> Encode<V> for OriginList
185where
186	u64: Encode<V>,
187	Origin: Encode<V>,
188{
189	fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
190		(self.0.len() as u64).encode(w, version)?;
191		for origin in &self.0 {
192			origin.encode(w, version)?;
193		}
194		Ok(())
195	}
196}
197
198impl<V: Copy> Decode<V> for OriginList
199where
200	u64: Decode<V>,
201	Origin: Decode<V>,
202{
203	fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
204		let count = u64::decode(r, version)? as usize;
205		if count > MAX_HOPS {
206			return Err(DecodeError::BoundsExceeded);
207		}
208		let mut list = Vec::with_capacity(count);
209		for _ in 0..count {
210			list.push(Origin::decode(r, version)?);
211		}
212		Ok(Self(list))
213	}
214}
215
216static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
217
218#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
219struct ConsumerId(u64);
220
221impl ConsumerId {
222	fn new() -> Self {
223		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
224	}
225}
226
227// If there are multiple broadcasts with the same path, we keep the oldest active and queue the others.
228struct OriginBroadcast {
229	path: PathOwned,
230	active: BroadcastConsumer,
231	backup: VecDeque<BroadcastConsumer>,
232}
233
234#[derive(Clone)]
235struct OriginConsumerNotify {
236	root: PathOwned,
237	tx: mpsc::UnboundedSender<OriginAnnounce>,
238}
239
240impl OriginConsumerNotify {
241	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
242		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
243		self.tx.send((path, Some(broadcast))).expect("consumer closed");
244	}
245
246	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
247		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
248		self.tx.send((path.clone(), None)).expect("consumer closed");
249		self.tx.send((path, Some(broadcast))).expect("consumer closed");
250	}
251
252	fn unannounce(&self, path: impl AsPath) {
253		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
254		self.tx.send((path, None)).expect("consumer closed");
255	}
256}
257
258struct NotifyNode {
259	parent: Option<Lock<NotifyNode>>,
260
261	// Consumers that are subscribed to this node.
262	// We store a consumer ID so we can remove it easily when it closes.
263	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
264}
265
266impl NotifyNode {
267	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
268		Self {
269			parent,
270			consumers: HashMap::new(),
271		}
272	}
273
274	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
275		for consumer in self.consumers.values() {
276			consumer.announce(path.as_path(), broadcast.clone());
277		}
278
279		if let Some(parent) = &self.parent {
280			parent.lock().announce(path, broadcast);
281		}
282	}
283
284	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
285		for consumer in self.consumers.values() {
286			consumer.reannounce(path.as_path(), broadcast.clone());
287		}
288
289		if let Some(parent) = &self.parent {
290			parent.lock().reannounce(path, broadcast);
291		}
292	}
293
294	fn unannounce(&mut self, path: impl AsPath) {
295		for consumer in self.consumers.values() {
296			consumer.unannounce(path.as_path());
297		}
298
299		if let Some(parent) = &self.parent {
300			parent.lock().unannounce(path);
301		}
302	}
303}
304
305struct OriginNode {
306	// The broadcast that is published to this node.
307	broadcast: Option<OriginBroadcast>,
308
309	// Nested nodes, one level down the tree.
310	nested: HashMap<String, Lock<OriginNode>>,
311
312	// Unfortunately, to notify consumers we need to traverse back up the tree.
313	notify: Lock<NotifyNode>,
314}
315
316impl OriginNode {
317	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
318		Self {
319			broadcast: None,
320			nested: HashMap::new(),
321			notify: Lock::new(NotifyNode::new(parent)),
322		}
323	}
324
325	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
326		let (dir, rest) = path.next_part().expect("leaf called with empty path");
327
328		let next = self.entry(dir);
329		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
330	}
331
332	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
333		match self.nested.get(dir) {
334			Some(next) => next.clone(),
335			None => {
336				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
337				self.nested.insert(dir.to_string(), next.clone());
338				next
339			}
340		}
341	}
342
343	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
344		let full = full.as_path();
345		let rest = relative.as_path();
346
347		// If the path has a directory component, then publish it to the nested node.
348		if let Some((dir, relative)) = rest.next_part() {
349			// Not using entry to avoid allocating a string most of the time.
350			self.entry(dir).lock().publish(&full, broadcast, &relative);
351		} else if let Some(existing) = &mut self.broadcast {
352			// This node is a leaf with an existing broadcast. Prefer the shorter or equal hop path;
353			// on ties, the newer broadcast wins, since the previous one may be about to close.
354			//
355			// Drop duplicates (same underlying broadcast delivered via multiple links) so the
356			// backup queue can't accumulate clones of the active entry and trigger redundant
357			// reannouncements when a peer churns.
358			if existing.active.is_clone(broadcast) || existing.backup.iter().any(|b| b.is_clone(broadcast)) {
359				return;
360			}
361
362			if broadcast.hops.len() <= existing.active.hops.len() {
363				let old = existing.active.clone();
364				existing.active = broadcast.clone();
365				existing.backup.push_back(old);
366
367				self.notify.lock().reannounce(full, broadcast);
368			} else {
369				// Longer path: keep as a backup in case the active one drops.
370				existing.backup.push_back(broadcast.clone());
371			}
372		} else {
373			// This node is a leaf with no existing broadcast.
374			self.broadcast = Some(OriginBroadcast {
375				path: full.to_owned(),
376				active: broadcast.clone(),
377				backup: VecDeque::new(),
378			});
379			self.notify.lock().announce(full, broadcast);
380		}
381	}
382
383	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
384		self.consume_initial(&mut notify);
385		self.notify.lock().consumers.insert(id, notify);
386	}
387
388	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
389		if let Some(broadcast) = &self.broadcast {
390			notify.announce(&broadcast.path, broadcast.active.clone());
391		}
392
393		// Recursively subscribe to all nested nodes.
394		for nested in self.nested.values() {
395			nested.lock().consume_initial(notify);
396		}
397	}
398
399	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
400		let rest = rest.as_path();
401
402		if let Some((dir, rest)) = rest.next_part() {
403			let node = self.nested.get(dir)?.lock();
404			node.consume_broadcast(&rest)
405		} else {
406			self.broadcast.as_ref().map(|b| b.active.clone())
407		}
408	}
409
410	fn unconsume(&mut self, id: ConsumerId) {
411		self.notify.lock().consumers.remove(&id).expect("consumer not found");
412		if self.is_empty() {
413			//tracing::warn!("TODO: empty node; memory leak");
414			// This happens when consuming a path that is not being broadcasted.
415		}
416	}
417
418	// Returns true if the broadcast should be unannounced.
419	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
420		let full = full.as_path();
421		let relative = relative.as_path();
422
423		if let Some((dir, relative)) = relative.next_part() {
424			let nested = self.entry(dir);
425			let mut locked = nested.lock();
426			locked.remove(&full, broadcast, &relative);
427
428			if locked.is_empty() {
429				drop(locked);
430				self.nested.remove(dir);
431			}
432		} else {
433			let entry = match &mut self.broadcast {
434				Some(existing) => existing,
435				None => return,
436			};
437
438			// See if we can remove the broadcast from the backup list.
439			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
440			if let Some(pos) = pos {
441				entry.backup.remove(pos);
442				// Nothing else to do
443				return;
444			}
445
446			// Okay so it must be the active broadcast or else we fucked up.
447			assert!(entry.active.is_clone(&broadcast));
448
449			// Promote the backup with the shortest hop chain so we keep preferring short paths.
450			// Ties break toward the oldest (FIFO) since min_by_key returns the first minimum.
451			let best = entry
452				.backup
453				.iter()
454				.enumerate()
455				.min_by_key(|(_, b)| b.hops.len())
456				.map(|(i, _)| i);
457			if let Some(idx) = best {
458				let active = entry.backup.remove(idx).expect("index in range");
459				entry.active = active;
460				self.notify.lock().reannounce(full, &entry.active);
461			} else {
462				// No more backups, so remove the entry.
463				self.broadcast = None;
464				self.notify.lock().unannounce(full);
465			}
466		}
467	}
468
469	fn is_empty(&self) -> bool {
470		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
471	}
472}
473
474#[derive(Clone)]
475struct OriginNodes {
476	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
477}
478
479impl OriginNodes {
480	// Returns nested roots that match the prefixes.
481	// PathPrefixes guarantees no duplicates or overlapping prefixes.
482	pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
483		let mut roots = Vec::new();
484
485		for (root, state) in &self.nodes {
486			for prefix in prefixes {
487				if root.has_prefix(prefix) {
488					// Keep the existing node if we're allowed to access it.
489					roots.push((root.to_owned(), state.clone()));
490					continue;
491				}
492
493				if let Some(suffix) = prefix.strip_prefix(root) {
494					// If the requested prefix is larger than the allowed prefix, then we further scope it.
495					let nested = state.lock().leaf(&suffix);
496					roots.push((prefix.to_owned(), nested));
497				}
498			}
499		}
500
501		if roots.is_empty() {
502			None
503		} else {
504			Some(Self { nodes: roots })
505		}
506	}
507
508	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
509		let new_root = new_root.as_path();
510		let mut roots = Vec::new();
511
512		if new_root.is_empty() {
513			return Some(self.clone());
514		}
515
516		for (root, state) in &self.nodes {
517			if let Some(suffix) = root.strip_prefix(&new_root) {
518				// If the old root is longer than the new root, shorten the keys.
519				roots.push((suffix.to_owned(), state.clone()));
520			} else if let Some(suffix) = new_root.strip_prefix(root) {
521				// If the new root is longer than the old root, add a new root.
522				// NOTE: suffix can't be empty
523				let nested = state.lock().leaf(&suffix);
524				roots.push(("".into(), nested));
525			}
526		}
527
528		if roots.is_empty() {
529			None
530		} else {
531			Some(Self { nodes: roots })
532		}
533	}
534
535	// Returns the root that has this prefix.
536	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
537		let path = path.as_path();
538
539		for (root, state) in &self.nodes {
540			if let Some(suffix) = path.strip_prefix(root) {
541				return Some((state.clone(), suffix.to_owned()));
542			}
543		}
544
545		None
546	}
547}
548
549impl Default for OriginNodes {
550	fn default() -> Self {
551		Self {
552			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
553		}
554	}
555}
556
557/// A broadcast path and its associated consumer, or None if closed.
558pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
559
560/// Announces broadcasts to consumers over the network.
561#[derive(Clone)]
562pub struct OriginProducer {
563	// Identity for this origin. Appended to broadcast hops when
564	// re-announcing so downstream relays can detect loops and prefer the
565	// shortest path.
566	info: Origin,
567
568	// The roots of the tree that we are allowed to publish.
569	// A path of "" means we can publish anything.
570	nodes: OriginNodes,
571
572	// The prefix that is automatically stripped from all paths.
573	root: PathOwned,
574}
575
576impl std::ops::Deref for OriginProducer {
577	type Target = Origin;
578
579	fn deref(&self) -> &Self::Target {
580		&self.info
581	}
582}
583
584impl OriginProducer {
585	/// Build a producer for the given origin id with no scoped prefix and no
586	/// pre-existing broadcasts. Prefer [`Origin::produce`].
587	pub fn new(info: Origin) -> Self {
588		Self {
589			info,
590			nodes: OriginNodes::default(),
591			root: PathOwned::default(),
592		}
593	}
594
595	/// Create and publish a new broadcast, returning the producer.
596	///
597	/// This is a helper method when you only want to publish a broadcast to a single origin.
598	/// Returns [None] if the broadcast is not allowed to be published.
599	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
600		let broadcast = Broadcast::new().produce();
601		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
602	}
603
604	/// Publish a broadcast, announcing it to all consumers.
605	///
606	/// The broadcast will be unannounced when it is closed.
607	/// If there is already a broadcast with the same path, the new one replaces the active only
608	/// if its hop path is shorter or equal; otherwise it is queued as a backup.
609	/// When the active broadcast closes, the backup with the shortest hop path is promoted and
610	/// reannounced. Backups that close before being promoted are silently dropped.
611	///
612	/// Returns false if the broadcast is not allowed to be published.
613	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
614		let path = path.as_path();
615
616		// Loop detection: refuse broadcasts whose hop chain already contains our id.
617		if broadcast.hops.contains(&self.info) {
618			return false;
619		}
620
621		let (root, rest) = match self.nodes.get(&path) {
622			Some(root) => root,
623			None => return false,
624		};
625
626		let full = self.root.join(&path);
627
628		root.lock().publish(&full, &broadcast, &rest);
629		let root = root.clone();
630
631		web_async::spawn(async move {
632			broadcast.closed().await;
633			root.lock().remove(&full, broadcast, &rest);
634		});
635
636		true
637	}
638
639	/// Returns a new OriginProducer restricted to publishing under one of `prefixes`.
640	///
641	/// Returns None if there are no legal prefixes (the requested prefixes are
642	/// disjoint from this producer's current scope).
643	// TODO accept PathPrefixes instead of &[Path]
644	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
645		let prefixes = PathPrefixes::new(prefixes);
646		Some(OriginProducer {
647			info: self.info,
648			nodes: self.nodes.select(&prefixes)?,
649			root: self.root.clone(),
650		})
651	}
652
653	/// Subscribe to all announced broadcasts.
654	pub fn consume(&self) -> OriginConsumer {
655		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
656	}
657
658	/// Get a broadcast by path if it has *already* been published.
659	///
660	/// Equivalent to `self.consume().get_broadcast(path)` but skips the
661	/// announcement-cursor allocation, which is currently relatively expensive.
662	#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
663	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
664		let path = path.as_path();
665		let (root, rest) = self.nodes.get(&path)?;
666		let state = root.lock();
667		state.consume_broadcast(&rest)
668	}
669
670	/// Returns a new OriginProducer that automatically strips out the provided prefix.
671	///
672	/// Returns None if the provided root is not authorized; when [`Self::scope`]
673	/// was already used without a wildcard.
674	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
675		let prefix = prefix.as_path();
676
677		Some(Self {
678			info: self.info,
679			root: self.root.join(&prefix).to_owned(),
680			nodes: self.nodes.root(&prefix)?,
681		})
682	}
683
684	/// Returns the root that is automatically stripped from all paths.
685	pub fn root(&self) -> &Path<'_> {
686		&self.root
687	}
688
689	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
690	// TODO return PathPrefixes
691	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
692		self.nodes.nodes.iter().map(|(root, _)| root)
693	}
694
695	/// Converts a relative path to an absolute path.
696	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
697		self.root.join(path)
698	}
699}
700
701/// Consumes announced broadcasts matching against an optional prefix.
702///
703/// NOTE: Clone is expensive, try to avoid it.
704pub struct OriginConsumer {
705	id: ConsumerId,
706	// Identity of the origin this consumer was derived from.
707	info: Origin,
708	nodes: OriginNodes,
709	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
710
711	// A prefix that is automatically stripped from all paths.
712	root: PathOwned,
713}
714
715impl std::ops::Deref for OriginConsumer {
716	type Target = Origin;
717
718	fn deref(&self) -> &Self::Target {
719		&self.info
720	}
721}
722
723impl OriginConsumer {
724	fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
725		let (tx, rx) = mpsc::unbounded_channel();
726
727		let id = ConsumerId::new();
728
729		for (_, state) in &nodes.nodes {
730			let notify = OriginConsumerNotify {
731				root: root.clone(),
732				tx: tx.clone(),
733			};
734			state.lock().consume(id, notify);
735		}
736
737		Self {
738			id,
739			info,
740			nodes,
741			updates: rx,
742			root,
743		}
744	}
745
746	/// Returns the next (un)announced broadcast and the absolute path.
747	///
748	/// The broadcast will only be announced if it was previously unannounced.
749	/// The same path won't be announced/unannounced twice, instead it will toggle.
750	/// Returns None if the consumer is closed.
751	///
752	/// Note: The returned path is absolute and will always match this consumer's prefix.
753	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
754		self.updates.recv().await
755	}
756
757	/// Returns the next (un)announced broadcast and the absolute path without blocking.
758	///
759	/// Returns None if there is no update available; NOT because the consumer is closed.
760	/// You have to use `is_closed` to check if the consumer is closed.
761	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
762		self.updates.try_recv().ok()
763	}
764
765	/// Create another consumer with its own announcement cursor over the same origin.
766	pub fn consume(&self) -> Self {
767		self.clone()
768	}
769
770	/// Get a broadcast by path if it has *already* been announced.
771	///
772	/// Returns `None` when the path is unknown to this consumer right now. Synchronous
773	/// lookup races announcement gossip — a freshly-connected consumer will see `None`
774	/// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`]
775	/// (blocks until announced) unless you can guarantee the announcement has already
776	/// landed (e.g. you're responding to an `announced()` callback).
777	pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
778		let path = path.as_path();
779		let (root, rest) = self.nodes.get(&path)?;
780		let state = root.lock();
781		state.consume_broadcast(&rest)
782	}
783
784	/// Block until a broadcast with the given path is announced and return it.
785	///
786	/// Returns `None` if the path is outside this consumer's allowed prefixes or if the consumer
787	/// is closed before the broadcast is announced. The returned broadcast may itself be closed
788	/// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that.
789	///
790	/// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but
791	/// cannot guarantee the announcement has already been received.
792	pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
793		let path = path.as_path();
794
795		// Scope a fresh consumer down to this path so we only wake up for relevant announcements.
796		let mut consumer = self.scope(std::slice::from_ref(&path))?;
797
798		// `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited
799		// to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no
800		// announcement at the exact path `foo` can ever arrive. Bail rather than loop forever.
801		if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) {
802			return None;
803		}
804
805		loop {
806			let (announced, broadcast) = consumer.announced().await?;
807			// `scope` narrows by prefix, but we only want an exact-path match.
808			if announced.as_path() == path {
809				if let Some(broadcast) = broadcast {
810					return Some(broadcast);
811				}
812			}
813		}
814	}
815
816	/// Returns a new OriginConsumer restricted to broadcasts under one of `prefixes`.
817	///
818	/// Returns None if there are no legal prefixes (the requested prefixes are
819	/// disjoint from this consumer's current scope, so it would always return None).
820	// TODO accept PathPrefixes instead of &[Path]
821	pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
822		let prefixes = PathPrefixes::new(prefixes);
823		Some(OriginConsumer::new(
824			self.info,
825			self.root.clone(),
826			self.nodes.select(&prefixes)?,
827		))
828	}
829
830	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
831	///
832	/// Returns None if the provided root is not authorized; when [`Self::scope`] was
833	/// already used without a wildcard.
834	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
835		let prefix = prefix.as_path();
836
837		Some(Self::new(
838			self.info,
839			self.root.join(&prefix).to_owned(),
840			self.nodes.root(&prefix)?,
841		))
842	}
843
844	/// Returns the prefix that is automatically stripped from all paths.
845	pub fn root(&self) -> &Path<'_> {
846		&self.root
847	}
848
849	/// Iterate over the path prefixes this handle is permitted to publish or subscribe under.
850	// TODO return PathPrefixes
851	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
852		self.nodes.nodes.iter().map(|(root, _)| root)
853	}
854
855	/// Converts a relative path to an absolute path.
856	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
857		self.root.join(path)
858	}
859}
860
861impl Drop for OriginConsumer {
862	fn drop(&mut self) {
863		for (_, root) in &self.nodes.nodes {
864			root.lock().unconsume(self.id);
865		}
866	}
867}
868
869impl Clone for OriginConsumer {
870	fn clone(&self) -> Self {
871		OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
872	}
873}
874
875#[cfg(test)]
876use futures::FutureExt;
877
878#[cfg(test)]
879impl OriginConsumer {
880	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
881		let expected = expected.as_path();
882		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
883		assert_eq!(path, expected, "wrong path");
884		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
885	}
886
887	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
888		let expected = expected.as_path();
889		let (path, active) = self.try_announced().expect("no next");
890		assert_eq!(path, expected, "wrong path");
891		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
892	}
893
894	pub fn assert_next_none(&mut self, expected: impl AsPath) {
895		let expected = expected.as_path();
896		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
897		assert_eq!(path, expected, "wrong path");
898		assert!(active.is_none(), "should be unannounced");
899	}
900
901	pub fn assert_next_wait(&mut self) {
902		if let Some(res) = self.announced().now_or_never() {
903			panic!("next should block: got {:?}", res.map(|(path, _)| path));
904		}
905	}
906
907	/*
908	pub fn assert_next_closed(&mut self) {
909		assert!(
910			self.announced().now_or_never().expect("next blocked").is_none(),
911			"next should be closed"
912		);
913	}
914	*/
915}
916
917#[cfg(test)]
918mod tests {
919	use crate::Broadcast;
920
921	use super::*;
922
923	#[test]
924	fn origin_list_push_fails_at_limit() {
925		let mut list = OriginList::new();
926		for _ in 0..MAX_HOPS {
927			list.push(Origin::random()).unwrap();
928		}
929		assert_eq!(list.len(), MAX_HOPS);
930		assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
931	}
932
933	#[test]
934	fn origin_list_try_from_vec_enforces_limit() {
935		let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
936		assert!(OriginList::try_from(under).is_ok());
937
938		let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
939		assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
940	}
941
942	#[tokio::test]
943	async fn test_announce() {
944		tokio::time::pause();
945
946		let origin = Origin::random().produce();
947		let broadcast1 = Broadcast::new().produce();
948		let broadcast2 = Broadcast::new().produce();
949
950		let mut consumer1 = origin.consume();
951		// Make a new consumer that should get it.
952		consumer1.assert_next_wait();
953
954		// Publish the first broadcast.
955		origin.publish_broadcast("test1", broadcast1.consume());
956
957		consumer1.assert_next("test1", &broadcast1.consume());
958		consumer1.assert_next_wait();
959
960		// Make a new consumer that should get the existing broadcast.
961		// But we don't consume it yet.
962		let mut consumer2 = origin.consume();
963
964		// Publish the second broadcast.
965		origin.publish_broadcast("test2", broadcast2.consume());
966
967		consumer1.assert_next("test2", &broadcast2.consume());
968		consumer1.assert_next_wait();
969
970		consumer2.assert_next("test1", &broadcast1.consume());
971		consumer2.assert_next("test2", &broadcast2.consume());
972		consumer2.assert_next_wait();
973
974		// Close the first broadcast.
975		drop(broadcast1);
976
977		// Wait for the async task to run.
978		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
979
980		// All consumers should get a None now.
981		consumer1.assert_next_none("test1");
982		consumer2.assert_next_none("test1");
983		consumer1.assert_next_wait();
984		consumer2.assert_next_wait();
985
986		// And a new consumer only gets the last broadcast.
987		let mut consumer3 = origin.consume();
988		consumer3.assert_next("test2", &broadcast2.consume());
989		consumer3.assert_next_wait();
990
991		// Close the other producer and make sure it cleans up
992		drop(broadcast2);
993
994		// Wait for the async task to run.
995		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
996
997		consumer1.assert_next_none("test2");
998		consumer2.assert_next_none("test2");
999		consumer3.assert_next_none("test2");
1000
1001		/* TODO close the origin consumer when the producer is dropped
1002		consumer1.assert_next_closed();
1003		consumer2.assert_next_closed();
1004		consumer3.assert_next_closed();
1005		*/
1006	}
1007
1008	#[tokio::test]
1009	async fn test_duplicate() {
1010		tokio::time::pause();
1011
1012		let origin = Origin::random().produce();
1013
1014		let broadcast1 = Broadcast::new().produce();
1015		let broadcast2 = Broadcast::new().produce();
1016		let broadcast3 = Broadcast::new().produce();
1017
1018		let consumer1 = broadcast1.consume();
1019		let consumer2 = broadcast2.consume();
1020		let consumer3 = broadcast3.consume();
1021
1022		let mut consumer = origin.consume();
1023
1024		origin.publish_broadcast("test", consumer1.clone());
1025		origin.publish_broadcast("test", consumer2.clone());
1026		origin.publish_broadcast("test", consumer3.clone());
1027		assert!(consumer.get_broadcast("test").is_some());
1028
1029		// On equal hop lengths, each new publish replaces the active and reannounces.
1030		consumer.assert_next("test", &consumer1);
1031		consumer.assert_next_none("test");
1032		consumer.assert_next("test", &consumer2);
1033		consumer.assert_next_none("test");
1034		consumer.assert_next("test", &consumer3);
1035
1036		// Drop a backup, nothing should change.
1037		drop(broadcast2);
1038
1039		// Wait for the async task to run.
1040		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1041
1042		assert!(consumer.get_broadcast("test").is_some());
1043		consumer.assert_next_wait();
1044
1045		// Drop the active, we should reannounce with the remaining backup.
1046		drop(broadcast3);
1047
1048		// Wait for the async task to run.
1049		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1050
1051		assert!(consumer.get_broadcast("test").is_some());
1052		consumer.assert_next_none("test");
1053		consumer.assert_next("test", &consumer1);
1054
1055		// Drop the final broadcast, we should unannounce.
1056		drop(broadcast1);
1057
1058		// Wait for the async task to run.
1059		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1060		assert!(consumer.get_broadcast("test").is_none());
1061
1062		consumer.assert_next_none("test");
1063		consumer.assert_next_wait();
1064	}
1065
1066	#[tokio::test]
1067	async fn test_duplicate_reverse() {
1068		tokio::time::pause();
1069
1070		let origin = Origin::random().produce();
1071		let broadcast1 = Broadcast::new().produce();
1072		let broadcast2 = Broadcast::new().produce();
1073
1074		origin.publish_broadcast("test", broadcast1.consume());
1075		origin.publish_broadcast("test", broadcast2.consume());
1076		assert!(origin.consume().get_broadcast("test").is_some());
1077
1078		// This is harder, dropping the new broadcast first.
1079		drop(broadcast2);
1080
1081		// Wait for the cleanup async task to run.
1082		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1083		assert!(origin.consume().get_broadcast("test").is_some());
1084
1085		drop(broadcast1);
1086
1087		// Wait for the cleanup async task to run.
1088		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1089		assert!(origin.consume().get_broadcast("test").is_none());
1090	}
1091
1092	#[tokio::test]
1093	async fn test_double_publish() {
1094		tokio::time::pause();
1095
1096		let origin = Origin::random().produce();
1097		let broadcast = Broadcast::new().produce();
1098
1099		// Ensure it doesn't crash.
1100		origin.publish_broadcast("test", broadcast.consume());
1101		origin.publish_broadcast("test", broadcast.consume());
1102
1103		assert!(origin.consume().get_broadcast("test").is_some());
1104
1105		drop(broadcast);
1106
1107		// Wait for the async task to run.
1108		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1109		assert!(origin.consume().get_broadcast("test").is_none());
1110	}
1111	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
1112	#[tokio::test]
1113	#[should_panic]
1114	async fn test_128() {
1115		let origin = Origin::random().produce();
1116		let broadcast = Broadcast::new().produce();
1117
1118		let mut consumer = origin.consume();
1119		for i in 0..256 {
1120			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1121		}
1122
1123		for i in 0..256 {
1124			consumer.assert_next(format!("test{i}"), &broadcast.consume());
1125		}
1126	}
1127
1128	#[tokio::test]
1129	async fn test_128_fix() {
1130		let origin = Origin::random().produce();
1131		let broadcast = Broadcast::new().produce();
1132
1133		let mut consumer = origin.consume();
1134		for i in 0..256 {
1135			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
1136		}
1137
1138		for i in 0..256 {
1139			// try_next does not have the same issue because it's synchronous.
1140			consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
1141		}
1142	}
1143
1144	#[tokio::test]
1145	async fn test_with_root_basic() {
1146		let origin = Origin::random().produce();
1147		let broadcast = Broadcast::new().produce();
1148
1149		// Create a producer with root "/foo"
1150		let foo_producer = origin.with_root("foo").expect("should create root");
1151		assert_eq!(foo_producer.root().as_str(), "foo");
1152
1153		let mut consumer = origin.consume();
1154
1155		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
1156		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
1157		// The original consumer should see the full path
1158		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1159
1160		// A consumer created from the rooted producer should see the stripped path
1161		let mut foo_consumer = foo_producer.consume();
1162		foo_consumer.assert_next("bar/baz", &broadcast.consume());
1163	}
1164
1165	#[tokio::test]
1166	async fn test_with_root_nested() {
1167		let origin = Origin::random().produce();
1168		let broadcast = Broadcast::new().produce();
1169
1170		// Create nested roots
1171		let foo_producer = origin.with_root("foo").expect("should create foo root");
1172		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
1173		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
1174
1175		let mut consumer = origin.consume();
1176
1177		// Publishing to "baz" should actually publish to "foo/bar/baz"
1178		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
1179		// The original consumer sees the full path
1180		consumer.assert_next("foo/bar/baz", &broadcast.consume());
1181
1182		// Consumer from foo_bar_producer sees just "baz"
1183		let mut foo_bar_consumer = foo_bar_producer.consume();
1184		foo_bar_consumer.assert_next("baz", &broadcast.consume());
1185	}
1186
1187	#[tokio::test]
1188	async fn test_publish_scope_allows() {
1189		let origin = Origin::random().produce();
1190		let broadcast = Broadcast::new().produce();
1191
1192		// Create a producer that can only publish to "allowed" paths
1193		let limited_producer = origin
1194			.scope(&["allowed/path1".into(), "allowed/path2".into()])
1195			.expect("should create limited producer");
1196
1197		// Should be able to publish to allowed paths
1198		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
1199		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
1200		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
1201
1202		// Should not be able to publish to disallowed paths
1203		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
1204		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
1205		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
1206	}
1207
1208	#[tokio::test]
1209	async fn test_publish_scope_empty() {
1210		let origin = Origin::random().produce();
1211
1212		// Creating a producer with no allowed paths should return None
1213		assert!(origin.scope(&[]).is_none());
1214	}
1215
1216	#[tokio::test]
1217	async fn test_consume_scope_filters() {
1218		let origin = Origin::random().produce();
1219		let broadcast1 = Broadcast::new().produce();
1220		let broadcast2 = Broadcast::new().produce();
1221		let broadcast3 = Broadcast::new().produce();
1222
1223		let mut consumer = origin.consume();
1224
1225		// Publish to different paths
1226		origin.publish_broadcast("allowed", broadcast1.consume());
1227		origin.publish_broadcast("allowed/nested", broadcast2.consume());
1228		origin.publish_broadcast("notallowed", broadcast3.consume());
1229
1230		// Create a consumer that only sees "allowed" paths
1231		let mut limited_consumer = origin
1232			.consume()
1233			.scope(&["allowed".into()])
1234			.expect("should create limited consumer");
1235
1236		// Should only receive broadcasts under "allowed"
1237		limited_consumer.assert_next("allowed", &broadcast1.consume());
1238		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
1239		limited_consumer.assert_next_wait(); // Should not see "notallowed"
1240
1241		// Unscoped consumer should see all
1242		consumer.assert_next("allowed", &broadcast1.consume());
1243		consumer.assert_next("allowed/nested", &broadcast2.consume());
1244		consumer.assert_next("notallowed", &broadcast3.consume());
1245	}
1246
1247	#[tokio::test]
1248	async fn test_consume_scope_multiple_prefixes() {
1249		let origin = Origin::random().produce();
1250		let broadcast1 = Broadcast::new().produce();
1251		let broadcast2 = Broadcast::new().produce();
1252		let broadcast3 = Broadcast::new().produce();
1253
1254		origin.publish_broadcast("foo/test", broadcast1.consume());
1255		origin.publish_broadcast("bar/test", broadcast2.consume());
1256		origin.publish_broadcast("baz/test", broadcast3.consume());
1257
1258		// Consumer that only sees "foo" and "bar" paths
1259		let mut limited_consumer = origin
1260			.consume()
1261			.scope(&["foo".into(), "bar".into()])
1262			.expect("should create limited consumer");
1263
1264		// Order depends on PathPrefixes canonical sort (lexicographic for same length)
1265		limited_consumer.assert_next("bar/test", &broadcast2.consume());
1266		limited_consumer.assert_next("foo/test", &broadcast1.consume());
1267		limited_consumer.assert_next_wait(); // Should not see "baz/test"
1268	}
1269
1270	#[tokio::test]
1271	async fn test_with_root_and_publish_scope() {
1272		let origin = Origin::random().produce();
1273		let broadcast = Broadcast::new().produce();
1274
1275		// User connects to /foo root
1276		let foo_producer = origin.with_root("foo").expect("should create foo root");
1277
1278		// Limit them to publish only to "bar" and "goop/pee" within /foo
1279		let limited_producer = foo_producer
1280			.scope(&["bar".into(), "goop/pee".into()])
1281			.expect("should create limited producer");
1282
1283		let mut consumer = origin.consume();
1284
1285		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
1286		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
1287		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
1288		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
1289		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
1290
1291		// Should not be able to publish outside allowed paths
1292		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
1293		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
1294		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
1295
1296		// Original consumer sees full paths
1297		consumer.assert_next("foo/bar", &broadcast.consume());
1298		consumer.assert_next("foo/bar/nested", &broadcast.consume());
1299		consumer.assert_next("foo/goop/pee", &broadcast.consume());
1300		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
1301	}
1302
1303	#[tokio::test]
1304	async fn test_with_root_and_consume_scope() {
1305		let origin = Origin::random().produce();
1306		let broadcast1 = Broadcast::new().produce();
1307		let broadcast2 = Broadcast::new().produce();
1308		let broadcast3 = Broadcast::new().produce();
1309
1310		// Publish broadcasts
1311		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
1312		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
1313		origin.publish_broadcast("foo/other/test", broadcast3.consume());
1314
1315		// User connects to /foo root
1316		let foo_producer = origin.with_root("foo").expect("should create foo root");
1317
1318		// Create consumer limited to "bar" and "goop/pee" within /foo
1319		let mut limited_consumer = foo_producer
1320			.consume()
1321			.scope(&["bar".into(), "goop/pee".into()])
1322			.expect("should create limited consumer");
1323
1324		// Should only see allowed paths (without foo prefix)
1325		limited_consumer.assert_next("bar/test", &broadcast1.consume());
1326		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
1327		limited_consumer.assert_next_wait(); // Should not see "other/test"
1328	}
1329
1330	#[tokio::test]
1331	async fn test_with_root_unauthorized() {
1332		let origin = Origin::random().produce();
1333
1334		// First limit the producer to specific paths
1335		let limited_producer = origin
1336			.scope(&["allowed".into()])
1337			.expect("should create limited producer");
1338
1339		// Trying to create a root outside allowed paths should fail
1340		assert!(limited_producer.with_root("notallowed").is_none());
1341
1342		// But creating a root within allowed paths should work
1343		let allowed_root = limited_producer
1344			.with_root("allowed")
1345			.expect("should create allowed root");
1346		assert_eq!(allowed_root.root().as_str(), "allowed");
1347	}
1348
1349	#[tokio::test]
1350	async fn test_wildcard_permission() {
1351		let origin = Origin::random().produce();
1352		let broadcast = Broadcast::new().produce();
1353
1354		// Producer with root access (empty string means wildcard)
1355		let root_producer = origin.clone();
1356
1357		// Should be able to publish anywhere
1358		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1359		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1360
1361		// Can create any root
1362		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1363		assert_eq!(foo_producer.root().as_str(), "foo");
1364	}
1365
1366	#[tokio::test]
1367	async fn test_consume_broadcast_with_permissions() {
1368		let origin = Origin::random().produce();
1369		let broadcast1 = Broadcast::new().produce();
1370		let broadcast2 = Broadcast::new().produce();
1371
1372		origin.publish_broadcast("allowed/test", broadcast1.consume());
1373		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1374
1375		// Create limited consumer
1376		let limited_consumer = origin
1377			.consume()
1378			.scope(&["allowed".into()])
1379			.expect("should create limited consumer");
1380
1381		// Should be able to get allowed broadcast
1382		let result = limited_consumer.get_broadcast("allowed/test");
1383		assert!(result.is_some());
1384		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1385
1386		// Should not be able to get disallowed broadcast
1387		assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
1388
1389		// Original consumer can get both
1390		let consumer = origin.consume();
1391		assert!(consumer.get_broadcast("allowed/test").is_some());
1392		assert!(consumer.get_broadcast("notallowed/test").is_some());
1393	}
1394
1395	#[tokio::test]
1396	async fn test_nested_paths_with_permissions() {
1397		let origin = Origin::random().produce();
1398		let broadcast = Broadcast::new().produce();
1399
1400		// Create producer limited to "a/b/c"
1401		let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
1402
1403		// Should be able to publish to exact path and nested paths
1404		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1405		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1406		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1407
1408		// Should not be able to publish to parent or sibling paths
1409		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1410		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1411		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1412	}
1413
1414	#[tokio::test]
1415	async fn test_multiple_consumers_with_different_permissions() {
1416		let origin = Origin::random().produce();
1417		let broadcast1 = Broadcast::new().produce();
1418		let broadcast2 = Broadcast::new().produce();
1419		let broadcast3 = Broadcast::new().produce();
1420
1421		// Publish to different paths
1422		origin.publish_broadcast("foo/test", broadcast1.consume());
1423		origin.publish_broadcast("bar/test", broadcast2.consume());
1424		origin.publish_broadcast("baz/test", broadcast3.consume());
1425
1426		// Create consumers with different permissions
1427		let mut foo_consumer = origin
1428			.consume()
1429			.scope(&["foo".into()])
1430			.expect("should create foo consumer");
1431
1432		let mut bar_consumer = origin
1433			.consume()
1434			.scope(&["bar".into()])
1435			.expect("should create bar consumer");
1436
1437		let mut foobar_consumer = origin
1438			.consume()
1439			.scope(&["foo".into(), "bar".into()])
1440			.expect("should create foobar consumer");
1441
1442		// Each consumer should only see their allowed paths
1443		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1444		foo_consumer.assert_next_wait();
1445
1446		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1447		bar_consumer.assert_next_wait();
1448
1449		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1450		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1451		foobar_consumer.assert_next_wait();
1452	}
1453
1454	#[tokio::test]
1455	async fn test_select_with_empty_prefix() {
1456		let origin = Origin::random().produce();
1457		let broadcast1 = Broadcast::new().produce();
1458		let broadcast2 = Broadcast::new().produce();
1459
1460		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1461		let demo_producer = origin.with_root("demo").expect("should create demo root");
1462		let limited_producer = demo_producer
1463			.scope(&["worm-node".into(), "foobar".into()])
1464			.expect("should create limited producer");
1465
1466		// Publish some broadcasts
1467		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1468		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1469
1470		// scope with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1471		let mut consumer = limited_producer
1472			.consume()
1473			.scope(&["".into()])
1474			.expect("should create consumer with empty prefix");
1475
1476		// Should see both broadcasts (order depends on PathPrefixes sort)
1477		let a1 = consumer.try_announced().expect("expected first announcement");
1478		let a2 = consumer.try_announced().expect("expected second announcement");
1479		consumer.assert_next_wait();
1480
1481		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1482		paths.sort();
1483		assert_eq!(paths, ["foobar/test", "worm-node/test"]);
1484	}
1485
1486	#[tokio::test]
1487	async fn test_select_narrowing_scope() {
1488		let origin = Origin::random().produce();
1489		let broadcast1 = Broadcast::new().produce();
1490		let broadcast2 = Broadcast::new().produce();
1491		let broadcast3 = Broadcast::new().produce();
1492
1493		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1494		let demo_producer = origin.with_root("demo").expect("should create demo root");
1495		let limited_producer = demo_producer
1496			.scope(&["worm-node".into(), "foobar".into()])
1497			.expect("should create limited producer");
1498
1499		// Publish broadcasts at different levels
1500		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1501		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1502		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1503
1504		// Test 1: scope("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1505		let mut worm_consumer = limited_producer
1506			.consume()
1507			.scope(&["worm-node".into()])
1508			.expect("should create worm-node consumer");
1509
1510		// Should see worm-node content with paths stripped to ""
1511		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1512		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1513		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1514
1515		// Test 2: scope("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1516		let mut foo_consumer = limited_producer
1517			.consume()
1518			.scope(&["worm-node/foo".into()])
1519			.expect("should create worm-node/foo consumer");
1520
1521		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1522		foo_consumer.assert_next_wait(); // Should NOT see other content
1523	}
1524
1525	#[tokio::test]
1526	async fn test_select_multiple_roots_with_empty_prefix() {
1527		let origin = Origin::random().produce();
1528		let broadcast1 = Broadcast::new().produce();
1529		let broadcast2 = Broadcast::new().produce();
1530		let broadcast3 = Broadcast::new().produce();
1531
1532		// Producer with multiple allowed roots
1533		let limited_producer = origin
1534			.scope(&["app1".into(), "app2".into(), "shared".into()])
1535			.expect("should create limited producer");
1536
1537		// Publish to each root
1538		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1539		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1540		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1541
1542		// scope with empty prefix should maintain all roots
1543		let mut consumer = limited_producer
1544			.consume()
1545			.scope(&["".into()])
1546			.expect("should create consumer with empty prefix");
1547
1548		// Should see all broadcasts from all roots
1549		consumer.assert_next("app1/data", &broadcast1.consume());
1550		consumer.assert_next("app2/config", &broadcast2.consume());
1551		consumer.assert_next("shared/resource", &broadcast3.consume());
1552		consumer.assert_next_wait();
1553	}
1554
1555	#[tokio::test]
1556	async fn test_publish_scope_with_empty_prefix() {
1557		let origin = Origin::random().produce();
1558		let broadcast = Broadcast::new().produce();
1559
1560		// Producer with specific allowed paths
1561		let limited_producer = origin
1562			.scope(&["services/api".into(), "services/web".into()])
1563			.expect("should create limited producer");
1564
1565		// scope with empty prefix should keep the same restrictions
1566		let same_producer = limited_producer
1567			.scope(&["".into()])
1568			.expect("should create producer with empty prefix");
1569
1570		// Should still have the same publishing restrictions
1571		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1572		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1573		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1574		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1575	}
1576
1577	#[tokio::test]
1578	async fn test_select_narrowing_to_deeper_path() {
1579		let origin = Origin::random().produce();
1580		let broadcast1 = Broadcast::new().produce();
1581		let broadcast2 = Broadcast::new().produce();
1582		let broadcast3 = Broadcast::new().produce();
1583
1584		// Producer with broad permission
1585		let limited_producer = origin.scope(&["org".into()]).expect("should create limited producer");
1586
1587		// Publish at various depths
1588		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1589		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1590		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1591
1592		// Narrow down to team2 only
1593		let mut team2_consumer = limited_producer
1594			.consume()
1595			.scope(&["org/team2".into()])
1596			.expect("should create team2 consumer");
1597
1598		team2_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1599		team2_consumer.assert_next_wait(); // Should NOT see team1 content
1600
1601		// Further narrow down to team1/project1
1602		let mut project1_consumer = limited_producer
1603			.consume()
1604			.scope(&["org/team1/project1".into()])
1605			.expect("should create project1 consumer");
1606
1607		// Should only see project1 content at root
1608		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1609		project1_consumer.assert_next_wait();
1610	}
1611
1612	#[tokio::test]
1613	async fn test_select_with_non_matching_prefix() {
1614		let origin = Origin::random().produce();
1615
1616		// Producer with specific allowed paths
1617		let limited_producer = origin
1618			.scope(&["allowed/path".into()])
1619			.expect("should create limited producer");
1620
1621		// Trying to scope with a completely different prefix should return None
1622		assert!(limited_producer.consume().scope(&["different/path".into()]).is_none());
1623
1624		// Similarly for scope
1625		assert!(limited_producer.scope(&["other/path".into()]).is_none());
1626	}
1627
1628	// Regression test for https://github.com/moq-dev/moq/issues/910
1629	// with_root panics when String has trailing slash (AsPath for String skips normalization)
1630	#[tokio::test]
1631	async fn test_with_root_trailing_slash_consumer() {
1632		let origin = Origin::random().produce();
1633
1634		// Use an owned String so the trailing slash is NOT normalized away.
1635		let prefix = "some_prefix/".to_string();
1636		let mut consumer = origin.consume().with_root(prefix).unwrap();
1637
1638		let b = origin.create_broadcast("some_prefix/test").unwrap();
1639		consumer.assert_next("test", &b.consume());
1640	}
1641
1642	// Same issue but for the producer side of with_root
1643	#[tokio::test]
1644	async fn test_with_root_trailing_slash_producer() {
1645		let origin = Origin::random().produce();
1646
1647		// Use an owned String so the trailing slash is NOT normalized away.
1648		let prefix = "some_prefix/".to_string();
1649		let rooted = origin.with_root(prefix).unwrap();
1650
1651		let b = rooted.create_broadcast("test").unwrap();
1652
1653		let mut consumer = rooted.consume();
1654		consumer.assert_next("test", &b.consume());
1655	}
1656
1657	// Verify unannounce also doesn't panic with trailing slash
1658	#[tokio::test]
1659	async fn test_with_root_trailing_slash_unannounce() {
1660		tokio::time::pause();
1661
1662		let origin = Origin::random().produce();
1663
1664		let prefix = "some_prefix/".to_string();
1665		let mut consumer = origin.consume().with_root(prefix).unwrap();
1666
1667		let b = origin.create_broadcast("some_prefix/test").unwrap();
1668		consumer.assert_next("test", &b.consume());
1669
1670		// Drop the broadcast producer to trigger unannounce
1671		drop(b);
1672		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1673
1674		// unannounce also calls strip_prefix(&self.root).unwrap()
1675		consumer.assert_next_none("test");
1676	}
1677
1678	#[tokio::test]
1679	async fn test_select_maintains_access_with_wider_prefix() {
1680		let origin = Origin::random().produce();
1681		let broadcast1 = Broadcast::new().produce();
1682		let broadcast2 = Broadcast::new().produce();
1683
1684		// Setup: user with root "demo" allowed to subscribe to specific paths
1685		let demo_producer = origin.with_root("demo").expect("should create demo root");
1686		let user_producer = demo_producer
1687			.scope(&["worm-node".into(), "foobar".into()])
1688			.expect("should create user producer");
1689
1690		// Publish some data
1691		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1692		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1693
1694		// Key test: scope with "" should maintain access to allowed roots
1695		let mut consumer = user_producer
1696			.consume()
1697			.scope(&["".into()])
1698			.expect("scope with empty prefix should not fail when user has specific permissions");
1699
1700		// Should still receive broadcasts from allowed paths (order not guaranteed)
1701		let a1 = consumer.try_announced().expect("expected first announcement");
1702		let a2 = consumer.try_announced().expect("expected second announcement");
1703		consumer.assert_next_wait();
1704
1705		let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
1706		paths.sort();
1707		assert_eq!(paths, ["foobar", "worm-node/data"]);
1708
1709		// Also test that we can still narrow the scope
1710		let mut narrow_consumer = user_producer
1711			.consume()
1712			.scope(&["worm-node".into()])
1713			.expect("should be able to narrow scope to worm-node");
1714
1715		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1716		narrow_consumer.assert_next_wait(); // Should not see foobar
1717	}
1718
1719	#[tokio::test]
1720	async fn test_duplicate_prefixes_deduped() {
1721		let origin = Origin::random().produce();
1722		let broadcast = Broadcast::new().produce();
1723
1724		// scope with duplicate prefixes should work (deduped internally)
1725		let producer = origin
1726			.scope(&["demo".into(), "demo".into()])
1727			.expect("should create producer");
1728
1729		assert!(producer.publish_broadcast("demo/stream", broadcast.consume()));
1730
1731		let mut consumer = producer.consume();
1732		consumer.assert_next("demo/stream", &broadcast.consume());
1733		consumer.assert_next_wait();
1734	}
1735
1736	#[tokio::test]
1737	async fn test_overlapping_prefixes_deduped() {
1738		let origin = Origin::random().produce();
1739		let broadcast = Broadcast::new().produce();
1740
1741		// "demo" and "demo/foo" — "demo/foo" is redundant, only "demo" should remain
1742		let producer = origin
1743			.scope(&["demo".into(), "demo/foo".into()])
1744			.expect("should create producer");
1745
1746		// Can still publish under "demo/bar" since "demo" covers everything
1747		assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume()));
1748
1749		let mut consumer = producer.consume();
1750		consumer.assert_next("demo/bar/stream", &broadcast.consume());
1751		consumer.assert_next_wait();
1752	}
1753
1754	#[tokio::test]
1755	async fn test_overlapping_prefixes_no_duplicate_announcements() {
1756		let origin = Origin::random().produce();
1757		let broadcast = Broadcast::new().produce();
1758
1759		// Both "demo" and "demo/foo" are requested — should only have one node
1760		let producer = origin
1761			.scope(&["demo".into(), "demo/foo".into()])
1762			.expect("should create producer");
1763
1764		assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume()));
1765
1766		let mut consumer = producer.consume();
1767		// Should only get ONE announcement (not two from overlapping nodes)
1768		consumer.assert_next("demo/foo/stream", &broadcast.consume());
1769		consumer.assert_next_wait();
1770	}
1771
1772	#[tokio::test]
1773	async fn test_allowed_returns_deduped_prefixes() {
1774		let origin = Origin::random().produce();
1775
1776		let producer = origin
1777			.scope(&["demo".into(), "demo/foo".into(), "anon".into()])
1778			.expect("should create producer");
1779
1780		let allowed: Vec<_> = producer.allowed().collect();
1781		assert_eq!(allowed.len(), 2, "demo/foo should be subsumed by demo");
1782	}
1783
1784	#[tokio::test]
1785	async fn test_announced_broadcast_already_announced() {
1786		let origin = Origin::random().produce();
1787		let broadcast = Broadcast::new().produce();
1788
1789		origin.publish_broadcast("test", broadcast.consume());
1790
1791		let consumer = origin.consume();
1792		let result = consumer.announced_broadcast("test").await.expect("should find it");
1793		assert!(result.is_clone(&broadcast.consume()));
1794	}
1795
1796	#[tokio::test]
1797	async fn test_announced_broadcast_delayed() {
1798		tokio::time::pause();
1799
1800		let origin = Origin::random().produce();
1801		let broadcast = Broadcast::new().produce();
1802
1803		let consumer = origin.consume();
1804
1805		// Start waiting before it's announced.
1806		let wait = tokio::spawn({
1807			let consumer = consumer.clone();
1808			async move { consumer.announced_broadcast("test").await }
1809		});
1810
1811		// Give the spawned task a chance to subscribe.
1812		tokio::task::yield_now().await;
1813
1814		origin.publish_broadcast("test", broadcast.consume());
1815
1816		let result = wait.await.unwrap().expect("should find it");
1817		assert!(result.is_clone(&broadcast.consume()));
1818	}
1819
1820	#[tokio::test]
1821	async fn test_announced_broadcast_ignores_unrelated_paths() {
1822		tokio::time::pause();
1823
1824		let origin = Origin::random().produce();
1825		let other = Broadcast::new().produce();
1826		let target = Broadcast::new().produce();
1827
1828		let consumer = origin.consume();
1829
1830		let wait = tokio::spawn({
1831			let consumer = consumer.clone();
1832			async move { consumer.announced_broadcast("target").await }
1833		});
1834
1835		tokio::task::yield_now().await;
1836
1837		// Publish an unrelated broadcast first — announced_broadcast should skip it.
1838		origin.publish_broadcast("other", other.consume());
1839		tokio::task::yield_now().await;
1840		assert!(!wait.is_finished(), "must not resolve on unrelated path");
1841
1842		origin.publish_broadcast("target", target.consume());
1843		let result = wait.await.unwrap().expect("should find target");
1844		assert!(result.is_clone(&target.consume()));
1845	}
1846
1847	#[tokio::test]
1848	async fn test_announced_broadcast_skips_nested_paths() {
1849		tokio::time::pause();
1850
1851		let origin = Origin::random().produce();
1852		let nested = Broadcast::new().produce();
1853		let exact = Broadcast::new().produce();
1854
1855		let consumer = origin.consume();
1856
1857		let wait = tokio::spawn({
1858			let consumer = consumer.clone();
1859			async move { consumer.announced_broadcast("foo").await }
1860		});
1861
1862		tokio::task::yield_now().await;
1863
1864		// "foo/bar" is under the prefix scope, but it's not the exact path — skip it.
1865		origin.publish_broadcast("foo/bar", nested.consume());
1866		tokio::task::yield_now().await;
1867		assert!(!wait.is_finished(), "must not resolve on a nested path");
1868
1869		origin.publish_broadcast("foo", exact.consume());
1870		let result = wait.await.unwrap().expect("should find foo exactly");
1871		assert!(result.is_clone(&exact.consume()));
1872	}
1873
1874	#[tokio::test]
1875	async fn test_announced_broadcast_disallowed() {
1876		let origin = Origin::random().produce();
1877		let limited = origin
1878			.consume()
1879			.scope(&["allowed".into()])
1880			.expect("should create limited");
1881
1882		// Path is outside allowed prefixes — should return None immediately.
1883		assert!(limited.announced_broadcast("notallowed").await.is_none());
1884	}
1885
1886	#[tokio::test]
1887	async fn test_announced_broadcast_scope_too_narrow() {
1888		// Consumer's scope is narrower than the requested path: asking for `foo` on a consumer
1889		// limited to `foo/specific` can never resolve. Must return None, not loop forever.
1890		let origin = Origin::random().produce();
1891		let limited = origin
1892			.consume()
1893			.scope(&["foo/specific".into()])
1894			.expect("should create limited");
1895
1896		// now_or_never so we fail fast instead of hanging if the guard regresses.
1897		let result = limited
1898			.announced_broadcast("foo")
1899			.now_or_never()
1900			.expect("must not block");
1901		assert!(result.is_none());
1902	}
1903}