1use std::collections::HashMap;
45use std::sync::Arc;
46
47use parking_lot::Mutex;
48use tokio::sync::broadcast;
49
50#[derive(Debug, Clone, PartialEq, Eq, Hash)]
57pub enum NotificationScope {
58 Tenant(String),
60 Global,
62}
63
64impl NotificationScope {
65 pub fn from_principal_tenant(tenant: Option<&str>) -> Self {
71 match tenant {
72 Some(t) => NotificationScope::Tenant(t.to_string()),
73 None => NotificationScope::Global,
74 }
75 }
76
77 pub fn label(&self) -> String {
79 match self {
80 NotificationScope::Tenant(t) => format!("tenant:{t}"),
81 NotificationScope::Global => "global".to_string(),
82 }
83 }
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
93pub struct NotificationEvent {
94 pub scope: NotificationScope,
95 pub channel: String,
96 pub payload: String,
97 pub published_at_ms: u128,
98}
99
100#[derive(Debug, PartialEq, Eq)]
102pub enum NotificationError {
103 CrossTenantDenied {
109 principal_tenant: Option<String>,
110 target: NotificationScope,
111 channel: String,
112 },
113}
114
115impl std::fmt::Display for NotificationError {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 match self {
118 NotificationError::CrossTenantDenied {
119 principal_tenant,
120 target,
121 channel,
122 } => {
123 let from = principal_tenant.as_deref().unwrap_or("<platform>");
124 write!(
125 f,
126 "notification: principal in tenant `{}` is not allowed to address `{}` channel `{}` without the `notify:cross-tenant` capability",
127 from,
128 target.label(),
129 channel
130 )
131 }
132 }
133 }
134}
135
136impl std::error::Error for NotificationError {}
137
138const CHANNEL_CAPACITY: usize = 256;
142
143#[derive(Default, Clone)]
151pub struct NotificationRegistry {
152 inner: Arc<Mutex<HashMap<ChannelKey, broadcast::Sender<NotificationEvent>>>>,
153}
154
155#[derive(Debug, Clone, PartialEq, Eq, Hash)]
156struct ChannelKey {
157 scope: NotificationScope,
158 channel: String,
159}
160
161impl NotificationRegistry {
162 pub fn new() -> Self {
163 Self::default()
164 }
165
166 pub fn subscribe(
173 &self,
174 scope: NotificationScope,
175 channel: impl Into<String>,
176 ) -> broadcast::Receiver<NotificationEvent> {
177 let key = ChannelKey {
178 scope,
179 channel: channel.into(),
180 };
181 let mut guard = self.inner.lock();
182 let sender = guard
183 .entry(key)
184 .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
185 sender.subscribe()
186 }
187
188 pub fn publish(
193 &self,
194 scope: NotificationScope,
195 channel: impl Into<String>,
196 payload: impl Into<String>,
197 now_ms: u128,
198 ) -> usize {
199 let channel = channel.into();
200 let key = ChannelKey {
201 scope: scope.clone(),
202 channel: channel.clone(),
203 };
204 let event = NotificationEvent {
205 scope,
206 channel,
207 payload: payload.into(),
208 published_at_ms: now_ms,
209 };
210
211 let sender = {
212 let guard = self.inner.lock();
213 guard.get(&key).cloned()
214 };
215 let Some(sender) = sender else {
216 return 0;
217 };
218
219 if sender.receiver_count() == 0 {
223 self.inner.lock().remove(&key);
224 return 0;
225 }
226 sender.send(event).unwrap_or(0)
227 }
228
229 pub fn publish_authorized(
240 &self,
241 principal_tenant: Option<&str>,
242 target: NotificationScope,
243 channel: impl Into<String>,
244 payload: impl Into<String>,
245 has_cross_tenant_cap: bool,
246 now_ms: u128,
247 ) -> Result<usize, NotificationError> {
248 let channel = channel.into();
249 Self::authorize(principal_tenant, &target, &channel, has_cross_tenant_cap)?;
250 Ok(self.publish(target, channel, payload, now_ms))
251 }
252
253 pub fn subscribe_authorized(
256 &self,
257 principal_tenant: Option<&str>,
258 target: NotificationScope,
259 channel: impl Into<String>,
260 has_cross_tenant_cap: bool,
261 ) -> Result<broadcast::Receiver<NotificationEvent>, NotificationError> {
262 let channel = channel.into();
263 Self::authorize(principal_tenant, &target, &channel, has_cross_tenant_cap)?;
264 Ok(self.subscribe(target, channel))
265 }
266
267 fn authorize(
268 principal_tenant: Option<&str>,
269 target: &NotificationScope,
270 channel: &str,
271 has_cross_tenant_cap: bool,
272 ) -> Result<(), NotificationError> {
273 let same_scope = match (principal_tenant, target) {
274 (Some(pt), NotificationScope::Tenant(tt)) => pt == tt,
275 (None, NotificationScope::Global) => true,
279 _ => false,
280 };
281 if same_scope || has_cross_tenant_cap {
282 return Ok(());
283 }
284 Err(NotificationError::CrossTenantDenied {
285 principal_tenant: principal_tenant.map(str::to_string),
286 target: target.clone(),
287 channel: channel.to_string(),
288 })
289 }
290
291 pub fn channel_count(&self) -> usize {
294 self.inner.lock().len()
295 }
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301
302 fn now() -> u128 {
303 1
307 }
308
309 #[test]
310 fn same_tenant_publish_subscribe_round_trip() {
311 let reg = NotificationRegistry::new();
312 let mut rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
313 let delivered = reg.publish(
314 NotificationScope::Tenant("acme".into()),
315 "deploys",
316 "v1.2.3",
317 now(),
318 );
319 assert_eq!(delivered, 1, "one connected listener should receive");
320 let event = rx.try_recv().expect("event delivered");
321 assert_eq!(event.channel, "deploys");
322 assert_eq!(event.payload, "v1.2.3");
323 assert_eq!(event.scope, NotificationScope::Tenant("acme".into()));
324 }
325
326 #[test]
327 fn channels_are_tenant_isolated() {
328 let reg = NotificationRegistry::new();
329 let mut rx_acme = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
330 let mut rx_globex = reg.subscribe(NotificationScope::Tenant("globex".into()), "deploys");
331
332 reg.publish(
335 NotificationScope::Tenant("acme".into()),
336 "deploys",
337 "acme-only",
338 now(),
339 );
340
341 assert_eq!(rx_acme.try_recv().unwrap().payload, "acme-only");
342 assert!(
343 rx_globex.try_recv().is_err(),
344 "globex must not see acme's notification"
345 );
346 }
347
348 #[test]
349 fn channel_names_are_scoped_independently() {
350 let reg = NotificationRegistry::new();
351 let mut rx_a = reg.subscribe(NotificationScope::Tenant("acme".into()), "a");
352 let mut rx_b = reg.subscribe(NotificationScope::Tenant("acme".into()), "b");
353
354 reg.publish(NotificationScope::Tenant("acme".into()), "a", "to-a", now());
355
356 assert_eq!(rx_a.try_recv().unwrap().payload, "to-a");
357 assert!(rx_b.try_recv().is_err());
358 }
359
360 #[test]
361 fn offline_listeners_miss_notifications_no_replay() {
362 let reg = NotificationRegistry::new();
363
364 {
366 let _rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
367 }
368
369 let delivered = reg.publish(
371 NotificationScope::Tenant("acme".into()),
372 "deploys",
373 "v1.0.0",
374 now(),
375 );
376 assert_eq!(delivered, 0, "publish with no listeners delivers 0");
377
378 let mut rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
381 assert!(
382 rx.try_recv().is_err(),
383 "reconnected listener must not receive pre-reconnect notifications",
384 );
385
386 reg.publish(
389 NotificationScope::Tenant("acme".into()),
390 "deploys",
391 "v2.0.0",
392 now(),
393 );
394 assert_eq!(rx.try_recv().unwrap().payload, "v2.0.0");
395 }
396
397 #[test]
398 fn fanout_to_all_connected_listeners() {
399 let reg = NotificationRegistry::new();
400 let mut rx1 = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
401 let mut rx2 = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
402 let mut rx3 = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
403
404 let delivered = reg.publish(
405 NotificationScope::Tenant("acme".into()),
406 "deploys",
407 "fanout",
408 now(),
409 );
410 assert_eq!(delivered, 3);
411 for rx in [&mut rx1, &mut rx2, &mut rx3] {
412 assert_eq!(rx.try_recv().unwrap().payload, "fanout");
413 }
414 }
415
416 #[test]
417 fn same_tenant_publish_does_not_require_cross_tenant_cap() {
418 let reg = NotificationRegistry::new();
419 let mut rx = reg
420 .subscribe_authorized(
421 Some("acme"),
422 NotificationScope::Tenant("acme".into()),
423 "deploys",
424 false, )
426 .expect("same-tenant subscribe must succeed without cross-tenant cap");
427
428 let delivered = reg
429 .publish_authorized(
430 Some("acme"),
431 NotificationScope::Tenant("acme".into()),
432 "deploys",
433 "v1",
434 false,
435 now(),
436 )
437 .expect("same-tenant publish must succeed without cross-tenant cap");
438 assert_eq!(delivered, 1);
439 assert_eq!(rx.try_recv().unwrap().payload, "v1");
440 }
441
442 #[test]
443 fn cross_tenant_publish_denied_without_cap() {
444 let reg = NotificationRegistry::new();
445 let err = reg
446 .publish_authorized(
447 Some("acme"),
448 NotificationScope::Tenant("globex".into()),
449 "deploys",
450 "leak",
451 false,
452 now(),
453 )
454 .expect_err("cross-tenant publish must be denied without cap");
455 match err {
456 NotificationError::CrossTenantDenied {
457 principal_tenant,
458 target,
459 channel,
460 } => {
461 assert_eq!(principal_tenant.as_deref(), Some("acme"));
462 assert_eq!(target, NotificationScope::Tenant("globex".into()));
463 assert_eq!(channel, "deploys");
464 }
465 }
466 }
467
468 #[test]
469 fn cross_tenant_subscribe_denied_without_cap() {
470 let reg = NotificationRegistry::new();
471 let err = reg
472 .subscribe_authorized(
473 Some("acme"),
474 NotificationScope::Tenant("globex".into()),
475 "deploys",
476 false,
477 )
478 .expect_err("cross-tenant subscribe must be denied without cap");
479 assert!(matches!(err, NotificationError::CrossTenantDenied { .. }));
480 }
481
482 #[test]
483 fn cross_tenant_publish_allowed_with_cap() {
484 let reg = NotificationRegistry::new();
485 let mut rx = reg.subscribe(NotificationScope::Tenant("globex".into()), "deploys");
486 let delivered = reg
487 .publish_authorized(
488 Some("acme"),
489 NotificationScope::Tenant("globex".into()),
490 "deploys",
491 "allowed",
492 true,
493 now(),
494 )
495 .expect("publish with cross-tenant cap must succeed");
496 assert_eq!(delivered, 1);
497 assert_eq!(rx.try_recv().unwrap().payload, "allowed");
498 }
499
500 #[test]
501 fn global_scope_requires_cross_tenant_cap() {
502 let reg = NotificationRegistry::new();
503 let err = reg
504 .publish_authorized(
505 Some("acme"),
506 NotificationScope::Global,
507 "platform",
508 "leak",
509 false,
510 now(),
511 )
512 .expect_err("targeting Global from a tenant must require cap");
513 assert!(matches!(err, NotificationError::CrossTenantDenied { .. }));
514
515 let _ = reg
518 .publish_authorized(
519 None,
520 NotificationScope::Global,
521 "platform",
522 "ok",
523 false,
524 now(),
525 )
526 .expect("platform principal targeting global is same-scope");
527 }
528
529 #[test]
530 fn channel_is_reaped_when_last_receiver_drops() {
531 let reg = NotificationRegistry::new();
532 {
533 let _rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
534 assert_eq!(reg.channel_count(), 1);
535 }
536 reg.publish(
539 NotificationScope::Tenant("acme".into()),
540 "deploys",
541 "noop",
542 now(),
543 );
544 assert_eq!(reg.channel_count(), 0);
545 }
546
547 #[test]
548 fn from_principal_tenant_maps_correctly() {
549 assert_eq!(
550 NotificationScope::from_principal_tenant(Some("acme")),
551 NotificationScope::Tenant("acme".into())
552 );
553 assert_eq!(
554 NotificationScope::from_principal_tenant(None),
555 NotificationScope::Global
556 );
557 }
558}