use std::{
collections::{HashMap, VecDeque},
fmt,
sync::atomic::{AtomicU64, Ordering},
};
use rand::Rng;
use tokio::sync::mpsc;
use web_async::Lock;
use super::BroadcastConsumer;
use crate::{
AsPath, Broadcast, BroadcastProducer, Path, PathOwned, PathPrefixes,
coding::{Decode, DecodeError, Encode, EncodeError},
};
/// Identifies an origin by a numeric id.
///
/// `Origin::random` draws ids from `1..(1 << 62)` and decoding rejects ids of `1 << 62` or more;
/// id `0` is used by the crate-internal `Origin::UNKNOWN` constant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Origin {
    /// The raw numeric identifier.
    pub id: u64,
}
impl Origin {
    /// Origin with id `0`, a value [`Origin::random`] never produces.
    pub(crate) const UNKNOWN: Self = Self { id: 0 };

    /// Creates an origin whose id is drawn at random from `1..(1 << 62)`.
    pub fn random() -> Self {
        Self {
            id: rand::rng().random_range(1..(1u64 << 62)),
        }
    }

    /// Builds an [`OriginProducer`] that publishes on behalf of this origin.
    pub fn produce(self) -> OriginProducer {
        OriginProducer::new(self)
    }
}
impl From<u64> for Origin {
fn from(id: u64) -> Self {
Self { id }
}
}
/// Formats the origin as its numeric id, honouring the caller's formatter flags.
impl fmt::Display for Origin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fully qualified on purpose: `self.id.fmt(f)` only resolves when exactly one formatting
        // trait is in scope, and this file imports the `fmt` module rather than the traits.
        fmt::Display::fmt(&self.id, f)
    }
}
/// Encodes the origin exactly as its `u64` id is encoded for `version`.
impl<V: Copy> Encode<V> for Origin
where
    u64: Encode<V>,
{
    fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
        <u64 as Encode<V>>::encode(&self.id, w, version)
    }
}
/// Decodes a `u64` id and rejects values of `1 << 62` or above with `DecodeError::InvalidValue`.
impl<V: Copy> Decode<V> for Origin
where
    u64: Decode<V>,
{
    fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
        match u64::decode(r, version)? {
            id if id < (1u64 << 62) => Ok(Self { id }),
            _ => Err(DecodeError::InvalidValue),
        }
    }
}
/// Maximum number of origins an [`OriginList`] may hold; enforced by `push`, `TryFrom` and `Decode`.
pub(crate) const MAX_HOPS: usize = 32;
/// An ordered list of [`Origin`]s, capped at `MAX_HOPS` entries by `push`, `TryFrom<Vec<Origin>>`
/// and `Decode`.
///
/// NOTE(review): the derived serde `Deserialize` targets the inner `Vec` directly and does not
/// appear to apply the `MAX_HOPS` limit — confirm whether that is intended.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct OriginList(Vec<Origin>);
/// Error returned when an [`OriginList`] would exceed `MAX_HOPS` entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct TooManyOrigins;
/// Renders the error together with the configured limit.
impl fmt::Display for TooManyOrigins {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("too many origins (max {MAX_HOPS})"))
    }
}
// Marker impl: `TooManyOrigins` carries no source error, so the default methods suffice.
impl std::error::Error for TooManyOrigins {}
impl From<TooManyOrigins> for DecodeError {
fn from(_: TooManyOrigins) -> Self {
DecodeError::BoundsExceeded
}
}
impl OriginList {
    /// Creates an empty list.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `origin`, or returns [`TooManyOrigins`] if the list already holds `MAX_HOPS` entries.
    pub fn push(&mut self, origin: Origin) -> Result<(), TooManyOrigins> {
        if self.0.len() < MAX_HOPS {
            self.0.push(origin);
            Ok(())
        } else {
            Err(TooManyOrigins)
        }
    }

    /// Returns `true` if `origin` is present in the list.
    pub fn contains(&self, origin: &Origin) -> bool {
        self.0.iter().any(|candidate| candidate == origin)
    }

    /// Number of origins in the list.
    pub fn len(&self) -> usize {
        self.as_slice().len()
    }

    /// Returns `true` if the list holds no origins.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterates over the origins in insertion order.
    pub fn iter(&self) -> std::slice::Iter<'_, Origin> {
        self.as_slice().iter()
    }

    /// Borrows the origins as a slice.
    pub fn as_slice(&self) -> &[Origin] {
        self.0.as_slice()
    }
}
/// Wraps an existing vector, rejecting it when it holds more than `MAX_HOPS` origins.
impl TryFrom<Vec<Origin>> for OriginList {
    type Error = TooManyOrigins;

    fn try_from(v: Vec<Origin>) -> Result<Self, Self::Error> {
        match v.len() {
            n if n > MAX_HOPS => Err(TooManyOrigins),
            _ => Ok(Self(v)),
        }
    }
}
/// Lets `for origin in &list` iterate over the contained origins by reference.
impl<'a> IntoIterator for &'a OriginList {
    type Item = &'a Origin;
    type IntoIter = std::slice::Iter<'a, Origin>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.iter()
    }
}
/// Encodes the list as a `u64` element count followed by each origin in order.
impl<V: Copy> Encode<V> for OriginList
where
    u64: Encode<V>,
    Origin: Encode<V>,
{
    fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) -> Result<(), EncodeError> {
        let count = self.0.len() as u64;
        count.encode(w, version)?;
        // Stops at the first origin that fails to encode, like the `?` in a plain loop would.
        self.0.iter().try_for_each(|origin| origin.encode(w, version))
    }
}
/// Decodes a `u64` element count followed by that many origins.
///
/// Returns `DecodeError::BoundsExceeded` when the count is larger than `MAX_HOPS`, and forwards
/// any error produced while decoding the count or an individual origin.
impl<V: Copy> Decode<V> for OriginList
where
    u64: Decode<V>,
    Origin: Decode<V>,
{
    fn decode<R: bytes::Buf>(r: &mut R, version: V) -> Result<Self, DecodeError> {
        // Check the bound on the full 64-bit value. Casting to `usize` first would truncate on
        // targets where `usize` is narrower than 64 bits and let an oversized count through.
        let count = u64::decode(r, version)?;
        if count > MAX_HOPS as u64 {
            return Err(DecodeError::BoundsExceeded);
        }
        // Lossless: `count` is at most `MAX_HOPS` here.
        let count = count as usize;

        let mut list = Vec::with_capacity(count);
        for _ in 0..count {
            list.push(Origin::decode(r, version)?);
        }
        Ok(Self(list))
    }
}
// Process-wide counter handing out the next `ConsumerId`; starts at 0.
static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0);
/// Key under which a consumer's notification channel is registered in a `NotifyNode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerId(u64);
impl ConsumerId {
fn new() -> Self {
Self(NEXT_CONSUMER_ID.fetch_add(1, Ordering::Relaxed))
}
}
/// The broadcasts currently published at one node of the tree.
struct OriginBroadcast {
    // Full path of the broadcast, as passed to `OriginNode::publish`.
    path: PathOwned,
    // The broadcast currently announced to consumers.
    active: BroadcastConsumer,
    // Other broadcasts published at the same path; one is promoted when `active` is removed.
    backup: VecDeque<BroadcastConsumer>,
}
/// Sending half of a consumer's update channel plus the root its paths are reported relative to.
#[derive(Clone)]
struct OriginConsumerNotify {
    // Prefix stripped from every path before it is sent to the consumer.
    root: PathOwned,
    // Channel on which `(path, Some(broadcast))` / `(path, None)` updates are delivered.
    tx: mpsc::UnboundedSender<OriginAnnounce>,
}
impl OriginConsumerNotify {
fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
self.tx.send((path, Some(broadcast))).expect("consumer closed");
}
fn reannounce(&self, path: impl AsPath, broadcast: BroadcastConsumer) {
let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
self.tx.send((path.clone(), None)).expect("consumer closed");
self.tx.send((path, Some(broadcast))).expect("consumer closed");
}
fn unannounce(&self, path: impl AsPath) {
let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned();
self.tx.send((path, None)).expect("consumer closed");
}
}
/// The consumers registered at one tree node, linked to the parent node's `NotifyNode`.
struct NotifyNode {
    // Notification node of the parent; `None` for a node created without a parent.
    parent: Option<Lock<NotifyNode>>,
    // Consumers registered directly at this node, keyed by their id.
    consumers: HashMap<ConsumerId, OriginConsumerNotify>,
}
impl NotifyNode {
    /// Creates a node with no consumers and the given parent link.
    fn new(parent: Option<Lock<NotifyNode>>) -> Self {
        let consumers = HashMap::new();
        Self { parent, consumers }
    }

