use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::broadcast;
/// Addressing scope of a notification channel.
///
/// A channel lives either inside a single tenant or in the shared global
/// scope; the scope is part of the channel's identity.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NotificationScope {
    /// Channel namespace belonging to the named tenant.
    Tenant(String),
    /// Channel namespace that is not tied to any tenant.
    Global,
}

impl NotificationScope {
    /// Derives the scope a principal belongs to from its optional tenant.
    ///
    /// `Some(name)` becomes [`NotificationScope::Tenant`] holding an owned
    /// copy of `name`; `None` becomes [`NotificationScope::Global`].
    pub fn from_principal_tenant(tenant: Option<&str>) -> Self {
        tenant.map_or(Self::Global, |name| Self::Tenant(name.to_owned()))
    }

    /// Returns a printable label: `tenant:<name>` for a tenant scope and
    /// `global` for the global scope.
    pub fn label(&self) -> String {
        match self {
            Self::Global => String::from("global"),
            Self::Tenant(t) => format!("tenant:{t}"),
        }
    }
}
/// A single notification as carried over a registry broadcast channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotificationEvent {
/// Scope the event was published to.
pub scope: NotificationScope,
/// Name of the channel the event was published on.
pub channel: String,
/// Payload supplied by the publisher, stored as-is.
pub payload: String,
/// Timestamp passed in by the publisher (`now_ms`); the registry never
/// reads a clock itself. Presumably milliseconds since some epoch, as the
/// name suggests — confirm with callers.
pub published_at_ms: u128,
}
/// Errors returned by the authorization-checked registry entry points.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationError {
/// The principal addressed a scope other than its own without holding the
/// cross-tenant capability.
CrossTenantDenied {
/// Tenant of the calling principal; `None` means the principal has no
/// tenant and is rendered as `<platform>` by the `Display` impl.
principal_tenant: Option<String>,
/// Scope the principal attempted to address.
target: NotificationScope,
/// Channel name the principal attempted to address.
channel: String,
},
}
impl std::fmt::Display for NotificationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NotificationError::CrossTenantDenied {
principal_tenant,
target,
channel,
} => {
let from = principal_tenant.as_deref().unwrap_or("<platform>");
write!(
f,
"notification: principal in tenant `{}` is not allowed to address `{}` channel `{}` without the `notify:cross-tenant` capability",
from,
target.label(),
channel
)
}
}
}
}
// Opts `NotificationError` into the standard error trait. The body is empty,
// so every trait method keeps its default implementation.
impl std::error::Error for NotificationError {}
/// Capacity handed to `broadcast::channel` whenever `subscribe` has to create
/// a new channel.
// NOTE(review): per tokio's broadcast semantics this should bound how far a
// slow receiver can fall behind before it starts lagging — confirm against
// the tokio documentation before tuning.
const CHANNEL_CAPACITY: usize = 256;
/// Registry of broadcast channels keyed by scope and channel name.
///
/// The map is held behind an `Arc`, so clones of the registry share the same
/// set of channels.
#[derive(Default, Clone)]
pub struct NotificationRegistry {
// One broadcast sender per (scope, channel) pair, guarded by a mutex.
inner: Arc<Mutex<HashMap<ChannelKey, broadcast::Sender<NotificationEvent>>>>,
}
/// Map key identifying one channel: a channel name within a scope.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ChannelKey {
/// Scope that owns the channel.
scope: NotificationScope,
/// Channel name within that scope.
channel: String,
}
impl NotificationRegistry {
    /// Creates an empty registry with no channels.
    pub fn new() -> Self {
        Self::default()
    }

    /// Subscribes to `channel` within `scope`, creating the channel on first
    /// use.
    ///
    /// No authorization is performed here; use
    /// [`Self::subscribe_authorized`] when acting on behalf of a principal.
    pub fn subscribe(
        &self,
        scope: NotificationScope,
        channel: impl Into<String>,
    ) -> broadcast::Receiver<NotificationEvent> {
        let key = ChannelKey {
            scope,
            channel: channel.into(),
        };
        // Lookup/creation and `Sender::subscribe` both happen while the
        // registry lock is held, so `publish` cannot reap the channel between
        // the two steps.
        let mut guard = self.inner.lock();
        let sender = guard
            .entry(key)
            .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
        sender.subscribe()
    }

