Skip to main content

moq_lite/model/
origin.rs

1use std::{
2	collections::HashMap,
3	sync::atomic::{AtomicU64, Ordering},
4};
5use tokio::sync::mpsc;
6use web_async::Lock;
7
8use super::BroadcastConsumer;
9use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned};
10
11static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
14struct ConsumerId(u64);
15
16impl ConsumerId {
17	fn new() -> Self {
18		Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
19	}
20}
21
22// If there are multiple broadcasts with the same path, we use the most recent one but keep the others around.
23struct OriginBroadcast {
24	path: PathOwned,
25	active: BroadcastConsumer,
26	backup: Vec<BroadcastConsumer>,
27}
28
29#[derive(Clone)]
30struct OriginConsumerNotify {
31	root: PathOwned,
32	tx: mpsc::UnboundedSender<OriginAnnounce>,
33}
34
35impl OriginConsumerNotify {
36	fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
37		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
38		self.tx.send((path, Some(broadcast))).expect("consumer closed");
39	}
40
41	fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
42		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
43		self.tx.send((path.clone(), None)).expect("consumer closed");
44		self.tx.send((path, Some(broadcast))).expect("consumer closed");
45	}
46
47	fn unannounce(&self, path: impl AsPath) {
48		let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
49		self.tx.send((path, None)).expect("consumer closed");
50	}
51}
52
53struct NotifyNode {
54	parent: Option<Lock<NotifyNode>>,
55
56	// Consumers that are subscribed to this node.
57	// We store a consumer ID so we can remove it easily when it closes.
58	consumers: HashMap<ConsumerId, OriginConsumerNotify>,
59}
60
61impl NotifyNode {
62	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
63		Self {
64			parent,
65			consumers: HashMap::new(),
66		}
67	}
68
69	fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
70		for consumer in self.consumers.values() {
71			consumer.announce(path.as_path(), broadcast.clone());
72		}
73
74		if let Some(parent) = &self.parent {
75			parent.lock().announce(path, broadcast);
76		}
77	}
78
79	fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
80		for consumer in self.consumers.values() {
81			consumer.reannounce(path.as_path(), broadcast.clone());
82		}
83
84		if let Some(parent) = &self.parent {
85			parent.lock().reannounce(path, broadcast);
86		}
87	}
88
89	fn unannounce(&mut self, path: impl AsPath) {
90		for consumer in self.consumers.values() {
91			consumer.unannounce(path.as_path());
92		}
93
94		if let Some(parent) = &self.parent {
95			parent.lock().unannounce(path);
96		}
97	}
98}
99
100struct OriginNode {
101	// The broadcast that is published to this node.
102	broadcast: Option<OriginBroadcast>,
103
104	// Nested nodes, one level down the tree.
105	nested: HashMap<String, Lock<OriginNode>>,
106
107	// Unfortunately, to notify consumers we need to traverse back up the tree.
108	notify: Lock<NotifyNode>,
109}
110
111impl OriginNode {
112	fn new(parent: Option<Lock<NotifyNode>>) -> Self {
113		Self {
114			broadcast: None,
115			nested: HashMap::new(),
116			notify: Lock::new(NotifyNode::new(parent)),
117		}
118	}
119
120	fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
121		let (dir, rest) = path.next_part().expect("leaf called with empty path");
122
123		let next = self.entry(dir);
124		if rest.is_empty() { next } else { next.lock().leaf(&rest) }
125	}
126
127	fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
128		match self.nested.get(dir) {
129			Some(next) => next.clone(),
130			None => {
131				let next = Lock::new(OriginNode::new(Some(self.notify.clone())));
132				self.nested.insert(dir.to_string(), next.clone());
133				next
134			}
135		}
136	}
137
138	fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
139		let full = full.as_path();
140		let rest = relative.as_path();
141
142		// If the path has a directory component, then publish it to the nested node.
143		if let Some((dir, relative)) = rest.next_part() {
144			// Not using entry to avoid allocating a string most of the time.
145			self.entry(dir).lock().publish(&full, broadcast, &relative);
146		} else if let Some(existing) = &mut self.broadcast {
147			// This node is a leaf with an existing broadcast.
148			let old = existing.active.clone();
149			existing.active = broadcast.clone();
150			existing.backup.push(old);
151
152			self.notify.lock().reannounce(full, broadcast);
153		} else {
154			// This node is a leaf with no existing broadcast.
155			self.broadcast = Some(OriginBroadcast {
156				path: full.to_owned(),
157				active: broadcast.clone(),
158				backup: Vec::new(),
159			});
160			self.notify.lock().announce(full, broadcast);
161		}
162	}
163
164	fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
165		self.consume_initial(&mut notify);
166		self.notify.lock().consumers.insert(id, notify);
167	}
168
169	fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
170		if let Some(broadcast) = &self.broadcast {
171			notify.announce(&broadcast.path, broadcast.active.clone());
172		}
173
174		// Recursively subscribe to all nested nodes.
175		for nested in self.nested.values() {
176			nested.lock().consume_initial(notify);
177		}
178	}
179
180	fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
181		let rest = rest.as_path();
182
183		if let Some((dir, rest)) = rest.next_part() {
184			let node = self.nested.get(dir)?.lock();
185			node.consume_broadcast(&rest)
186		} else {
187			self.broadcast.as_ref().map(|b| b.active.clone())
188		}
189	}
190
191	fn unconsume(&mut self, id: ConsumerId) {
192		self.notify.lock().consumers.remove(&id).expect("consumer not found");
193		if self.is_empty() {
194			//tracing::warn!("TODO: empty node; memory leak");
195			// This happens when consuming a path that is not being broadcasted.
196		}
197	}
198
199	// Returns true if the broadcast should be unannounced.
200	fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
201		let full = full.as_path();
202		let relative = relative.as_path();
203
204		if let Some((dir, relative)) = relative.next_part() {
205			let nested = self.entry(dir);
206			let mut locked = nested.lock();
207			locked.remove(&full, broadcast, &relative);
208
209			if locked.is_empty() {
210				drop(locked);
211				self.nested.remove(dir);
212			}
213		} else {
214			let entry = match &mut self.broadcast {
215				Some(existing) => existing,
216				None => return,
217			};
218
219			// See if we can remove the broadcast from the backup list.
220			let pos = entry.backup.iter().position(|b| b.is_clone(&broadcast));
221			if let Some(pos) = pos {
222				entry.backup.remove(pos);
223				// Nothing else to do
224				return;
225			}
226
227			// Okay so it must be the active broadcast or else we fucked up.
228			assert!(entry.active.is_clone(&broadcast));
229
230			// If there's a backup broadcast, then announce it.
231			if let Some(active) = entry.backup.pop() {
232				entry.active = active;
233				self.notify.lock().reannounce(full, &entry.active);
234			} else {
235				// No more backups, so remove the entry.
236				self.broadcast = None;
237				self.notify.lock().unannounce(full);
238			}
239		}
240	}
241
242	fn is_empty(&self) -> bool {
243		self.broadcast.is_none() && self.nested.is_empty() && self.notify.lock().consumers.is_empty()
244	}
245}
246
247#[derive(Clone)]
248struct OriginNodes {
249	nodes: Vec<(PathOwned, Lock<OriginNode>)>,
250}
251
252impl OriginNodes {
253	// Returns nested roots that match the prefixes.
254	// TODO enforce that prefixes can't overlap.
255	pub fn select(&self, prefixes: &[Path]) -> Option<Self> {
256		let mut roots = Vec::new();
257
258		for (root, state) in &self.nodes {
259			for prefix in prefixes {
260				if root.has_prefix(prefix) {
261					// Keep the existing node if we're allowed to access it.
262					roots.push((root.to_owned(), state.clone()));
263					continue;
264				}
265
266				if let Some(suffix) = prefix.strip_prefix(root) {
267					// If the requested prefix is larger than the allowed prefix, then we further scope it.
268					let nested = state.lock().leaf(&suffix);
269					roots.push((prefix.to_owned(), nested));
270				}
271			}
272		}
273
274		if roots.is_empty() {
275			None
276		} else {
277			Some(Self { nodes: roots })
278		}
279	}
280
281	pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
282		let new_root = new_root.as_path();
283		let mut roots = Vec::new();
284
285		if new_root.is_empty() {
286			return Some(self.clone());
287		}
288
289		for (root, state) in &self.nodes {
290			if let Some(suffix) = root.strip_prefix(&new_root) {
291				// If the old root is longer than the new root, shorten the keys.
292				roots.push((suffix.to_owned(), state.clone()));
293			} else if let Some(suffix) = new_root.strip_prefix(root) {
294				// If the new root is longer than the old root, add a new root.
295				// NOTE: suffix can't be empty
296				let nested = state.lock().leaf(&suffix);
297				roots.push(("".into(), nested));
298			}
299		}
300
301		if roots.is_empty() {
302			None
303		} else {
304			Some(Self { nodes: roots })
305		}
306	}
307
308	// Returns the root that has this prefix.
309	pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
310		let path = path.as_path();
311
312		for (root, state) in &self.nodes {
313			if let Some(suffix) = path.strip_prefix(root) {
314				return Some((state.clone(), suffix.to_owned()));
315			}
316		}
317
318		None
319	}
320}
321
322impl Default for OriginNodes {
323	fn default() -> Self {
324		Self {
325			nodes: vec![("".into(), Lock::new(OriginNode::new(None)))],
326		}
327	}
328}
329
330/// A broadcast path and its associated consumer, or None if closed.
331pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
332
333/// A collection of broadcasts that can be published and subscribed to.
334pub struct Origin {}
335
336impl Origin {
337	pub fn produce() -> OriginProducer {
338		OriginProducer::new()
339	}
340}
341
342/// Announces broadcasts to consumers over the network.
343#[derive(Clone, Default)]
344pub struct OriginProducer {
345	// The roots of the tree that we are allowed to publish.
346	// A path of "" means we can publish anything.
347	nodes: OriginNodes,
348
349	/// The prefix that is automatically stripped from all paths.
350	root: PathOwned,
351}
352
353impl OriginProducer {
354	pub fn new() -> Self {
355		Self::default()
356	}
357
358	/// Create and publish a new broadcast, returning the producer.
359	///
360	/// This is a helper method when you only want to publish a broadcast to a single origin.
361	/// Returns [None] if the broadcast is not allowed to be published.
362	pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
363		let broadcast = Broadcast::produce();
364		self.publish_broadcast(path, broadcast.consume()).then_some(broadcast)
365	}
366
367	/// Publish a broadcast, announcing it to all consumers.
368	///
369	/// The broadcast will be unannounced when it is closed.
370	/// If there is already a broadcast with the same path, then it will be replaced and reannounced.
371	/// If the old broadcast is closed before the new one, then nothing will happen.
372	/// If the new broadcast is closed before the old one, then the old broadcast will be reannounced.
373	///
374	/// Returns false if the broadcast is not allowed to be published.
375	pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
376		let path = path.as_path();
377
378		let (root, rest) = match self.nodes.get(&path) {
379			Some(root) => root,
380			None => return false,
381		};
382
383		let full = self.root.join(&path);
384
385		root.lock().publish(&full, &broadcast, &rest);
386		let root = root.clone();
387
388		web_async::spawn(async move {
389			broadcast.closed().await;
390			root.lock().remove(&full, broadcast, &rest);
391		});
392
393		true
394	}
395
396	/// Returns a new OriginProducer where all published broadcasts MUST match one of the prefixes.
397	///
398	/// Returns None if there are no legal prefixes.
399	pub fn publish_only(&self, prefixes: &[Path]) -> Option<OriginProducer> {
400		Some(OriginProducer {
401			nodes: self.nodes.select(prefixes)?,
402			root: self.root.clone(),
403		})
404	}
405
406	/// Subscribe to all announced broadcasts.
407	pub fn consume(&self) -> OriginConsumer {
408		OriginConsumer::new(self.root.clone(), self.nodes.clone())
409	}
410
411	/// Subscribe to all announced broadcasts matching the prefix.
412	///
413	/// TODO: Don't use overlapping prefixes or duplicates will be published.
414	///
415	/// Returns None if there are no legal prefixes.
416	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
417		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
418	}
419
420	/// Subscribe to a specific broadcast.
421	///
422	/// Returns None if the broadcast is not found.
423	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
424		let path = path.as_path();
425		let (root, rest) = self.nodes.get(&path)?;
426		let state = root.lock();
427		state.consume_broadcast(&rest)
428	}
429
430	/// Returns a new OriginProducer that automatically strips out the provided prefix.
431	///
432	/// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard.
433	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
434		let prefix = prefix.as_path();
435
436		Some(Self {
437			root: self.root.join(&prefix).to_owned(),
438			nodes: self.nodes.root(&prefix)?,
439		})
440	}
441
442	/// Returns the root that is automatically stripped from all paths.
443	pub fn root(&self) -> &Path<'_> {
444		&self.root
445	}
446
447	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
448		self.nodes.nodes.iter().map(|(root, _)| root)
449	}
450
451	/// Converts a relative path to an absolute path.
452	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
453		self.root.join(path)
454	}
455}
456
457/// Consumes announced broadcasts matching against an optional prefix.
458///
459/// NOTE: Clone is expensive, try to avoid it.
460pub struct OriginConsumer {
461	id: ConsumerId,
462	nodes: OriginNodes,
463	updates: mpsc::UnboundedReceiver<OriginAnnounce>,
464
465	/// A prefix that is automatically stripped from all paths.
466	root: PathOwned,
467}
468
469impl OriginConsumer {
470	fn new(root: PathOwned, nodes: OriginNodes) -> Self {
471		let (tx, rx) = mpsc::unbounded_channel();
472
473		let id = ConsumerId::new();
474
475		for (_, state) in &nodes.nodes {
476			let notify = OriginConsumerNotify {
477				root: root.clone(),
478				tx: tx.clone(),
479			};
480			state.lock().consume(id, notify);
481		}
482
483		Self {
484			id,
485			nodes,
486			updates: rx,
487			root,
488		}
489	}
490
491	/// Returns the next (un)announced broadcast and the absolute path.
492	///
493	/// The broadcast will only be announced if it was previously unannounced.
494	/// The same path won't be announced/unannounced twice, instead it will toggle.
495	/// Returns None if the consumer is closed.
496	///
497	/// Note: The returned path is absolute and will always match this consumer's prefix.
498	pub async fn announced(&mut self) -> Option<OriginAnnounce> {
499		self.updates.recv().await
500	}
501
502	/// Returns the next (un)announced broadcast and the absolute path without blocking.
503	///
504	/// Returns None if there is no update available; NOT because the consumer is closed.
505	/// You have to use `is_closed` to check if the consumer is closed.
506	pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
507		self.updates.try_recv().ok()
508	}
509
510	pub fn consume(&self) -> Self {
511		self.clone()
512	}
513
514	/// Get a specific broadcast by path.
515	///
516	/// TODO This should include announcement support.
517	///
518	/// Returns None if the path hasn't been announced yet.
519	pub fn consume_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
520		let path = path.as_path();
521		let (root, rest) = self.nodes.get(&path)?;
522		let state = root.lock();
523		state.consume_broadcast(&rest)
524	}
525
526	/// Returns a new OriginConsumer that only consumes broadcasts matching one of the prefixes.
527	///
528	/// Returns None if there are no legal prefixes (would always return None).
529	pub fn consume_only(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
530		Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?))
531	}
532
533	/// Returns a new OriginConsumer that automatically strips out the provided prefix.
534	///
535	/// Returns None if the provided root is not authorized; when consume_only was already used without a wildcard.
536	pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
537		let prefix = prefix.as_path();
538
539		Some(Self::new(self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?))
540	}
541
542	/// Returns the prefix that is automatically stripped from all paths.
543	pub fn root(&self) -> &Path<'_> {
544		&self.root
545	}
546
547	pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
548		self.nodes.nodes.iter().map(|(root, _)| root)
549	}
550
551	/// Converts a relative path to an absolute path.
552	pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
553		self.root.join(path)
554	}
555}
556
557impl Drop for OriginConsumer {
558	fn drop(&mut self) {
559		for (_, root) in &self.nodes.nodes {
560			root.lock().unconsume(self.id);
561		}
562	}
563}
564
565impl Clone for OriginConsumer {
566	fn clone(&self) -> Self {
567		OriginConsumer::new(self.root.clone(), self.nodes.clone())
568	}
569}
570
571#[cfg(test)]
572use futures::FutureExt;
573
574#[cfg(test)]
575impl OriginConsumer {
576	pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
577		let expected = expected.as_path();
578		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
579		assert_eq!(path, expected, "wrong path");
580		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
581	}
582
583	pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
584		let expected = expected.as_path();
585		let (path, active) = self.try_announced().expect("no next");
586		assert_eq!(path, expected, "wrong path");
587		assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast");
588	}
589
590	pub fn assert_next_none(&mut self, expected: impl AsPath) {
591		let expected = expected.as_path();
592		let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next");
593		assert_eq!(path, expected, "wrong path");
594		assert!(active.is_none(), "should be unannounced");
595	}
596
597	pub fn assert_next_wait(&mut self) {
598		if let Some(res) = self.announced().now_or_never() {
599			panic!("next should block: got {:?}", res.map(|(path, _)| path));
600		}
601	}
602
603	/*
604	pub fn assert_next_closed(&mut self) {
605		assert!(
606			self.announced().now_or_never().expect("next blocked").is_none(),
607			"next should be closed"
608		);
609	}
610	*/
611}
612
613#[cfg(test)]
614mod tests {
615	use crate::Broadcast;
616
617	use super::*;
618
619	#[tokio::test]
620	async fn test_announce() {
621		let origin = Origin::produce();
622		let broadcast1 = Broadcast::produce();
623		let broadcast2 = Broadcast::produce();
624
625		let mut consumer1 = origin.consume();
626		// Make a new consumer that should get it.
627		consumer1.assert_next_wait();
628
629		// Publish the first broadcast.
630		origin.publish_broadcast("test1", broadcast1.consume());
631
632		consumer1.assert_next("test1", &broadcast1.consume());
633		consumer1.assert_next_wait();
634
635		// Make a new consumer that should get the existing broadcast.
636		// But we don't consume it yet.
637		let mut consumer2 = origin.consume();
638
639		// Publish the second broadcast.
640		origin.publish_broadcast("test2", broadcast2.consume());
641
642		consumer1.assert_next("test2", &broadcast2.consume());
643		consumer1.assert_next_wait();
644
645		consumer2.assert_next("test1", &broadcast1.consume());
646		consumer2.assert_next("test2", &broadcast2.consume());
647		consumer2.assert_next_wait();
648
649		// Close the first broadcast.
650		drop(broadcast1);
651
652		// Wait for the async task to run.
653		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
654
655		// All consumers should get a None now.
656		consumer1.assert_next_none("test1");
657		consumer2.assert_next_none("test1");
658		consumer1.assert_next_wait();
659		consumer2.assert_next_wait();
660
661		// And a new consumer only gets the last broadcast.
662		let mut consumer3 = origin.consume();
663		consumer3.assert_next("test2", &broadcast2.consume());
664		consumer3.assert_next_wait();
665
666		// Close the other producer and make sure it cleans up
667		drop(broadcast2);
668
669		// Wait for the async task to run.
670		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
671
672		consumer1.assert_next_none("test2");
673		consumer2.assert_next_none("test2");
674		consumer3.assert_next_none("test2");
675
676		/* TODO close the origin consumer when the producer is dropped
677		consumer1.assert_next_closed();
678		consumer2.assert_next_closed();
679		consumer3.assert_next_closed();
680		*/
681	}
682
683	#[tokio::test]
684	async fn test_duplicate() {
685		let origin = Origin::produce();
686
687		let broadcast1 = Broadcast::produce();
688		let broadcast2 = Broadcast::produce();
689		let broadcast3 = Broadcast::produce();
690
691		let consumer1 = broadcast1.consume();
692		let consumer2 = broadcast2.consume();
693		let consumer3 = broadcast3.consume();
694
695		origin.publish_broadcast("test", consumer1.clone());
696		origin.publish_broadcast("test", consumer2.clone());
697		origin.publish_broadcast("test", consumer3.clone());
698
699		let mut consumer = origin.consume();
700		assert!(consumer.consume_broadcast("test").is_some());
701
702		consumer.assert_next("test", &consumer1);
703		consumer.assert_next_none("test");
704		consumer.assert_next("test", &consumer2);
705		consumer.assert_next_none("test");
706		consumer.assert_next("test", &consumer3);
707
708		// Drop the backup, nothing should change.
709		drop(broadcast2);
710
711		// Wait for the async task to run.
712		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
713
714		assert!(consumer.consume_broadcast("test").is_some());
715		consumer.assert_next_wait();
716
717		// Drop the active, we should reannounce.
718		drop(broadcast3);
719
720		// Wait for the async task to run.
721		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
722
723		assert!(consumer.consume_broadcast("test").is_some());
724		consumer.assert_next_none("test");
725		consumer.assert_next("test", &consumer1);
726
727		// Drop the final broadcast, we should unannounce.
728		drop(broadcast1);
729
730		// Wait for the async task to run.
731		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
732		assert!(consumer.consume_broadcast("test").is_none());
733
734		consumer.assert_next_none("test");
735		consumer.assert_next_wait();
736	}
737
738	#[tokio::test]
739	async fn test_duplicate_reverse() {
740		let origin = Origin::produce();
741		let broadcast1 = Broadcast::produce();
742		let broadcast2 = Broadcast::produce();
743
744		origin.publish_broadcast("test", broadcast1.consume());
745		origin.publish_broadcast("test", broadcast2.consume());
746		assert!(origin.consume_broadcast("test").is_some());
747
748		// This is harder, dropping the new broadcast first.
749		drop(broadcast2);
750
751		// Wait for the cleanup async task to run.
752		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
753		assert!(origin.consume_broadcast("test").is_some());
754
755		drop(broadcast1);
756
757		// Wait for the cleanup async task to run.
758		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
759		assert!(origin.consume_broadcast("test").is_none());
760	}
761
762	#[tokio::test]
763	async fn test_double_publish() {
764		let origin = Origin::produce();
765		let broadcast = Broadcast::produce();
766
767		// Ensure it doesn't crash.
768		origin.publish_broadcast("test", broadcast.consume());
769		origin.publish_broadcast("test", broadcast.consume());
770
771		assert!(origin.consume_broadcast("test").is_some());
772
773		drop(broadcast);
774
775		// Wait for the async task to run.
776		tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
777		assert!(origin.consume_broadcast("test").is_none());
778	}
779	// There was a tokio bug where only the first 127 broadcasts would be received instantly.
780	#[tokio::test]
781	#[should_panic]
782	async fn test_128() {
783		let origin = Origin::produce();
784		let broadcast = Broadcast::produce();
785
786		for i in 0..256 {
787			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
788		}
789
790		let mut consumer = origin.consume();
791		for i in 0..256 {
792			consumer.assert_next(format!("test{i}"), &broadcast.consume());
793		}
794	}
795
796	#[tokio::test]
797	async fn test_128_fix() {
798		let origin = Origin::produce();
799		let broadcast = Broadcast::produce();
800
801		for i in 0..256 {
802			origin.publish_broadcast(format!("test{i}"), broadcast.consume());
803		}
804
805		let mut consumer = origin.consume();
806		for i in 0..256 {
807			// try_next does not have the same issue because it's synchronous.
808			consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
809		}
810	}
811
812	#[tokio::test]
813	async fn test_with_root_basic() {
814		let origin = Origin::produce();
815		let broadcast = Broadcast::produce();
816
817		// Create a producer with root "/foo"
818		let foo_producer = origin.with_root("foo").expect("should create root");
819		assert_eq!(foo_producer.root().as_str(), "foo");
820
821		// When publishing to "bar/baz", it should actually publish to "foo/bar/baz"
822		assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
823
824		let mut consumer = origin.consume();
825		// The original consumer should see the full path
826		consumer.assert_next("foo/bar/baz", &broadcast.consume());
827
828		// A consumer created from the rooted producer should see the stripped path
829		let mut foo_consumer = foo_producer.consume();
830		foo_consumer.assert_next("bar/baz", &broadcast.consume());
831	}
832
833	#[tokio::test]
834	async fn test_with_root_nested() {
835		let origin = Origin::produce();
836		let broadcast = Broadcast::produce();
837
838		// Create nested roots
839		let foo_producer = origin.with_root("foo").expect("should create foo root");
840		let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
841		assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
842
843		// Publishing to "baz" should actually publish to "foo/bar/baz"
844		assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
845
846		let mut consumer = origin.consume();
847		// The original consumer sees the full path
848		consumer.assert_next("foo/bar/baz", &broadcast.consume());
849
850		// Consumer from foo_bar_producer sees just "baz"
851		let mut foo_bar_consumer = foo_bar_producer.consume();
852		foo_bar_consumer.assert_next("baz", &broadcast.consume());
853	}
854
855	#[tokio::test]
856	async fn test_publish_only_allows() {
857		let origin = Origin::produce();
858		let broadcast = Broadcast::produce();
859
860		// Create a producer that can only publish to "allowed" paths
861		let limited_producer = origin
862			.publish_only(&["allowed/path1".into(), "allowed/path2".into()])
863			.expect("should create limited producer");
864
865		// Should be able to publish to allowed paths
866		assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
867		assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
868		assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
869
870		// Should not be able to publish to disallowed paths
871		assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
872		assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path
873		assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
874	}
875
876	#[tokio::test]
877	async fn test_publish_only_empty() {
878		let origin = Origin::produce();
879
880		// Creating a producer with no allowed paths should return None
881		assert!(origin.publish_only(&[]).is_none());
882	}
883
884	#[tokio::test]
885	async fn test_consume_only_filters() {
886		let origin = Origin::produce();
887		let broadcast1 = Broadcast::produce();
888		let broadcast2 = Broadcast::produce();
889		let broadcast3 = Broadcast::produce();
890
891		// Publish to different paths
892		origin.publish_broadcast("allowed", broadcast1.consume());
893		origin.publish_broadcast("allowed/nested", broadcast2.consume());
894		origin.publish_broadcast("notallowed", broadcast3.consume());
895
896		// Create a consumer that only sees "allowed" paths
897		let mut limited_consumer = origin
898			.consume_only(&["allowed".into()])
899			.expect("should create limited consumer");
900
901		// Should only receive broadcasts under "allowed"
902		limited_consumer.assert_next("allowed", &broadcast1.consume());
903		limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
904		limited_consumer.assert_next_wait(); // Should not see "notallowed"
905
906		// Unscoped consumer should see all
907		let mut consumer = origin.consume();
908		consumer.assert_next("allowed", &broadcast1.consume());
909		consumer.assert_next("allowed/nested", &broadcast2.consume());
910		consumer.assert_next("notallowed", &broadcast3.consume());
911	}
912
913	#[tokio::test]
914	async fn test_consume_only_multiple_prefixes() {
915		let origin = Origin::produce();
916		let broadcast1 = Broadcast::produce();
917		let broadcast2 = Broadcast::produce();
918		let broadcast3 = Broadcast::produce();
919
920		origin.publish_broadcast("foo/test", broadcast1.consume());
921		origin.publish_broadcast("bar/test", broadcast2.consume());
922		origin.publish_broadcast("baz/test", broadcast3.consume());
923
924		// Consumer that only sees "foo" and "bar" paths
925		let mut limited_consumer = origin
926			.consume_only(&["foo".into(), "bar".into()])
927			.expect("should create limited consumer");
928
929		limited_consumer.assert_next("foo/test", &broadcast1.consume());
930		limited_consumer.assert_next("bar/test", &broadcast2.consume());
931		limited_consumer.assert_next_wait(); // Should not see "baz/test"
932	}
933
934	#[tokio::test]
935	async fn test_with_root_and_publish_only() {
936		let origin = Origin::produce();
937		let broadcast = Broadcast::produce();
938
939		// User connects to /foo root
940		let foo_producer = origin.with_root("foo").expect("should create foo root");
941
942		// Limit them to publish only to "bar" and "goop/pee" within /foo
943		let limited_producer = foo_producer
944			.publish_only(&["bar".into(), "goop/pee".into()])
945			.expect("should create limited producer");
946
947		// Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee)
948		assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
949		assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
950		assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
951		assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
952
953		// Should not be able to publish outside allowed paths
954		assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
955		assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed
956		assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
957
958		// Original consumer sees full paths
959		let mut consumer = origin.consume();
960		consumer.assert_next("foo/bar", &broadcast.consume());
961		consumer.assert_next("foo/bar/nested", &broadcast.consume());
962		consumer.assert_next("foo/goop/pee", &broadcast.consume());
963		consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
964	}
965
966	#[tokio::test]
967	async fn test_with_root_and_consume_only() {
968		let origin = Origin::produce();
969		let broadcast1 = Broadcast::produce();
970		let broadcast2 = Broadcast::produce();
971		let broadcast3 = Broadcast::produce();
972
973		// Publish broadcasts
974		origin.publish_broadcast("foo/bar/test", broadcast1.consume());
975		origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
976		origin.publish_broadcast("foo/other/test", broadcast3.consume());
977
978		// User connects to /foo root
979		let foo_producer = origin.with_root("foo").expect("should create foo root");
980
981		// Create consumer limited to "bar" and "goop/pee" within /foo
982		let mut limited_consumer = foo_producer
983			.consume_only(&["bar".into(), "goop/pee".into()])
984			.expect("should create limited consumer");
985
986		// Should only see allowed paths (without foo prefix)
987		limited_consumer.assert_next("bar/test", &broadcast1.consume());
988		limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
989		limited_consumer.assert_next_wait(); // Should not see "other/test"
990	}
991
992	#[tokio::test]
993	async fn test_with_root_unauthorized() {
994		let origin = Origin::produce();
995
996		// First limit the producer to specific paths
997		let limited_producer = origin
998			.publish_only(&["allowed".into()])
999			.expect("should create limited producer");
1000
1001		// Trying to create a root outside allowed paths should fail
1002		assert!(limited_producer.with_root("notallowed").is_none());
1003
1004		// But creating a root within allowed paths should work
1005		let allowed_root = limited_producer
1006			.with_root("allowed")
1007			.expect("should create allowed root");
1008		assert_eq!(allowed_root.root().as_str(), "allowed");
1009	}
1010
1011	#[tokio::test]
1012	async fn test_wildcard_permission() {
1013		let origin = Origin::produce();
1014		let broadcast = Broadcast::produce();
1015
1016		// Producer with root access (empty string means wildcard)
1017		let root_producer = origin.clone();
1018
1019		// Should be able to publish anywhere
1020		assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
1021		assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
1022
1023		// Can create any root
1024		let foo_producer = root_producer.with_root("foo").expect("should create any root");
1025		assert_eq!(foo_producer.root().as_str(), "foo");
1026	}
1027
1028	#[tokio::test]
1029	async fn test_consume_broadcast_with_permissions() {
1030		let origin = Origin::produce();
1031		let broadcast1 = Broadcast::produce();
1032		let broadcast2 = Broadcast::produce();
1033
1034		origin.publish_broadcast("allowed/test", broadcast1.consume());
1035		origin.publish_broadcast("notallowed/test", broadcast2.consume());
1036
1037		// Create limited consumer
1038		let limited_consumer = origin
1039			.consume_only(&["allowed".into()])
1040			.expect("should create limited consumer");
1041
1042		// Should be able to get allowed broadcast
1043		let result = limited_consumer.consume_broadcast("allowed/test");
1044		assert!(result.is_some());
1045		assert!(result.unwrap().is_clone(&broadcast1.consume()));
1046
1047		// Should not be able to get disallowed broadcast
1048		assert!(limited_consumer.consume_broadcast("notallowed/test").is_none());
1049
1050		// Original consumer can get both
1051		let consumer = origin.consume();
1052		assert!(consumer.consume_broadcast("allowed/test").is_some());
1053		assert!(consumer.consume_broadcast("notallowed/test").is_some());
1054	}
1055
1056	#[tokio::test]
1057	async fn test_nested_paths_with_permissions() {
1058		let origin = Origin::produce();
1059		let broadcast = Broadcast::produce();
1060
1061		// Create producer limited to "a/b/c"
1062		let limited_producer = origin
1063			.publish_only(&["a/b/c".into()])
1064			.expect("should create limited producer");
1065
1066		// Should be able to publish to exact path and nested paths
1067		assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
1068		assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
1069		assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
1070
1071		// Should not be able to publish to parent or sibling paths
1072		assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
1073		assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
1074		assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
1075	}
1076
1077	#[tokio::test]
1078	async fn test_multiple_consumers_with_different_permissions() {
1079		let origin = Origin::produce();
1080		let broadcast1 = Broadcast::produce();
1081		let broadcast2 = Broadcast::produce();
1082		let broadcast3 = Broadcast::produce();
1083
1084		// Publish to different paths
1085		origin.publish_broadcast("foo/test", broadcast1.consume());
1086		origin.publish_broadcast("bar/test", broadcast2.consume());
1087		origin.publish_broadcast("baz/test", broadcast3.consume());
1088
1089		// Create consumers with different permissions
1090		let mut foo_consumer = origin
1091			.consume_only(&["foo".into()])
1092			.expect("should create foo consumer");
1093
1094		let mut bar_consumer = origin
1095			.consume_only(&["bar".into()])
1096			.expect("should create bar consumer");
1097
1098		let mut foobar_consumer = origin
1099			.consume_only(&["foo".into(), "bar".into()])
1100			.expect("should create foobar consumer");
1101
1102		// Each consumer should only see their allowed paths
1103		foo_consumer.assert_next("foo/test", &broadcast1.consume());
1104		foo_consumer.assert_next_wait();
1105
1106		bar_consumer.assert_next("bar/test", &broadcast2.consume());
1107		bar_consumer.assert_next_wait();
1108
1109		foobar_consumer.assert_next("foo/test", &broadcast1.consume());
1110		foobar_consumer.assert_next("bar/test", &broadcast2.consume());
1111		foobar_consumer.assert_next_wait();
1112	}
1113
1114	#[tokio::test]
1115	async fn test_select_with_empty_prefix() {
1116		let origin = Origin::produce();
1117		let broadcast1 = Broadcast::produce();
1118		let broadcast2 = Broadcast::produce();
1119
1120		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1121		let demo_producer = origin.with_root("demo").expect("should create demo root");
1122		let limited_producer = demo_producer
1123			.publish_only(&["worm-node".into(), "foobar".into()])
1124			.expect("should create limited producer");
1125
1126		// Publish some broadcasts
1127		assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
1128		assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
1129
1130		// consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes
1131		let mut consumer = limited_producer
1132			.consume_only(&["".into()])
1133			.expect("should create consumer with empty prefix");
1134
1135		// Should still see both broadcasts
1136		consumer.assert_next("worm-node/test", &broadcast1.consume());
1137		consumer.assert_next("foobar/test", &broadcast2.consume());
1138		consumer.assert_next_wait();
1139	}
1140
1141	#[tokio::test]
1142	async fn test_select_narrowing_scope() {
1143		let origin = Origin::produce();
1144		let broadcast1 = Broadcast::produce();
1145		let broadcast2 = Broadcast::produce();
1146		let broadcast3 = Broadcast::produce();
1147
1148		// User with root "demo" allowed to subscribe to "worm-node" and "foobar"
1149		let demo_producer = origin.with_root("demo").expect("should create demo root");
1150		let limited_producer = demo_producer
1151			.publish_only(&["worm-node".into(), "foobar".into()])
1152			.expect("should create limited producer");
1153
1154		// Publish broadcasts at different levels
1155		assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
1156		assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
1157		assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
1158
1159		// Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY
1160		let mut worm_consumer = limited_producer
1161			.consume_only(&["worm-node".into()])
1162			.expect("should create worm-node consumer");
1163
1164		// Should see worm-node content with paths stripped to ""
1165		worm_consumer.assert_next("worm-node", &broadcast1.consume());
1166		worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1167		worm_consumer.assert_next_wait(); // Should NOT see foobar content
1168
1169		// Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo"
1170		let mut foo_consumer = limited_producer
1171			.consume_only(&["worm-node/foo".into()])
1172			.expect("should create worm-node/foo consumer");
1173
1174		foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
1175		foo_consumer.assert_next_wait(); // Should NOT see other content
1176	}
1177
1178	#[tokio::test]
1179	async fn test_select_multiple_roots_with_empty_prefix() {
1180		let origin = Origin::produce();
1181		let broadcast1 = Broadcast::produce();
1182		let broadcast2 = Broadcast::produce();
1183		let broadcast3 = Broadcast::produce();
1184
1185		// Producer with multiple allowed roots
1186		let limited_producer = origin
1187			.publish_only(&["app1".into(), "app2".into(), "shared".into()])
1188			.expect("should create limited producer");
1189
1190		// Publish to each root
1191		assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume()));
1192		assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume()));
1193		assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume()));
1194
1195		// consume_only with empty prefix should maintain all roots
1196		let mut consumer = limited_producer
1197			.consume_only(&["".into()])
1198			.expect("should create consumer with empty prefix");
1199
1200		// Should see all broadcasts from all roots
1201		consumer.assert_next("app1/data", &broadcast1.consume());
1202		consumer.assert_next("app2/config", &broadcast2.consume());
1203		consumer.assert_next("shared/resource", &broadcast3.consume());
1204		consumer.assert_next_wait();
1205	}
1206
1207	#[tokio::test]
1208	async fn test_publish_only_with_empty_prefix() {
1209		let origin = Origin::produce();
1210		let broadcast = Broadcast::produce();
1211
1212		// Producer with specific allowed paths
1213		let limited_producer = origin
1214			.publish_only(&["services/api".into(), "services/web".into()])
1215			.expect("should create limited producer");
1216
1217		// publish_only with empty prefix should keep the same restrictions
1218		let same_producer = limited_producer
1219			.publish_only(&["".into()])
1220			.expect("should create producer with empty prefix");
1221
1222		// Should still have the same publishing restrictions
1223		assert!(same_producer.publish_broadcast("services/api", broadcast.consume()));
1224		assert!(same_producer.publish_broadcast("services/web", broadcast.consume()));
1225		assert!(!same_producer.publish_broadcast("services/db", broadcast.consume()));
1226		assert!(!same_producer.publish_broadcast("other", broadcast.consume()));
1227	}
1228
1229	#[tokio::test]
1230	async fn test_select_narrowing_to_deeper_path() {
1231		let origin = Origin::produce();
1232		let broadcast1 = Broadcast::produce();
1233		let broadcast2 = Broadcast::produce();
1234		let broadcast3 = Broadcast::produce();
1235
1236		// Producer with broad permission
1237		let limited_producer = origin
1238			.publish_only(&["org".into()])
1239			.expect("should create limited producer");
1240
1241		// Publish at various depths
1242		assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume()));
1243		assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume()));
1244		assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume()));
1245
1246		// Narrow down to team1 only
1247		let mut team1_consumer = limited_producer
1248			.consume_only(&["org/team2".into()])
1249			.expect("should create team1 consumer");
1250
1251		team1_consumer.assert_next("org/team2/project1", &broadcast3.consume());
1252		team1_consumer.assert_next_wait(); // Should NOT see team1 content
1253
1254		// Further narrow down to team1/project1
1255		let mut project1_consumer = limited_producer
1256			.consume_only(&["org/team1/project1".into()])
1257			.expect("should create project1 consumer");
1258
1259		// Should only see project1 content at root
1260		project1_consumer.assert_next("org/team1/project1", &broadcast1.consume());
1261		project1_consumer.assert_next_wait();
1262	}
1263
1264	#[tokio::test]
1265	async fn test_select_with_non_matching_prefix() {
1266		let origin = Origin::produce();
1267
1268		// Producer with specific allowed paths
1269		let limited_producer = origin
1270			.publish_only(&["allowed/path".into()])
1271			.expect("should create limited producer");
1272
1273		// Trying to consume_only with a completely different prefix should return None
1274		assert!(limited_producer.consume_only(&["different/path".into()]).is_none());
1275
1276		// Similarly for publish_only
1277		assert!(limited_producer.publish_only(&["other/path".into()]).is_none());
1278	}
1279
1280	#[tokio::test]
1281	async fn test_select_maintains_access_with_wider_prefix() {
1282		let origin = Origin::produce();
1283		let broadcast1 = Broadcast::produce();
1284		let broadcast2 = Broadcast::produce();
1285
1286		// Setup: user with root "demo" allowed to subscribe to specific paths
1287		let demo_producer = origin.with_root("demo").expect("should create demo root");
1288		let user_producer = demo_producer
1289			.publish_only(&["worm-node".into(), "foobar".into()])
1290			.expect("should create user producer");
1291
1292		// Publish some data
1293		assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume()));
1294		assert!(user_producer.publish_broadcast("foobar", broadcast2.consume()));
1295
1296		// Key test: consume_only with "" should maintain access to allowed roots
1297		let mut consumer = user_producer
1298			.consume_only(&["".into()])
1299			.expect("consume_only with empty prefix should not fail when user has specific permissions");
1300
1301		// Should still receive broadcasts from allowed paths
1302		consumer.assert_next("worm-node/data", &broadcast1.consume());
1303		consumer.assert_next("foobar", &broadcast2.consume());
1304		consumer.assert_next_wait();
1305
1306		// Also test that we can still narrow the scope
1307		let mut narrow_consumer = user_producer
1308			.consume_only(&["worm-node".into()])
1309			.expect("should be able to narrow scope to worm-node");
1310
1311		narrow_consumer.assert_next("worm-node/data", &broadcast1.consume());
1312		narrow_consumer.assert_next_wait(); // Should not see foobar
1313	}
1314}