    /// Announces `broadcast` at `path` to this node's consumers, then to every ancestor's.
    fn announce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
        self.consumers
            .values()
            .for_each(|consumer| consumer.announce(path.as_path(), broadcast.clone()));

        if let Some(parent) = self.parent.as_ref() {
            parent.lock().announce(path, broadcast);
        }
    }

    /// Re-announces `broadcast` at `path` to this node's consumers, then to every ancestor's.
    fn reannounce(&mut self, path: impl AsPath, broadcast: &BroadcastConsumer) {
        self.consumers
            .values()
            .for_each(|consumer| consumer.reannounce(path.as_path(), broadcast.clone()));

        if let Some(parent) = self.parent.as_ref() {
            parent.lock().reannounce(path, broadcast);
        }
    }

    /// Unannounces `path` to this node's consumers, then to every ancestor's.
    fn unannounce(&mut self, path: impl AsPath) {
        self.consumers
            .values()
            .for_each(|consumer| consumer.unannounce(path.as_path()));

        if let Some(parent) = self.parent.as_ref() {
            parent.lock().unannounce(path);
        }
    }
}
/// One node of the path tree: an optional broadcast, child nodes by path segment, and the
/// consumers to notify about changes at or below this node.
struct OriginNode {
    // Broadcast published exactly at this node's path, if any.
    broadcast: Option<OriginBroadcast>,
    // Child nodes keyed by the next path segment.
    nested: HashMap<String, Lock<OriginNode>>,
    // Consumers registered at this node; chained to the parent's so updates propagate upwards.
    notify: Lock<NotifyNode>,
}
impl OriginNode {
/// Creates an empty node whose notifications chain up to `parent`.
fn new(parent: Option<Lock<NotifyNode>>) -> Self {
    let notify = Lock::new(NotifyNode::new(parent));
    Self {
        broadcast: None,
        nested: HashMap::new(),
        notify,
    }
}
/// Returns the node at `path` below this one, creating missing nodes along the way.
///
/// Panics if `path` is empty.
fn leaf(&mut self, path: &Path) -> Lock<OriginNode> {
    let (dir, rest) = path.next_part().expect("leaf called with empty path");
    let child = self.entry(dir);
    if rest.is_empty() {
        return child;
    }

    // Bind the result so the lock guard on `child` is released before `child` itself is dropped.
    let found = child.lock().leaf(&rest);
    found
}
/// Returns the direct child named `dir`, creating it (linked to this node's notifier) if absent.
fn entry(&mut self, dir: &str) -> Lock<OriginNode> {
    if let Some(existing) = self.nested.get(dir) {
        return existing.clone();
    }

    let child = Lock::new(OriginNode::new(Some(self.notify.clone())));
    self.nested.insert(dir.to_string(), child.clone());
    child
}
/// Publishes `broadcast` at `relative` below this node; `full` is the path reported to consumers.
///
/// - Nothing published at the path yet: the broadcast becomes active and is announced.
/// - The same broadcast is already active or a backup: nothing happens.
/// - Otherwise the broadcast replaces the active one (which becomes a backup) when its hop count
///   is not larger than the active one's, and is stored as a backup if it is larger.
fn publish(&mut self, full: impl AsPath, broadcast: &BroadcastConsumer, relative: impl AsPath) {
    let full = full.as_path();
    let remaining = relative.as_path();

    // Not at the target node yet: descend one segment.
    if let Some((dir, tail)) = remaining.next_part() {
        self.entry(dir).lock().publish(&full, broadcast, &tail);
        return;
    }

    match self.broadcast.as_mut() {
        None => {
            self.broadcast = Some(OriginBroadcast {
                path: full.to_owned(),
                active: broadcast.clone(),
                backup: VecDeque::new(),
            });
            self.notify.lock().announce(full, broadcast);
        }
        Some(slot) => {
            let already_known =
                slot.active.is_clone(broadcast) || slot.backup.iter().any(|b| b.is_clone(broadcast));
            if already_known {
                return;
            }

            // A longer hop list only qualifies as a fallback.
            if broadcast.hops.len() > slot.active.hops.len() {
                slot.backup.push_back(broadcast.clone());
                return;
            }

            let previous = slot.active.clone();
            slot.active = broadcast.clone();
            slot.backup.push_back(previous);
            self.notify.lock().reannounce(full, broadcast);
        }
    }
}
/// Registers a consumer at this node after replaying every broadcast already published at or
/// below it.
fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) {
    self.consume_initial(&mut notify);

    let mut registry = self.notify.lock();
    registry.consumers.insert(id, notify);
}
fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) {
if let Some(broadcast) = &self.broadcast {
notify.announce(&broadcast.path, broadcast.active.clone());
}
for nested in self.nested.values() {
nested.lock().consume_initial(notify);
}
}
/// Looks up the active broadcast at `rest` below this node without creating any nodes.
fn consume_broadcast(&self, rest: impl AsPath) -> Option<BroadcastConsumer> {
    let rest = rest.as_path();
    match rest.next_part() {
        Some((dir, tail)) => {
            let child = self.nested.get(dir)?.lock();
            child.consume_broadcast(&tail)
        }
        None => self.broadcast.as_ref().map(|entry| entry.active.clone()),
    }
}
/// Unregisters the consumer `id` from this node.
///
/// Panics if no consumer with that id is registered here.
///
/// Nodes left empty by this call are not pruned here; `remove` is the only place in this impl
/// that prunes empty children.
fn unconsume(&mut self, id: ConsumerId) {
    self.notify.lock().consumers.remove(&id).expect("consumer not found");
}
/// Removes `broadcast` from `relative` below this node; `full` is the path reported to consumers.
///
/// - A backup is dropped silently.
/// - If the active broadcast is removed, the backup with the fewest hops is promoted and
///   re-announced; with no backups left the path is unannounced.
/// - A broadcast that is neither active nor a backup is ignored. This happens when the same
///   broadcast was published more than once at this path: each publish schedules its own
///   removal, so later removals find the broadcast already gone.
///
/// Child nodes that end up empty are pruned on the way back up.
fn remove(&mut self, full: impl AsPath, broadcast: BroadcastConsumer, relative: impl AsPath) {
    let full = full.as_path();
    let relative = relative.as_path();

    if let Some((dir, relative)) = relative.next_part() {
        // Look the child up instead of creating it: a missing child has nothing to remove.
        let nested = match self.nested.get(dir) {
            Some(nested) => nested.clone(),
            None => return,
        };

        let mut locked = nested.lock();
        locked.remove(&full, broadcast, &relative);
        if locked.is_empty() {
            drop(locked);
            self.nested.remove(dir);
        }
        return;
    }

    let entry = match &mut self.broadcast {
        Some(existing) => existing,
        None => return,
    };

    // A backup can be dropped without telling anyone.
    if let Some(pos) = entry.backup.iter().position(|b| b.is_clone(&broadcast)) {
        entry.backup.remove(pos);
        return;
    }

    // Stale removal of a duplicate publish: the broadcast was already removed. Asserting here
    // would panic while the tree locks are held.
    if !entry.active.is_clone(&broadcast) {
        return;
    }

    // Promote the backup with the fewest hops (the first such backup on ties).
    let best = entry
        .backup
        .iter()
        .enumerate()
        .min_by_key(|(_, b)| b.hops.len())
        .map(|(i, _)| i);

    match best {
        Some(idx) => {
            let active = entry.backup.remove(idx).expect("index in range");
            entry.active = active;
            self.notify.lock().reannounce(full, &entry.active);
        }
        None => {
            self.broadcast = None;
            self.notify.lock().unannounce(full);
        }
    }
}
/// Returns `true` when the node has no broadcast, no children and no registered consumers.
fn is_empty(&self) -> bool {
    if self.broadcast.is_some() || !self.nested.is_empty() {
        return false;
    }
    self.notify.lock().consumers.is_empty()
}
}
/// The set of tree nodes a producer or consumer may access, each paired with the path prefix
/// under which it is reachable.
#[derive(Clone)]
struct OriginNodes {
    // `(prefix, node)` pairs; `get` returns the first pair whose prefix matches a path.
    nodes: Vec<(PathOwned, Lock<OriginNode>)>,
}
impl OriginNodes {
/// Narrows the accessible nodes to those matching `prefixes`.
///
/// A node whose prefix already lies under a requested prefix is kept unchanged; a requested
/// prefix that lies below a node is resolved (creating tree nodes as needed) and added under the
/// requested prefix. Returns `None` when nothing matches.
pub fn select(&self, prefixes: &PathPrefixes) -> Option<Self> {
    let mut selected = Vec::new();

    for (root, state) in self.nodes.iter() {
        for prefix in prefixes {
            if root.has_prefix(prefix) {
                selected.push((root.to_owned(), state.clone()));
            } else if let Some(suffix) = prefix.strip_prefix(root) {
                let nested = state.lock().leaf(&suffix);
                selected.push((prefix.to_owned(), nested));
            }
        }
    }

    if selected.is_empty() {
        return None;
    }
    Some(Self { nodes: selected })
}
/// Re-bases the accessible nodes so their prefixes are relative to `new_root`.
///
/// An empty `new_root` returns a clone of `self`. Nodes below `new_root` keep their node and
/// lose the `new_root` prefix; nodes above it are resolved down to `new_root` (creating tree
/// nodes as needed) and get an empty prefix. Returns `None` when no node is related to `new_root`.
pub fn root(&self, new_root: impl AsPath) -> Option<Self> {
    let new_root = new_root.as_path();
    if new_root.is_empty() {
        return Some(self.clone());
    }

    let mut rebased = Vec::new();
    for (root, state) in self.nodes.iter() {
        match root.strip_prefix(&new_root) {
            Some(suffix) => rebased.push((suffix.to_owned(), state.clone())),
            None => {
                if let Some(suffix) = new_root.strip_prefix(root) {
                    let nested = state.lock().leaf(&suffix);
                    rebased.push(("".into(), nested));
                }
            }
        }
    }

    if rebased.is_empty() {
        return None;
    }
    Some(Self { nodes: rebased })
}
/// Finds the first accessible node whose prefix matches `path`, returning the node together
/// with the remainder of `path` below that prefix.
pub fn get(&self, path: impl AsPath) -> Option<(Lock<OriginNode>, PathOwned)> {
    let path = path.as_path();
    self.nodes.iter().find_map(|(root, state)| {
        path.strip_prefix(root)
            .map(|suffix| (state.clone(), suffix.to_owned()))
    })
}
}
/// A fresh tree: a single parentless node reachable under the empty prefix.
impl Default for OriginNodes {
    fn default() -> Self {
        let tree_root = Lock::new(OriginNode::new(None));
        Self {
            nodes: vec![("".into(), tree_root)],
        }
    }
}
/// An update delivered to consumers: the path and `Some(broadcast)` when it is announced, or
/// `None` when it is unannounced.
pub type OriginAnnounce = (PathOwned, Option<BroadcastConsumer>);
/// Publishes broadcasts into the shared path tree on behalf of an [`Origin`].
#[derive(Clone)]
pub struct OriginProducer {
    // The origin this producer acts for; exposed through `Deref`.
    info: Origin,
    // Tree nodes this producer is allowed to publish under.
    nodes: OriginNodes,
    // Prefix joined in front of every path passed to this producer.
    root: PathOwned,
}
/// Gives direct access to the producer's [`Origin`].
impl std::ops::Deref for OriginProducer {
    type Target = Origin;