    /// Publishes `payload` on `channel` within `scope` and returns the number
    /// of receivers the event was handed to.
    ///
    /// Returns `0` when the channel does not exist or has no receivers left.
    /// A channel found without receivers is removed from the registry; this is
    /// the only place channels are reaped. `now_ms` is stored verbatim in
    /// [`NotificationEvent::published_at_ms`].
    ///
    /// No authorization is performed here; use [`Self::publish_authorized`]
    /// when acting on behalf of a principal.
    pub fn publish(
        &self,
        scope: NotificationScope,
        channel: impl Into<String>,
        payload: impl Into<String>,
        now_ms: u128,
    ) -> usize {
        let key = ChannelKey {
            scope,
            channel: channel.into(),
        };

        // The liveness check and the reaping must share one critical section.
        // `subscribe` attaches receivers under this same lock, so while we
        // hold it nobody can subscribe to a sender we are about to remove.
        // (Checking after releasing the lock could orphan a receiver that
        // subscribed in between.)
        let live_sender = {
            let mut guard = self.inner.lock();
            let live = guard
                .get(&key)
                .filter(|sender| sender.receiver_count() > 0)
                .cloned();
            if live.is_none() {
                // Either the key is absent (no-op) or its last receiver is
                // gone (reap the stale entry).
                guard.remove(&key);
            }
            live
        };
        let Some(sender) = live_sender else {
            return 0;
        };

        // Build the event only once a live channel is known to exist, reusing
        // the key's owned parts instead of cloning them.
        let ChannelKey { scope, channel } = key;
        let event = NotificationEvent {
            scope,
            channel,
            payload: payload.into(),
            published_at_ms: now_ms,
        };
        // The send happens outside the registry lock. If every receiver
        // dropped in the meantime `send` fails and we report zero deliveries;
        // the stale entry is reaped by the next publish.
        sender.send(event).unwrap_or(0)
    }

    /// Publishes like [`Self::publish`] after checking that the principal may
    /// address `target`.
    ///
    /// # Errors
    ///
    /// Returns [`NotificationError::CrossTenantDenied`] when `target` is not
    /// the principal's own scope and `has_cross_tenant_cap` is `false`.
    pub fn publish_authorized(
        &self,
        principal_tenant: Option<&str>,
        target: NotificationScope,
        channel: impl Into<String>,
        payload: impl Into<String>,
        has_cross_tenant_cap: bool,
        now_ms: u128,
    ) -> Result<usize, NotificationError> {
        let channel = channel.into();
        Self::authorize(principal_tenant, &target, &channel, has_cross_tenant_cap)?;
        Ok(self.publish(target, channel, payload, now_ms))
    }

    /// Subscribes like [`Self::subscribe`] after checking that the principal
    /// may address `target`.
    ///
    /// # Errors
    ///
    /// Returns [`NotificationError::CrossTenantDenied`] when `target` is not
    /// the principal's own scope and `has_cross_tenant_cap` is `false`.
    pub fn subscribe_authorized(
        &self,
        principal_tenant: Option<&str>,
        target: NotificationScope,
        channel: impl Into<String>,
        has_cross_tenant_cap: bool,
    ) -> Result<broadcast::Receiver<NotificationEvent>, NotificationError> {
        let channel = channel.into();
        Self::authorize(principal_tenant, &target, &channel, has_cross_tenant_cap)?;
        Ok(self.subscribe(target, channel))
    }