    fn deref(&self) -> &Origin {
        let Self { info, .. } = self;
        info
    }
}
impl OriginProducer {
/// Creates a producer for `info` with a fresh, empty tree and an empty root.
pub fn new(info: Origin) -> Self {
    let nodes = OriginNodes::default();
    let root = PathOwned::default();
    Self { info, nodes, root }
}
/// Creates a new broadcast and publishes it at `path`.
///
/// Returns `None` when [`Self::publish_broadcast`] refuses the path.
pub fn create_broadcast(&self, path: impl AsPath) -> Option<BroadcastProducer> {
    let producer = Broadcast::new().produce();
    if self.publish_broadcast(path, producer.consume()) {
        Some(producer)
    } else {
        None
    }
}
/// Publishes `broadcast` at `path` (relative to this producer's root).
///
/// Returns `false` without publishing when the broadcast's hop list already contains this
/// origin, or when `path` is not covered by any node this producer may access. Otherwise the
/// broadcast is published and a task is spawned that removes it again once
/// `broadcast.closed()` completes.
pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
    let path = path.as_path();

    let already_visited = broadcast.hops.contains(&self.info);
    if already_visited {
        return false;
    }

    let (node, rest) = if let Some(found) = self.nodes.get(&path) {
        found
    } else {
        return false;
    };

    let full = self.root.join(&path);
    node.lock().publish(&full, &broadcast, &rest);

    web_async::spawn(async move {
        broadcast.closed().await;
        node.lock().remove(&full, broadcast, &rest);
    });

    true
}
/// Returns a producer restricted to `prefixes`, or `None` when none of them is accessible.
pub fn scope(&self, prefixes: &[Path]) -> Option<OriginProducer> {
    let prefixes = PathPrefixes::new(prefixes);
    self.nodes.select(&prefixes).map(|nodes| OriginProducer {
        info: self.info,
        nodes,
        root: self.root.clone(),
    })
}
/// Creates a consumer over the same nodes and root as this producer.
pub fn consume(&self) -> OriginConsumer {
    OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
}
/// Returns the broadcast currently active at `path`, if the path is accessible and published.
#[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")]
pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
    let path = path.as_path();
    self.nodes.get(&path).and_then(|(node, rest)| {
        let state = node.lock();
        state.consume_broadcast(&rest)
    })
}
/// Returns a producer whose paths are relative to `prefix` below the current root, or `None`
/// when `prefix` is not accessible.
pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
    let prefix = prefix.as_path();
    let root = self.root.join(&prefix).to_owned();
    let nodes = self.nodes.root(&prefix)?;
    Some(Self {
        info: self.info,
        root,
        nodes,
    })
}
/// The prefix joined in front of every path handled by this producer.
pub fn root(&self) -> &Path<'_> {
    &self.root
}
/// Iterates over the path prefixes of the nodes this producer may access.
pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
    self.nodes.nodes.iter().map(|(root, _)| root)
}
/// Joins `path` onto this producer's root.
pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
    self.root.join(path)
}
}
/// Receives announcements for the broadcasts published under a set of tree nodes.
pub struct OriginConsumer {
    // Key under which this consumer is registered at each of its nodes.
    id: ConsumerId,
    // The origin this consumer belongs to; exposed through `Deref`.
    info: Origin,
    // Tree nodes this consumer is registered at.
    nodes: OriginNodes,
    // Receiving half of the channel the nodes push updates into.
    updates: mpsc::UnboundedReceiver<OriginAnnounce>,
    // Prefix stripped from announced paths and joined in front of requested ones.
    root: PathOwned,
}
/// Gives direct access to the consumer's [`Origin`].
impl std::ops::Deref for OriginConsumer {
    type Target = Origin;

    fn deref(&self) -> &Origin {
        let Self { info, .. } = self;
        info
    }
}
impl OriginConsumer {
/// Creates a consumer and registers it at every node in `nodes`.
///
/// Registration replays the broadcasts already published at or below each node, so they are
/// queued on the channel before this function returns.
fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self {
    let (tx, updates) = mpsc::unbounded_channel();
    let id = ConsumerId::new();

    // Every node gets its own copy of the same sender and root.
    let template = OriginConsumerNotify {
        root: root.clone(),
        tx,
    };
    for (_, state) in nodes.nodes.iter() {
        let notify = template.clone();
        state.lock().consume(id, notify);
    }

    Self {
        id,
        info,
        nodes,
        updates,
        root,
    }
}
/// Waits for the next update; yields `None` only when the update channel is closed.
pub async fn announced(&mut self) -> Option<OriginAnnounce> {
    self.updates.recv().await
}
/// Returns the next update if one is already queued, without waiting.
///
/// `None` covers both "nothing queued" and "channel closed".
pub fn try_announced(&mut self) -> Option<OriginAnnounce> {
    self.updates.try_recv().ok()
}
/// Creates another consumer over the same nodes; equivalent to `clone()`.
pub fn consume(&self) -> Self {
    self.clone()
}
/// Returns the broadcast currently active at `path`, if the path is accessible and published.
pub fn get_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
    let path = path.as_path();
    self.nodes.get(&path).and_then(|(node, rest)| {
        let state = node.lock();
        state.consume_broadcast(&rest)
    })
}
/// Waits until a broadcast is announced at exactly `path` and returns it.
///
/// Returns `None` when `path` is outside this consumer's scope, when none of the scoped
/// prefixes is a prefix of `path` (so an announcement at exactly `path` could never arrive), or
/// when the update channel closes.
pub async fn announced_broadcast(&self, path: impl AsPath) -> Option<BroadcastConsumer> {
    let path = path.as_path();
    let mut consumer = self.scope(std::slice::from_ref(&path))?;

    let reachable = consumer.allowed().any(|allowed| path.has_prefix(allowed));
    if !reachable {
        return None;
    }

    while let Some((announced, broadcast)) = consumer.announced().await {
        // Skip other paths below `path`, and unannouncements of `path` itself.
        if announced.as_path() != path {
            continue;
        }
        if broadcast.is_some() {
            return broadcast;
        }
    }
    None
}
/// Returns a new consumer restricted to `prefixes`, or `None` when none of them is accessible.
pub fn scope(&self, prefixes: &[Path]) -> Option<OriginConsumer> {
    let prefixes = PathPrefixes::new(prefixes);
    let nodes = self.nodes.select(&prefixes)?;
    Some(OriginConsumer::new(self.info, self.root.clone(), nodes))
}
/// Returns a new consumer whose paths are relative to `prefix` below the current root, or
/// `None` when `prefix` is not accessible.
pub fn with_root(&self, prefix: impl AsPath) -> Option<Self> {
    let prefix = prefix.as_path();
    let root = self.root.join(&prefix).to_owned();
    let nodes = self.nodes.root(&prefix)?;
    Some(Self::new(self.info, root, nodes))
}
/// The prefix that announced paths are reported relative to.
pub fn root(&self) -> &Path<'_> {
    &self.root
}
/// Iterates over the path prefixes of the nodes this consumer is registered at.
pub fn allowed(&self) -> impl Iterator<Item = &Path<'_>> {
    self.nodes.nodes.iter().map(|(root, _)| root)
}
/// Joins `path` onto this consumer's root.
pub fn absolute(&self, path: impl AsPath) -> Path<'_> {
    self.root.join(path)
}
}
/// Unregisters the consumer from every node it was registered at.
///
/// This runs before the receiver field is dropped, so the nodes stop sending first.
impl Drop for OriginConsumer {
    fn drop(&mut self) {
        let id = self.id;
        self.nodes
            .nodes
            .iter()
            .for_each(|(_, node)| node.lock().unconsume(id));
    }
}
impl Clone for OriginConsumer {
fn clone(&self) -> Self {
OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone())
}
}
#[cfg(test)]
use futures::FutureExt;
/// Assertion helpers used by the tests in this file.
#[cfg(test)]
impl OriginConsumer {
    /// Asserts that an announcement of `broadcast` at `expected` is immediately available.
    pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let next = self.announced().now_or_never().expect("next blocked");
        let (path, active) = next.expect("no next");
        assert_eq!(path, expected, "wrong path");
        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Same as `assert_next`, but reads through `try_announced` instead of polling the future.
    pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) {
        let expected = expected.as_path();
        let (path, active) = self.try_announced().expect("no next");
        assert_eq!(path, expected, "wrong path");
        let active = active.unwrap();
        assert!(active.is_clone(broadcast), "should be the same broadcast");
    }

    /// Asserts that an unannouncement of `expected` is immediately available.
    pub fn assert_next_none(&mut self, expected: impl AsPath) {
        let expected = expected.as_path();
        let next = self.announced().now_or_never().expect("next blocked");
        let (path, active) = next.expect("no next");
        assert_eq!(path, expected, "wrong path");
        assert!(active.is_none(), "should be unannounced");
    }

    /// Asserts that no update is immediately available.
    pub fn assert_next_wait(&mut self) {
        match self.announced().now_or_never() {
            None => {}
            Some(res) => panic!("next should block: got {:?}", res.map(|(path, _)| path)),
        }
    }
}
#[cfg(test)]
mod tests {
use crate::Broadcast;
use super::*;
/// `push` accepts exactly `MAX_HOPS` origins and rejects the next one.
#[test]
fn origin_list_push_fails_at_limit() {
    let mut list = OriginList::new();
    for _ in 0..MAX_HOPS {
        list.push(Origin::random()).unwrap();
    }
    assert_eq!(list.len(), MAX_HOPS);
    assert_eq!(list.push(Origin::random()), Err(TooManyOrigins));
}
/// `TryFrom<Vec<Origin>>` accepts `MAX_HOPS` entries and rejects `MAX_HOPS + 1`.
#[test]
fn origin_list_try_from_vec_enforces_limit() {
    let under: Vec<Origin> = (0..MAX_HOPS).map(|_| Origin::random()).collect();
    assert!(OriginList::try_from(under).is_ok());
    let over: Vec<Origin> = (0..MAX_HOPS + 1).map(|_| Origin::random()).collect();
    assert_eq!(OriginList::try_from(over), Err(TooManyOrigins));
}
/// Consumers see live announcements, a replay of what was published before they were created,
/// and unannouncements once a broadcast goes away.
#[tokio::test]
async fn test_announce() {
    tokio::time::pause();
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    // Created before anything is published: nothing is pending.
    let mut consumer1 = origin.consume();
    consumer1.assert_next_wait();
    origin.publish_broadcast("test1", broadcast1.consume());
    consumer1.assert_next("test1", &broadcast1.consume());
    consumer1.assert_next_wait();
    // Created after test1 was published: test1 is replayed to it, test2 arrives live.
    let mut consumer2 = origin.consume();
    origin.publish_broadcast("test2", broadcast2.consume());
    consumer1.assert_next("test2", &broadcast2.consume());
    consumer1.assert_next_wait();
    consumer2.assert_next("test1", &broadcast1.consume());
    consumer2.assert_next("test2", &broadcast2.consume());
    consumer2.assert_next_wait();
    // Presumably dropping the producer closes the broadcast; the sleep gives the cleanup task
    // spawned by `publish_broadcast` a chance to run and unannounce it.
    drop(broadcast1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    consumer1.assert_next_none("test1");
    consumer2.assert_next_none("test1");
    consumer1.assert_next_wait();
    consumer2.assert_next_wait();
    // A late consumer only gets what is still published.
    let mut consumer3 = origin.consume();
    consumer3.assert_next("test2", &broadcast2.consume());
    consumer3.assert_next_wait();
    drop(broadcast2);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    consumer1.assert_next_none("test2");
    consumer2.assert_next_none("test2");
    consumer3.assert_next_none("test2");
}
/// Several broadcasts published at one path: each new one replaces the active broadcast, and
/// removing the active one falls back to a backup.
#[tokio::test]
async fn test_duplicate() {
    tokio::time::pause();
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    let broadcast3 = Broadcast::new().produce();
    let consumer1 = broadcast1.consume();
    let consumer2 = broadcast2.consume();
    let consumer3 = broadcast3.consume();
    let mut consumer = origin.consume();
    origin.publish_broadcast("test", consumer1.clone());
    origin.publish_broadcast("test", consumer2.clone());
    origin.publish_broadcast("test", consumer3.clone());
    assert!(consumer.get_broadcast("test").is_some());
    // Each replacement is delivered as an unannounce followed by an announce.
    consumer.assert_next("test", &consumer1);
    consumer.assert_next_none("test");
    consumer.assert_next("test", &consumer2);
    consumer.assert_next_none("test");
    consumer.assert_next("test", &consumer3);
    // broadcast2 is only a backup at this point, so removing it is silent.
    drop(broadcast2);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    assert!(consumer.get_broadcast("test").is_some());
    consumer.assert_next_wait();
    // broadcast3 is active: removing it promotes the remaining backup (broadcast1).
    drop(broadcast3);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    assert!(consumer.get_broadcast("test").is_some());
    consumer.assert_next_none("test");
    consumer.assert_next("test", &consumer1);
    // Nothing left to fall back to: the path is unannounced.
    drop(broadcast1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    assert!(consumer.get_broadcast("test").is_none());
    consumer.assert_next_none("test");
    consumer.assert_next_wait();
}
/// Two broadcasts at one path, removed newest first: the path stays published until both are gone.
#[tokio::test]
async fn test_duplicate_reverse() {
    tokio::time::pause();
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    origin.publish_broadcast("test", broadcast1.consume());
    origin.publish_broadcast("test", broadcast2.consume());
    assert!(origin.consume().get_broadcast("test").is_some());
    drop(broadcast2);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    assert!(origin.consume().get_broadcast("test").is_some());
    drop(broadcast1);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    assert!(origin.consume().get_broadcast("test").is_none());
}
/// Publishing the same broadcast twice at one path must not leave it behind after it closes.
#[tokio::test]
async fn test_double_publish() {
    tokio::time::pause();
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    origin.publish_broadcast("test", broadcast.consume());
    origin.publish_broadcast("test", broadcast.consume());
    assert!(origin.consume().get_broadcast("test").is_some());
    drop(broadcast);
    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
    assert!(origin.consume().get_broadcast("test").is_none());
}
/// Reading 256 queued announcements through `assert_next` (which polls `announced()` once via
/// `now_or_never`) is expected to panic.
///
/// NOTE(review): presumably the runtime makes `recv()` report "pending" after a number of
/// immediately-ready polls, which trips the "next blocked" expectation — confirm the exact
/// cause. `test_128_fix` shows the variant that works.
#[tokio::test]
#[should_panic]
async fn test_128() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let mut consumer = origin.consume();
    for i in 0..256 {
        origin.publish_broadcast(format!("test{i}"), broadcast.consume());
    }
    for i in 0..256 {
        consumer.assert_next(format!("test{i}"), &broadcast.consume());
    }
}
/// Same scenario as `test_128`, but read with `assert_try_next` (backed by `try_announced`):
/// all 256 announcements are available in publish order.
#[tokio::test]
async fn test_128_fix() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let mut consumer = origin.consume();
    for i in 0..256 {
        origin.publish_broadcast(format!("test{i}"), broadcast.consume());
    }
    for i in 0..256 {
        consumer.assert_try_next(format!("test{i}"), &broadcast.consume());
    }
}
/// A producer rooted at "foo" publishes below "foo": the unrooted consumer sees the full path,
/// a consumer derived from the rooted producer sees the path relative to "foo".
#[tokio::test]
async fn test_with_root_basic() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let foo_producer = origin.with_root("foo").expect("should create root");
    assert_eq!(foo_producer.root().as_str(), "foo");
    let mut consumer = origin.consume();
    assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume()));
    consumer.assert_next("foo/bar/baz", &broadcast.consume());
    let mut foo_consumer = foo_producer.consume();
    foo_consumer.assert_next("bar/baz", &broadcast.consume());
}
/// Roots compose: `with_root("foo")` followed by `with_root("bar")` yields the root "foo/bar".
#[tokio::test]
async fn test_with_root_nested() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let foo_producer = origin.with_root("foo").expect("should create foo root");
    let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root");
    assert_eq!(foo_bar_producer.root().as_str(), "foo/bar");
    let mut consumer = origin.consume();
    assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume()));
    consumer.assert_next("foo/bar/baz", &broadcast.consume());
    let mut foo_bar_consumer = foo_bar_producer.consume();
    foo_bar_consumer.assert_next("baz", &broadcast.consume());
}
/// A scoped producer may publish at and below its allowed prefixes only; parents of an allowed
/// prefix and unrelated paths are refused.
#[tokio::test]
async fn test_publish_scope_allows() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let limited_producer = origin
        .scope(&["allowed/path1".into(), "allowed/path2".into()])
        .expect("should create limited producer");
    assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume()));
    assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume()));
    assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume()));
    assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume()));
    // "allowed" is a parent of the allowed prefixes; "other/path" is unrelated.
    assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume()));
}
/// Scoping to an empty prefix list selects nothing, so no producer is returned.
#[tokio::test]
async fn test_publish_scope_empty() {
    let origin = Origin::random().produce();
    assert!(origin.scope(&[]).is_none());
}
/// A consumer scoped to "allowed" only receives broadcasts at or below that prefix, while the
/// unscoped consumer receives everything.
#[tokio::test]
async fn test_consume_scope_filters() {
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    let broadcast3 = Broadcast::new().produce();
    let mut consumer = origin.consume();
    origin.publish_broadcast("allowed", broadcast1.consume());
    origin.publish_broadcast("allowed/nested", broadcast2.consume());
    origin.publish_broadcast("notallowed", broadcast3.consume());
    let mut limited_consumer = origin
        .consume()
        .scope(&["allowed".into()])
        .expect("should create limited consumer");
    // Replay order: the node's own broadcast first, then its children.
    limited_consumer.assert_next("allowed", &broadcast1.consume());
    limited_consumer.assert_next("allowed/nested", &broadcast2.consume());
    limited_consumer.assert_next_wait();
    // The unscoped consumer was created first and saw all three live, in publish order.
    consumer.assert_next("allowed", &broadcast1.consume());
    consumer.assert_next("allowed/nested", &broadcast2.consume());
    consumer.assert_next("notallowed", &broadcast3.consume());
}
/// A consumer scoped to several prefixes receives broadcasts from each of them and nothing else.
///
/// NOTE(review): "bar" is expected before "foo" although the scope lists "foo" first —
/// presumably `PathPrefixes` iterates in sorted order; confirm.
#[tokio::test]
async fn test_consume_scope_multiple_prefixes() {
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    let broadcast3 = Broadcast::new().produce();
    origin.publish_broadcast("foo/test", broadcast1.consume());
    origin.publish_broadcast("bar/test", broadcast2.consume());
    origin.publish_broadcast("baz/test", broadcast3.consume());
    let mut limited_consumer = origin
        .consume()
        .scope(&["foo".into(), "bar".into()])
        .expect("should create limited consumer");
    limited_consumer.assert_next("bar/test", &broadcast2.consume());
    limited_consumer.assert_next("foo/test", &broadcast1.consume());
    // "baz/test" is outside the scope and must not show up.
    limited_consumer.assert_next_wait(); }