    /// Decides whether a principal may address `target`.
    ///
    /// Access is granted when the target is the principal's own scope (a
    /// tenant principal addressing its own tenant, or a tenant-less principal
    /// addressing `Global`) or when the principal holds the cross-tenant
    /// capability. Every other combination is denied.
    fn authorize(
        principal_tenant: Option<&str>,
        target: &NotificationScope,
        channel: &str,
        has_cross_tenant_cap: bool,
    ) -> Result<(), NotificationError> {
        let same_scope = match (principal_tenant, target) {
            (Some(pt), NotificationScope::Tenant(tt)) => pt == tt,
            (None, NotificationScope::Global) => true,
            // Tenant principal -> Global, or tenant-less principal -> Tenant.
            _ => false,
        };
        if same_scope || has_cross_tenant_cap {
            return Ok(());
        }
        Err(NotificationError::CrossTenantDenied {
            principal_tenant: principal_tenant.map(str::to_string),
            target: target.clone(),
            channel: channel.to_string(),
        })
    }

    /// Returns the number of channels currently held in the registry.
    ///
    /// This may include channels whose receivers have all dropped but which
    /// have not been published to since, because reaping only happens in
    /// [`Self::publish`].
    pub fn channel_count(&self) -> usize {
        self.inner.lock().len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed publish timestamp; none of the tests inspect it.
    const NOW_MS: u128 = 1;

    /// Shorthand for a tenant-scoped target.
    fn tenant(name: &str) -> NotificationScope {
        NotificationScope::Tenant(name.to_owned())
    }

    #[test]
    fn same_tenant_publish_subscribe_round_trip() {
        let registry = NotificationRegistry::new();
        let mut listener = registry.subscribe(tenant("acme"), "deploys");

        let delivered = registry.publish(tenant("acme"), "deploys", "v1.2.3", NOW_MS);
        assert_eq!(delivered, 1, "one connected listener should receive");

        let received = listener.try_recv().expect("event delivered");
        assert_eq!(received.channel, "deploys");
        assert_eq!(received.payload, "v1.2.3");
        assert_eq!(received.scope, tenant("acme"));
    }

    #[test]
    fn channels_are_tenant_isolated() {
        let registry = NotificationRegistry::new();
        let mut acme_rx = registry.subscribe(tenant("acme"), "deploys");
        let mut globex_rx = registry.subscribe(tenant("globex"), "deploys");

        registry.publish(tenant("acme"), "deploys", "acme-only", NOW_MS);

        assert_eq!(acme_rx.try_recv().unwrap().payload, "acme-only");
        assert!(
            globex_rx.try_recv().is_err(),
            "globex must not see acme's notification"
        );
    }

    #[test]
    fn channel_names_are_scoped_independently() {
        let registry = NotificationRegistry::new();
        let mut first_rx = registry.subscribe(tenant("acme"), "a");
        let mut second_rx = registry.subscribe(tenant("acme"), "b");

        registry.publish(tenant("acme"), "a", "to-a", NOW_MS);

        assert_eq!(first_rx.try_recv().unwrap().payload, "to-a");
        assert!(second_rx.try_recv().is_err());
    }

    #[test]
    fn offline_listeners_miss_notifications_no_replay() {
        let registry = NotificationRegistry::new();

        // Subscribe and disconnect immediately.
        drop(registry.subscribe(tenant("acme"), "deploys"));

        let delivered = registry.publish(tenant("acme"), "deploys", "v1.0.0", NOW_MS);
        assert_eq!(delivered, 0, "publish with no listeners delivers 0");

        let mut reconnected = registry.subscribe(tenant("acme"), "deploys");
        assert!(
            reconnected.try_recv().is_err(),
            "reconnected listener must not receive pre-reconnect notifications",
        );

        registry.publish(tenant("acme"), "deploys", "v2.0.0", NOW_MS);
        assert_eq!(reconnected.try_recv().unwrap().payload, "v2.0.0");
    }

    #[test]
    fn fanout_to_all_connected_listeners() {
        let registry = NotificationRegistry::new();
        let mut listeners: Vec<_> = (0..3)
            .map(|_| registry.subscribe(tenant("acme"), "deploys"))
            .collect();

        let delivered = registry.publish(tenant("acme"), "deploys", "fanout", NOW_MS);
        assert_eq!(delivered, 3);

        for listener in &mut listeners {
            assert_eq!(listener.try_recv().unwrap().payload, "fanout");
        }
    }

    #[test]
    fn same_tenant_publish_does_not_require_cross_tenant_cap() {
        let registry = NotificationRegistry::new();
        let subscribed =
            registry.subscribe_authorized(Some("acme"), tenant("acme"), "deploys", false);
        let mut listener =
            subscribed.expect("same-tenant subscribe must succeed without cross-tenant cap");

        let published = registry.publish_authorized(
            Some("acme"),
            tenant("acme"),
            "deploys",
            "v1",
            false,
            NOW_MS,
        );
        let delivered =
            published.expect("same-tenant publish must succeed without cross-tenant cap");

        assert_eq!(delivered, 1);
        assert_eq!(listener.try_recv().unwrap().payload, "v1");
    }

    #[test]
    fn cross_tenant_publish_denied_without_cap() {
        let registry = NotificationRegistry::new();
        let outcome = registry.publish_authorized(
            Some("acme"),
            tenant("globex"),
            "deploys",
            "leak",
            false,
            NOW_MS,
        );
        let err = outcome.expect_err("cross-tenant publish must be denied without cap");

        // The error carries the caller's tenant, the attempted target and the channel.
        assert_eq!(
            err,
            NotificationError::CrossTenantDenied {
                principal_tenant: Some("acme".to_owned()),
                target: tenant("globex"),
                channel: "deploys".to_owned(),
            }
        );
    }

    #[test]
    fn cross_tenant_subscribe_denied_without_cap() {
        let registry = NotificationRegistry::new();
        let outcome =
            registry.subscribe_authorized(Some("acme"), tenant("globex"), "deploys", false);
        let err = outcome.expect_err("cross-tenant subscribe must be denied without cap");
        assert!(matches!(err, NotificationError::CrossTenantDenied { .. }));
    }

    #[test]
    fn cross_tenant_publish_allowed_with_cap() {
        let registry = NotificationRegistry::new();
        let mut listener = registry.subscribe(tenant("globex"), "deploys");

        let outcome = registry.publish_authorized(
            Some("acme"),
            tenant("globex"),
            "deploys",
            "allowed",
            true,
            NOW_MS,
        );
        let delivered = outcome.expect("publish with cross-tenant cap must succeed");

        assert_eq!(delivered, 1);
        assert_eq!(listener.try_recv().unwrap().payload, "allowed");
    }

    #[test]
    fn global_scope_requires_cross_tenant_cap() {
        let registry = NotificationRegistry::new();

        // A tenant principal may not address the global scope on its own.
        let denied = registry.publish_authorized(
            Some("acme"),
            NotificationScope::Global,
            "platform",
            "leak",
            false,
            NOW_MS,
        );
        let err = denied.expect_err("targeting Global from a tenant must require cap");
        assert!(matches!(err, NotificationError::CrossTenantDenied { .. }));

        // A tenant-less principal addressing the global scope is same-scope.
        let allowed = registry.publish_authorized(
            None,
            NotificationScope::Global,
            "platform",
            "ok",
            false,
            NOW_MS,
        );
        let _ = allowed.expect("platform principal targeting global is same-scope");
    }

    #[test]
    fn channel_is_reaped_when_last_receiver_drops() {
        let registry = NotificationRegistry::new();

        let only_listener = registry.subscribe(tenant("acme"), "deploys");
        assert_eq!(registry.channel_count(), 1);
        drop(only_listener);

        // Reaping is triggered by the next publish to the now-empty channel.
        registry.publish(tenant("acme"), "deploys", "noop", NOW_MS);
        assert_eq!(registry.channel_count(), 0);
    }

    #[test]
    fn from_principal_tenant_maps_correctly() {
        assert_eq!(
            NotificationScope::from_principal_tenant(Some("acme")),
            tenant("acme")
        );
        assert_eq!(
            NotificationScope::from_principal_tenant(None),
            NotificationScope::Global
        );
    }
}