/// Scoping a rooted producer: allowed prefixes are interpreted relative to the root, and the
/// unrooted consumer sees the full paths of what was accepted.
#[tokio::test]
async fn test_with_root_and_publish_scope() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let foo_producer = origin.with_root("foo").expect("should create foo root");
    let limited_producer = foo_producer
        .scope(&["bar".into(), "goop/pee".into()])
        .expect("should create limited producer");
    let mut consumer = origin.consume();
    assert!(limited_producer.publish_broadcast("bar", broadcast.consume()));
    assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume()));
    assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume()));
    assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume()));
    assert!(!limited_producer.publish_broadcast("baz", broadcast.consume()));
    // "goop" is a parent of an allowed prefix; "goop/other" is a sibling of one.
    assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume()));
    consumer.assert_next("foo/bar", &broadcast.consume());
    consumer.assert_next("foo/bar/nested", &broadcast.consume());
    consumer.assert_next("foo/goop/pee", &broadcast.consume());
    consumer.assert_next("foo/goop/pee/nested", &broadcast.consume());
}
/// Scoping a consumer derived from a rooted producer: paths are reported relative to the root
/// and limited to the scoped prefixes.
#[tokio::test]
async fn test_with_root_and_consume_scope() {
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    let broadcast3 = Broadcast::new().produce();
    origin.publish_broadcast("foo/bar/test", broadcast1.consume());
    origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume());
    origin.publish_broadcast("foo/other/test", broadcast3.consume());
    let foo_producer = origin.with_root("foo").expect("should create foo root");
    let mut limited_consumer = foo_producer
        .consume()
        .scope(&["bar".into(), "goop/pee".into()])
        .expect("should create limited consumer");
    limited_consumer.assert_next("bar/test", &broadcast1.consume());
    limited_consumer.assert_next("goop/pee/test", &broadcast2.consume());
    // "other/test" is outside the scope and must not show up.
    limited_consumer.assert_next_wait(); }
/// `with_root` on a scoped producer only succeeds for prefixes inside the scope.
#[tokio::test]
async fn test_with_root_unauthorized() {
    let origin = Origin::random().produce();
    let limited_producer = origin
        .scope(&["allowed".into()])
        .expect("should create limited producer");
    assert!(limited_producer.with_root("notallowed").is_none());
    let allowed_root = limited_producer
        .with_root("allowed")
        .expect("should create allowed root");
    assert_eq!(allowed_root.root().as_str(), "allowed");
}
/// An unscoped producer may publish anywhere and re-root to any prefix.
#[tokio::test]
async fn test_wildcard_permission() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let root_producer = origin.clone();
    assert!(root_producer.publish_broadcast("any/path", broadcast.consume()));
    assert!(root_producer.publish_broadcast("other/path", broadcast.consume()));
    let foo_producer = root_producer.with_root("foo").expect("should create any root");
    assert_eq!(foo_producer.root().as_str(), "foo");
}
/// `get_broadcast` respects the consumer's scope: a scoped consumer cannot look up paths
/// outside it, an unscoped consumer can look up everything.
#[tokio::test]
async fn test_consume_broadcast_with_permissions() {
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    origin.publish_broadcast("allowed/test", broadcast1.consume());
    origin.publish_broadcast("notallowed/test", broadcast2.consume());
    let limited_consumer = origin
        .consume()
        .scope(&["allowed".into()])
        .expect("should create limited consumer");
    let result = limited_consumer.get_broadcast("allowed/test");
    assert!(result.is_some());
    assert!(result.unwrap().is_clone(&broadcast1.consume()));
    assert!(limited_consumer.get_broadcast("notallowed/test").is_none());
    let consumer = origin.consume();
    assert!(consumer.get_broadcast("allowed/test").is_some());
    assert!(consumer.get_broadcast("notallowed/test").is_some());
}
/// A producer scoped to the deep prefix "a/b/c" may publish at and below it, but not at any of
/// its ancestors or siblings.
#[tokio::test]
async fn test_nested_paths_with_permissions() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let limited_producer = origin.scope(&["a/b/c".into()]).expect("should create limited producer");
    assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume()));
    assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume()));
    assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume()));
    assert!(!limited_producer.publish_broadcast("a", broadcast.consume()));
    assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume()));
    assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume()));
}
/// Consumers with different scopes over the same origin each receive only their own subset.
#[tokio::test]
async fn test_multiple_consumers_with_different_permissions() {
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    let broadcast3 = Broadcast::new().produce();
    origin.publish_broadcast("foo/test", broadcast1.consume());
    origin.publish_broadcast("bar/test", broadcast2.consume());
    origin.publish_broadcast("baz/test", broadcast3.consume());
    let mut foo_consumer = origin
        .consume()
        .scope(&["foo".into()])
        .expect("should create foo consumer");
    let mut bar_consumer = origin
        .consume()
        .scope(&["bar".into()])
        .expect("should create bar consumer");
    let mut foobar_consumer = origin
        .consume()
        .scope(&["foo".into(), "bar".into()])
        .expect("should create foobar consumer");
    foo_consumer.assert_next("foo/test", &broadcast1.consume());
    foo_consumer.assert_next_wait();
    bar_consumer.assert_next("bar/test", &broadcast2.consume());
    bar_consumer.assert_next_wait();
    // NOTE(review): "bar" before "foo" presumably reflects `PathPrefixes` ordering — confirm.
    foobar_consumer.assert_next("bar/test", &broadcast2.consume());
    foobar_consumer.assert_next("foo/test", &broadcast1.consume());
    foobar_consumer.assert_next_wait();
}
/// Scoping a consumer to the empty prefix keeps every node the parent already had access to.
#[tokio::test]
async fn test_select_with_empty_prefix() {
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    let demo_producer = origin.with_root("demo").expect("should create demo root");
    let limited_producer = demo_producer
        .scope(&["worm-node".into(), "foobar".into()])
        .expect("should create limited producer");
    assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume()));
    assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume()));
    let mut consumer = limited_producer
        .consume()
        .scope(&["".into()])
        .expect("should create consumer with empty prefix");
    let a1 = consumer.try_announced().expect("expected first announcement");
    let a2 = consumer.try_announced().expect("expected second announcement");
    consumer.assert_next_wait();
    // Sort so the assertion does not depend on the order of the two announcements.
    let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect();
    paths.sort();
    assert_eq!(paths, ["foobar/test", "worm-node/test"]);
}
/// Scopes can be narrowed further: first to one of the allowed prefixes, then to a path below it.
#[tokio::test]
async fn test_select_narrowing_scope() {
    let origin = Origin::random().produce();
    let broadcast1 = Broadcast::new().produce();
    let broadcast2 = Broadcast::new().produce();
    let broadcast3 = Broadcast::new().produce();
    let demo_producer = origin.with_root("demo").expect("should create demo root");
    let limited_producer = demo_producer
        .scope(&["worm-node".into(), "foobar".into()])
        .expect("should create limited producer");
    assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume()));
    assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume()));
    assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume()));
    let mut worm_consumer = limited_producer
        .consume()
        .scope(&["worm-node".into()])
        .expect("should create worm-node consumer");
    worm_consumer.assert_next("worm-node", &broadcast1.consume());
    worm_consumer.assert_next("worm-node/foo", &broadcast2.consume());
    worm_consumer.assert_next_wait();
    let mut foo_consumer = limited_producer
        .consume()
        .scope(&["worm-node/foo".into()])
        .expect("should create worm-node/foo consumer");
    foo_consumer.assert_next("worm-node/foo", &broadcast2.consume());
    // Neither "worm-node" nor "foobar/bar" lies below "worm-node/foo".
    foo_consumer.assert_next_wait(); }
// With three allowed prefixes on the producer, an empty-prefix consumer scope
// still announces the broadcasts published under each of them.
#[tokio::test]
async fn test_select_multiple_roots_with_empty_prefix() {
    let origin = Origin::random().produce();
    let app1_data = Broadcast::new().produce();
    let app2_config = Broadcast::new().produce();
    let shared_resource = Broadcast::new().produce();

    let scoped = origin
        .scope(&["app1".into(), "app2".into(), "shared".into()])
        .expect("should create limited producer");

    assert!(scoped.publish_broadcast("app1/data", app1_data.consume()));
    assert!(scoped.publish_broadcast("app2/config", app2_config.consume()));
    assert!(scoped.publish_broadcast("shared/resource", shared_resource.consume()));

    let mut everything = scoped
        .consume()
        .scope(&["".into()])
        .expect("should create consumer with empty prefix");

    // This test pins the announcement order as well as the contents.
    everything.assert_next("app1/data", &app1_data.consume());
    everything.assert_next("app2/config", &app2_config.consume());
    everything.assert_next("shared/resource", &shared_resource.consume());
    everything.assert_next_wait();
}
// Re-scoping a limited producer with the empty prefix keeps the original
// restrictions: the two allowed paths publish, everything else is rejected.
#[tokio::test]
async fn test_publish_scope_with_empty_prefix() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();

    let scoped = origin
        .scope(&["services/api".into(), "services/web".into()])
        .expect("should create limited producer");
    let rescoped = scoped
        .scope(&["".into()])
        .expect("should create producer with empty prefix");

    // Paths inside the original scope are accepted.
    for allowed in ["services/api", "services/web"] {
        assert!(rescoped.publish_broadcast(allowed, broadcast.consume()));
    }

    // Paths outside the original scope are still refused.
    for denied in ["services/db", "other"] {
        assert!(!rescoped.publish_broadcast(denied, broadcast.consume()));
    }
}
// A consumer of an "org"-scoped producer can be narrowed to a team prefix or
// all the way down to a single broadcast path.
#[tokio::test]
async fn test_select_narrowing_to_deeper_path() {
    let origin = Origin::random().produce();
    let team1_project1 = Broadcast::new().produce();
    let team1_project2 = Broadcast::new().produce();
    let team2_project1 = Broadcast::new().produce();

    let scoped = origin.scope(&["org".into()]).expect("should create limited producer");

    assert!(scoped.publish_broadcast("org/team1/project1", team1_project1.consume()));
    assert!(scoped.publish_broadcast("org/team1/project2", team1_project2.consume()));
    assert!(scoped.publish_broadcast("org/team2/project1", team2_project1.consume()));

    // Team-level scope: only team2's single broadcast is announced.
    let mut team2_only = scoped
        .consume()
        .scope(&["org/team2".into()])
        .expect("should create team2 consumer");
    team2_only.assert_next("org/team2/project1", &team2_project1.consume());
    team2_only.assert_next_wait();

    // Scope equal to a full broadcast path: only that broadcast is announced.
    let mut single_project = scoped
        .consume()
        .scope(&["org/team1/project1".into()])
        .expect("should create project1 consumer");
    single_project.assert_next("org/team1/project1", &team1_project1.consume());
    single_project.assert_next_wait();
}
// Scoping to a prefix outside the producer's allowed set yields `None`, both
// for a derived consumer and for the producer itself.
#[tokio::test]
async fn test_select_with_non_matching_prefix() {
    let origin = Origin::random().produce();
    let scoped = origin
        .scope(&["allowed/path".into()])
        .expect("should create limited producer");

    let disjoint_consumer = scoped.consume().scope(&["different/path".into()]);
    assert!(disjoint_consumer.is_none());

    let disjoint_producer = scoped.scope(&["other/path".into()]);
    assert!(disjoint_producer.is_none());
}
// A consumer rooted at a prefix written with a trailing slash still sees
// broadcasts under that prefix, announced relative to the root.
#[tokio::test]
async fn test_with_root_trailing_slash_consumer() {
    let origin = Origin::random().produce();

    // Pass an owned `String` that ends in '/'.
    let mut rooted_consumer = origin
        .consume()
        .with_root("some_prefix/".to_string())
        .unwrap();

    let published = origin.create_broadcast("some_prefix/test").unwrap();
    rooted_consumer.assert_next("test", &published.consume());
}
// A producer rooted at a prefix written with a trailing slash publishes and
// announces paths relative to that root.
#[tokio::test]
async fn test_with_root_trailing_slash_producer() {
    let origin = Origin::random().produce();

    // Pass an owned `String` that ends in '/'.
    let rooted_producer = origin.with_root("some_prefix/".to_string()).unwrap();

    let published = rooted_producer.create_broadcast("test").unwrap();
    let mut rooted_consumer = rooted_producer.consume();
    rooted_consumer.assert_next("test", &published.consume());
}
// A consumer rooted at a prefix written with a trailing slash must also
// observe the broadcast going away after its producer handle is dropped.
#[tokio::test]
async fn test_with_root_trailing_slash_unannounce() {
// Pause tokio's clock so the sleep below does not wait in real time.
tokio::time::pause();
let origin = Origin::random().produce();
// The root is an owned `String` that ends in '/'.
let prefix = "some_prefix/".to_string();
let mut consumer = origin.consume().with_root(prefix).unwrap();
let b = origin.create_broadcast("some_prefix/test").unwrap();
// The announcement arrives relative to the root ("test", not "some_prefix/test").
consumer.assert_next("test", &b.consume());
// Dropping the value returned by `create_broadcast` is what should end the announcement.
drop(b);
// NOTE(review): presumably the cleanup after the drop runs asynchronously and
// this short sleep gives it a chance to execute — confirm against the origin implementation.
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
// The follow-up event for "test" is checked via `assert_next_none`.
consumer.assert_next_none("test");
}
// Requesting a wider (empty) prefix than the producer allows must not fail;
// the consumer keeps access to the allowed prefixes and can be narrowed again.
#[tokio::test]
async fn test_select_maintains_access_with_wider_prefix() {
    let origin = Origin::random().produce();
    let worm_data = Broadcast::new().produce();
    let foobar_exact = Broadcast::new().produce();

    let rooted = origin.with_root("demo").expect("should create demo root");
    let user_scoped = rooted
        .scope(&["worm-node".into(), "foobar".into()])
        .expect("should create user producer");

    assert!(user_scoped.publish_broadcast("worm-node/data", worm_data.consume()));
    assert!(user_scoped.publish_broadcast("foobar", foobar_exact.consume()));

    let mut wide = user_scoped
        .consume()
        .scope(&["".into()])
        .expect("scope with empty prefix should not fail when user has specific permissions");

    // Two announcements are expected; their order is not pinned here.
    let first = wide.try_announced().expect("expected first announcement");
    let second = wide.try_announced().expect("expected second announcement");
    wide.assert_next_wait();

    // Sort the announced paths so the comparison is order-independent.
    let mut seen = vec![first.0.to_string(), second.0.to_string()];
    seen.sort();
    assert_eq!(seen, ["foobar", "worm-node/data"]);

    // Narrowing to one of the allowed prefixes still works.
    let mut worm_only = user_scoped
        .consume()
        .scope(&["worm-node".into()])
        .expect("should be able to narrow scope to worm-node");
    worm_only.assert_next("worm-node/data", &worm_data.consume());
    worm_only.assert_next_wait();
}
// Listing the same prefix twice in a scope must not produce a second
// announcement for a broadcast under it.
#[tokio::test]
async fn test_duplicate_prefixes_deduped() {
    let origin = Origin::random().produce();
    let stream = Broadcast::new().produce();

    let scoped = origin
        .scope(&["demo".into(), "demo".into()])
        .expect("should create producer");
    assert!(scoped.publish_broadcast("demo/stream", stream.consume()));

    // One announcement, then nothing further.
    let mut announcements = scoped.consume();
    announcements.assert_next("demo/stream", &stream.consume());
    announcements.assert_next_wait();
}
// When one prefix ("demo") contains another ("demo/foo"), a path covered only
// by the broader prefix is still publishable and announced once.
#[tokio::test]
async fn test_overlapping_prefixes_deduped() {
    let origin = Origin::random().produce();
    let stream = Broadcast::new().produce();

    let scoped = origin
        .scope(&["demo".into(), "demo/foo".into()])
        .expect("should create producer");
    assert!(scoped.publish_broadcast("demo/bar/stream", stream.consume()));

    let mut announcements = scoped.consume();
    announcements.assert_next("demo/bar/stream", &stream.consume());
    announcements.assert_next_wait();
}
// A path matched by both overlapping prefixes ("demo" and "demo/foo") must be
// announced exactly once.
#[tokio::test]
async fn test_overlapping_prefixes_no_duplicate_announcements() {
    let origin = Origin::random().produce();
    let stream = Broadcast::new().produce();

    let scoped = origin
        .scope(&["demo".into(), "demo/foo".into()])
        .expect("should create producer");
    assert!(scoped.publish_broadcast("demo/foo/stream", stream.consume()));

    // A single announcement, followed by no further pending events.
    let mut announcements = scoped.consume();
    announcements.assert_next("demo/foo/stream", &stream.consume());
    announcements.assert_next_wait();
}
// `allowed()` reports the prefix set after overlap removal: "demo/foo" is
// covered by "demo", leaving two entries.
#[tokio::test]
async fn test_allowed_returns_deduped_prefixes() {
    let origin = Origin::random().produce();
    let scoped = origin
        .scope(&["demo".into(), "demo/foo".into(), "anon".into()])
        .expect("should create producer");

    // Only the number of reported prefixes is checked, not their values.
    let allowed_count = scoped.allowed().count();
    assert_eq!(allowed_count, 2, "demo/foo should be subsumed by demo");
}
// `announced_broadcast` resolves with the broadcast when the requested path
// was already published before the consumer was created.
#[tokio::test]
async fn test_announced_broadcast_already_announced() {
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();

    // Check the publish result so a rejected publish fails here, at the
    // publish site, rather than surfacing later as a missing announcement.
    assert!(
        origin.publish_broadcast("test", broadcast.consume()),
        "publish on an unscoped origin should succeed"
    );

    let consumer = origin.consume();
    let result = consumer.announced_broadcast("test").await.expect("should find it");

    // The returned consumer must refer to the broadcast that was published.
    assert!(result.is_clone(&broadcast.consume()));
}
// `announced_broadcast` keeps waiting when the path is not yet published and
// resolves once the broadcast is announced afterwards.
#[tokio::test]
async fn test_announced_broadcast_delayed() {
    tokio::time::pause();
    let origin = Origin::random().produce();
    let broadcast = Broadcast::new().produce();
    let consumer = origin.consume();

    // Start waiting on a separate task before anything is published.
    let wait = tokio::spawn({
        let consumer = consumer.clone();
        async move { consumer.announced_broadcast("test").await }
    });

    // Yield once so the spawned task gets a chance to start waiting.
    tokio::task::yield_now().await;

    // Check the publish result: if the publish were rejected, the spawned
    // wait would presumably never resolve and the failure would be obscured.
    assert!(
        origin.publish_broadcast("test", broadcast.consume()),
        "publish on an unscoped origin should succeed"
    );

    let result = wait.await.unwrap().expect("should find it");
    assert!(result.is_clone(&broadcast.consume()));
}
// A pending `announced_broadcast("target")` must not resolve when a different
// path is announced; it resolves only once "target" itself is published.
#[tokio::test]
async fn test_announced_broadcast_ignores_unrelated_paths() {
    tokio::time::pause();
    let origin = Origin::random().produce();
    let other = Broadcast::new().produce();
    let target = Broadcast::new().produce();
    let consumer = origin.consume();

    // Start waiting on a separate task before anything is published.
    let wait = tokio::spawn({
        let consumer = consumer.clone();
        async move { consumer.announced_broadcast("target").await }
    });
    tokio::task::yield_now().await;

    // Publish an unrelated path first. The result is checked so the
    // "must not resolve" assertion below is known to follow a real announcement.
    assert!(
        origin.publish_broadcast("other", other.consume()),
        "publish of the unrelated path should succeed"
    );
    tokio::task::yield_now().await;
    assert!(!wait.is_finished(), "must not resolve on unrelated path");

    // Publishing the awaited path is what completes the wait.
    assert!(
        origin.publish_broadcast("target", target.consume()),
        "publish of the target path should succeed"
    );
    let result = wait.await.unwrap().expect("should find target");
    assert!(result.is_clone(&target.consume()));
}
// A pending `announced_broadcast("foo")` must not resolve for a broadcast
// nested below it ("foo/bar"); only an exact match for "foo" completes it.
#[tokio::test]
async fn test_announced_broadcast_skips_nested_paths() {
    tokio::time::pause();
    let origin = Origin::random().produce();
    let nested = Broadcast::new().produce();
    let exact = Broadcast::new().produce();
    let consumer = origin.consume();

    // Start waiting on a separate task before anything is published.
    let wait = tokio::spawn({
        let consumer = consumer.clone();
        async move { consumer.announced_broadcast("foo").await }
    });
    tokio::task::yield_now().await;

    // Publish the nested path first. The result is checked so the
    // "must not resolve" assertion below is known to follow a real announcement.
    assert!(
        origin.publish_broadcast("foo/bar", nested.consume()),
        "publish of the nested path should succeed"
    );
    tokio::task::yield_now().await;
    assert!(!wait.is_finished(), "must not resolve on a nested path");

    // Publishing the exact path is what completes the wait.
    assert!(
        origin.publish_broadcast("foo", exact.consume()),
        "publish of the exact path should succeed"
    );
    let result = wait.await.unwrap().expect("should find foo exactly");
    assert!(result.is_clone(&exact.consume()));
}
// Asking a scoped consumer for a path outside its allowed prefix yields `None`.
#[tokio::test]
async fn test_announced_broadcast_disallowed() {
    let origin = Origin::random().produce();
    let scoped = origin
        .consume()
        .scope(&["allowed".into()])
        .expect("should create limited");

    // "notallowed" is not under the "allowed" prefix.
    let found = scoped.announced_broadcast("notallowed").await;
    assert!(found.is_none());
}
// When the consumer's scope ("foo/specific") is narrower than the requested
// path ("foo"), the lookup must finish immediately with `None` rather than wait.
#[tokio::test]
async fn test_announced_broadcast_scope_too_narrow() {
    let origin = Origin::random().produce();
    let narrow = origin
        .consume()
        .scope(&["foo/specific".into()])
        .expect("should create limited");

    // `now_or_never` polls the future once; `expect` fails if it is still pending.
    let outcome = narrow.announced_broadcast("foo").now_or_never();
    let resolved = outcome.expect("must not block");
    assert!(resolved.is_none());
}
}