Skip to main content

reddb_server/
notifications.rs

1//! Ephemeral notification primitive (issue #720, PRD #718).
2//!
3//! Tenant-scoped pub/sub signals with no replay, ACK, consumer
4//! offset, pending delivery, or DLQ. Offline listeners miss
5//! notifications by design — applications that need durability
6//! should use queues or streams instead. ADR 0028 pins this
7//! boundary: queue wait, notification, and stream are separate
8//! primitives because their state machines are incompatible.
9//!
10//! ## Contract surface
11//!
//! - `NotificationRegistry::publish_authorized` — capability-gated
//!   publish that checks whether the principal is allowed to
//!   target the requested scope and rejects the publish with
//!   `NotificationError::CrossTenantDenied` otherwise. Publishes
//!   within the principal's own scope (its tenant, or `Global` for
//!   the platform principal) succeed without an explicit capability;
//!   any other target requires the caller to assert
//!   `has_cross_tenant_cap`, which is supplied by the calling
//!   transport after evaluating the `notify:cross-tenant` action
//!   against the principal's effective policies.
//! - `NotificationRegistry::subscribe_authorized` — same capability
//!   gate, but for the read side: a tenant principal subscribing to
//!   another tenant's channel or to the global namespace (or the
//!   platform principal subscribing to a tenant channel) requires
//!   the cross-tenant capability.
24//! - `NotificationRegistry::publish` /
25//!   `NotificationRegistry::subscribe` — unauthenticated entry
26//!   points used by tests and by callers that have already proven
27//!   they sit above the authorization boundary. Transports should
28//!   prefer the `_authorized` variants.
29//!
30//! ## No-replay semantics
31//!
32//! The registry stores one Tokio broadcast channel per
33//! `(scope, channel)` key. A late subscriber's
34//! `broadcast::Sender::subscribe` cursor starts at the channel's
35//! current tail, so notifications published before the subscriber
36//! connected are not delivered — that is the no-replay guarantee.
37//! Offline listeners that reconnect therefore start with an empty
38//! queue and observe only future notifications, which is the
39//! deliberate trade-off: ephemeral channels do not buffer for
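//! disconnected consumers. Channels whose receivers have all been
//! dropped are not removed immediately; a publish that finds a
//! channel without receivers removes it from the registry. Memory
//! cost is therefore bounded by the number of channels that have had
//! listeners, not by the number of notifications published.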
43
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use parking_lot::Mutex;
48use tokio::sync::broadcast;
49
/// Addressing namespace of a notification channel.
///
/// Most channels are `Tenant(id)`. They belong to exactly one tenant,
/// in line with RedDB's tenancy model, and other tenants cannot see
/// them. `Global` is the cross-tenant / platform namespace; addressing
/// it requires the `notify:cross-tenant` capability.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum NotificationScope {
    /// Channel owned by a single tenant (the string is the tenant id).
    Tenant(String),
    /// Platform-wide channel shared across tenants.
    Global,
}

impl NotificationScope {
    /// Derives the scope a principal naturally lives in.
    ///
    /// A bound tenant such as `Some("acme")` yields `Tenant("acme")`.
    /// An unbound principal (`None`, the platform tenant — matches
    /// `auth::UserId::platform`) yields `Global`.
    pub fn from_principal_tenant(tenant: Option<&str>) -> Self {
        tenant.map_or(Self::Global, |id| Self::Tenant(id.to_owned()))
    }

    /// Stable identifier for this scope as written into audit events:
    /// `tenant:<id>` for tenant scopes, `global` otherwise.
    pub fn label(&self) -> String {
        match self {
            Self::Global => "global".to_owned(),
            Self::Tenant(id) => format!("tenant:{id}"),
        }
    }
}
85
86/// A single notification delivered to one connected listener.
87///
88/// Carries the routing tuple (scope + channel) plus an opaque
89/// UTF-8 payload. `published_at_ms` is monotonically meaningful
90/// only within the running process; offline-replay use cases
91/// should be modelled as queues or streams, not notifications.
92#[derive(Debug, Clone, PartialEq, Eq)]
93pub struct NotificationEvent {
94    pub scope: NotificationScope,
95    pub channel: String,
96    pub payload: String,
97    pub published_at_ms: u128,
98}
99
100/// Errors returned by the notification authorization gate.
101#[derive(Debug, PartialEq, Eq)]
102pub enum NotificationError {
103    /// The principal tried to address a channel outside their own
104    /// tenant without the `notify:cross-tenant` capability. The
105    /// caller knows the principal's tenant and the requested
106    /// scope; the error preserves both so audit can reconstruct
107    /// the denial.
108    CrossTenantDenied {
109        principal_tenant: Option<String>,
110        target: NotificationScope,
111        channel: String,
112    },
113}
114
115impl std::fmt::Display for NotificationError {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self {
118            NotificationError::CrossTenantDenied {
119                principal_tenant,
120                target,
121                channel,
122            } => {
123                let from = principal_tenant.as_deref().unwrap_or("<platform>");
124                write!(
125                    f,
126                    "notification: principal in tenant `{}` is not allowed to address `{}` channel `{}` without the `notify:cross-tenant` capability",
127                    from,
128                    target.label(),
129                    channel
130                )
131            }
132        }
133    }
134}
135
136impl std::error::Error for NotificationError {}
137
/// Capacity handed to `broadcast::channel` for every notification
/// channel.
///
/// Chosen for low-fanout signalling. Callers that need more throughput
/// should batch updates into one notification, or move to a durable
/// stream.
const CHANNEL_CAPACITY: usize = 256;
142
143/// In-memory registry of ephemeral notification channels.
144///
145/// The registry is `Send + Sync` and intended to live behind an
146/// `Arc` on the runtime — typically one per server process. It
147/// owns no on-disk state, no WAL entries, and no audit-event tail:
148/// a process restart drops every channel and every connected
149/// listener, which is the intended ephemeral contract.
150#[derive(Default, Clone)]
151pub struct NotificationRegistry {
152    inner: Arc<Mutex<HashMap<ChannelKey, broadcast::Sender<NotificationEvent>>>>,
153}
154
155#[derive(Debug, Clone, PartialEq, Eq, Hash)]
156struct ChannelKey {
157    scope: NotificationScope,
158    channel: String,
159}
160
161impl NotificationRegistry {
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    /// Subscribe to `(scope, channel)`.
167    ///
168    /// The returned receiver only observes notifications published
169    /// AFTER `subscribe` returns. Drop the receiver to unsubscribe;
170    /// the underlying channel is reaped from the registry when its
171    /// last receiver is dropped and no senders remain outstanding.
172    pub fn subscribe(
173        &self,
174        scope: NotificationScope,
175        channel: impl Into<String>,
176    ) -> broadcast::Receiver<NotificationEvent> {
177        let key = ChannelKey {
178            scope,
179            channel: channel.into(),
180        };
181        let mut guard = self.inner.lock();
182        let sender = guard
183            .entry(key)
184            .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
185        sender.subscribe()
186    }
187
188    /// Publish a payload on `(scope, channel)` and return the
189    /// number of currently connected listeners that received the
190    /// event. Returns `0` if no listeners are connected — the
191    /// notification is dropped (no buffering, no replay).
192    pub fn publish(
193        &self,
194        scope: NotificationScope,
195        channel: impl Into<String>,
196        payload: impl Into<String>,
197        now_ms: u128,
198    ) -> usize {
199        let channel = channel.into();
200        let key = ChannelKey {
201            scope: scope.clone(),
202            channel: channel.clone(),
203        };
204        let event = NotificationEvent {
205            scope,
206            channel,
207            payload: payload.into(),
208            published_at_ms: now_ms,
209        };
210
211        let sender = {
212            let guard = self.inner.lock();
213            guard.get(&key).cloned()
214        };
215        let Some(sender) = sender else {
216            return 0;
217        };
218
219        // If no live receivers, drop the event and reap the
220        // channel so future publishes on a dead channel don't
221        // accumulate buffered messages — ephemeral semantics.
222        if sender.receiver_count() == 0 {
223            self.inner.lock().remove(&key);
224            return 0;
225        }
226        sender.send(event).unwrap_or(0)
227    }
228
229    /// Authorization-gated publish.
230    ///
231    /// `principal_tenant` is the publisher's tenant binding
232    /// (`None` = platform/global). `target` is the channel scope
233    /// the publisher asked for. `has_cross_tenant_cap` must be
234    /// `true` if the calling transport's policy evaluator
235    /// previously granted the principal the `notify:cross-tenant`
236    /// action; the registry does not consult policies directly,
237    /// keeping the authorization boundary on the transport side
238    /// (mirrors the AiProviderGate pattern from #711).
239    pub fn publish_authorized(
240        &self,
241        principal_tenant: Option<&str>,
242        target: NotificationScope,
243        channel: impl Into<String>,
244        payload: impl Into<String>,
245        has_cross_tenant_cap: bool,
246        now_ms: u128,
247    ) -> Result<usize, NotificationError> {
248        let channel = channel.into();
249        Self::authorize(principal_tenant, &target, &channel, has_cross_tenant_cap)?;
250        Ok(self.publish(target, channel, payload, now_ms))
251    }
252
253    /// Authorization-gated subscribe — mirror of
254    /// [`Self::publish_authorized`] for the read side.
255    pub fn subscribe_authorized(
256        &self,
257        principal_tenant: Option<&str>,
258        target: NotificationScope,
259        channel: impl Into<String>,
260        has_cross_tenant_cap: bool,
261    ) -> Result<broadcast::Receiver<NotificationEvent>, NotificationError> {
262        let channel = channel.into();
263        Self::authorize(principal_tenant, &target, &channel, has_cross_tenant_cap)?;
264        Ok(self.subscribe(target, channel))
265    }
266
267    fn authorize(
268        principal_tenant: Option<&str>,
269        target: &NotificationScope,
270        channel: &str,
271        has_cross_tenant_cap: bool,
272    ) -> Result<(), NotificationError> {
273        let same_scope = match (principal_tenant, target) {
274            (Some(pt), NotificationScope::Tenant(tt)) => pt == tt,
275            // A platform/system principal (tenant=None) addressing
276            // the Global namespace is operating inside its own scope
277            // and needs no extra capability.
278            (None, NotificationScope::Global) => true,
279            _ => false,
280        };
281        if same_scope || has_cross_tenant_cap {
282            return Ok(());
283        }
284        Err(NotificationError::CrossTenantDenied {
285            principal_tenant: principal_tenant.map(str::to_string),
286            target: target.clone(),
287            channel: channel.to_string(),
288        })
289    }
290
291    /// Number of channels currently registered. Test/diagnostic
292    /// helper — operators should not depend on this value.
293    pub fn channel_count(&self) -> usize {
294        self.inner.lock().len()
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed timestamp for tests. No test asserts on its value, only
    /// that publishes carry *some* timestamp.
    fn now() -> u128 {
        1
    }

    #[test]
    fn same_tenant_publish_subscribe_round_trip() {
        let reg = NotificationRegistry::new();
        let mut rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        let delivered = reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "v1.2.3",
            now(),
        );
        assert_eq!(delivered, 1, "one connected listener should receive");
        let event = rx.try_recv().expect("event delivered");
        assert_eq!(event.channel, "deploys");
        assert_eq!(event.payload, "v1.2.3");
        assert_eq!(event.scope, NotificationScope::Tenant("acme".into()));
    }

    #[test]
    fn channels_are_tenant_isolated() {
        let reg = NotificationRegistry::new();
        let mut rx_acme = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        let mut rx_globex = reg.subscribe(NotificationScope::Tenant("globex".into()), "deploys");

        // Publish to acme only — globex must not see it. Same
        // channel name, different tenant scope.
        reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "acme-only",
            now(),
        );

        assert_eq!(rx_acme.try_recv().unwrap().payload, "acme-only");
        assert!(
            rx_globex.try_recv().is_err(),
            "globex must not see acme's notification"
        );
    }

    #[test]
    fn channel_names_are_scoped_independently() {
        let reg = NotificationRegistry::new();
        let mut rx_a = reg.subscribe(NotificationScope::Tenant("acme".into()), "a");
        let mut rx_b = reg.subscribe(NotificationScope::Tenant("acme".into()), "b");

        reg.publish(NotificationScope::Tenant("acme".into()), "a", "to-a", now());

        assert_eq!(rx_a.try_recv().unwrap().payload, "to-a");
        assert!(rx_b.try_recv().is_err());
    }

    #[test]
    fn publish_to_unknown_channel_is_a_noop() {
        let reg = NotificationRegistry::new();
        let delivered = reg.publish(
            NotificationScope::Tenant("acme".into()),
            "never-subscribed",
            "lost",
            now(),
        );
        assert_eq!(delivered, 0);
        assert_eq!(reg.channel_count(), 0, "publish must not create channels");
    }

    #[test]
    fn offline_listeners_miss_notifications_no_replay() {
        let reg = NotificationRegistry::new();

        // Phase 1: a subscriber connects, then disconnects.
        {
            let _rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        }

        // Phase 2: publish while no one is listening — dropped.
        let delivered = reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "v1.0.0",
            now(),
        );
        assert_eq!(delivered, 0, "publish with no listeners delivers 0");

        // Phase 3: subscriber reconnects — must NOT see the
        // pre-reconnect notification.
        let mut rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        assert!(
            rx.try_recv().is_err(),
            "reconnected listener must not receive pre-reconnect notifications",
        );

        // Phase 4: publish after reconnect — listener gets the
        // new notification.
        reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "v2.0.0",
            now(),
        );
        assert_eq!(rx.try_recv().unwrap().payload, "v2.0.0");
    }

    #[test]
    fn fanout_to_all_connected_listeners() {
        let reg = NotificationRegistry::new();
        let mut rx1 = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        let mut rx2 = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        let mut rx3 = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");

        let delivered = reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "fanout",
            now(),
        );
        assert_eq!(delivered, 3);
        for rx in [&mut rx1, &mut rx2, &mut rx3] {
            assert_eq!(rx.try_recv().unwrap().payload, "fanout");
        }
    }

    #[test]
    fn same_tenant_publish_does_not_require_cross_tenant_cap() {
        let reg = NotificationRegistry::new();
        let mut rx = reg
            .subscribe_authorized(
                Some("acme"),
                NotificationScope::Tenant("acme".into()),
                "deploys",
                false, // no cross-tenant cap
            )
            .expect("same-tenant subscribe must succeed without cross-tenant cap");

        let delivered = reg
            .publish_authorized(
                Some("acme"),
                NotificationScope::Tenant("acme".into()),
                "deploys",
                "v1",
                false,
                now(),
            )
            .expect("same-tenant publish must succeed without cross-tenant cap");
        assert_eq!(delivered, 1);
        assert_eq!(rx.try_recv().unwrap().payload, "v1");
    }

    #[test]
    fn cross_tenant_publish_denied_without_cap() {
        let reg = NotificationRegistry::new();
        let err = reg
            .publish_authorized(
                Some("acme"),
                NotificationScope::Tenant("globex".into()),
                "deploys",
                "leak",
                false,
                now(),
            )
            .expect_err("cross-tenant publish must be denied without cap");
        match err {
            NotificationError::CrossTenantDenied {
                principal_tenant,
                target,
                channel,
            } => {
                assert_eq!(principal_tenant.as_deref(), Some("acme"));
                assert_eq!(target, NotificationScope::Tenant("globex".into()));
                assert_eq!(channel, "deploys");
            }
        }
    }

    #[test]
    fn cross_tenant_subscribe_denied_without_cap() {
        let reg = NotificationRegistry::new();
        let err = reg
            .subscribe_authorized(
                Some("acme"),
                NotificationScope::Tenant("globex".into()),
                "deploys",
                false,
            )
            .expect_err("cross-tenant subscribe must be denied without cap");
        assert!(matches!(err, NotificationError::CrossTenantDenied { .. }));
    }

    #[test]
    fn cross_tenant_publish_allowed_with_cap() {
        let reg = NotificationRegistry::new();
        let mut rx = reg.subscribe(NotificationScope::Tenant("globex".into()), "deploys");
        let delivered = reg
            .publish_authorized(
                Some("acme"),
                NotificationScope::Tenant("globex".into()),
                "deploys",
                "allowed",
                true,
                now(),
            )
            .expect("publish with cross-tenant cap must succeed");
        assert_eq!(delivered, 1);
        assert_eq!(rx.try_recv().unwrap().payload, "allowed");
    }

    #[test]
    fn global_scope_requires_cross_tenant_cap() {
        let reg = NotificationRegistry::new();
        let err = reg
            .publish_authorized(
                Some("acme"),
                NotificationScope::Global,
                "platform",
                "leak",
                false,
                now(),
            )
            .expect_err("targeting Global from a tenant must require cap");
        assert!(matches!(err, NotificationError::CrossTenantDenied { .. }));

        // Platform principals (tenant=None) addressing Global is
        // same-scope and requires no extra cap.
        let _ = reg
            .publish_authorized(
                None,
                NotificationScope::Global,
                "platform",
                "ok",
                false,
                now(),
            )
            .expect("platform principal targeting global is same-scope");
    }

    #[test]
    fn platform_principal_targeting_tenant_requires_cap() {
        let reg = NotificationRegistry::new();
        let err = reg
            .subscribe_authorized(
                None,
                NotificationScope::Tenant("acme".into()),
                "deploys",
                false,
            )
            .expect_err("platform principal addressing a tenant channel needs the cap");
        assert!(matches!(
            err,
            NotificationError::CrossTenantDenied {
                principal_tenant: None,
                ..
            }
        ));

        let mut rx = reg
            .subscribe_authorized(
                None,
                NotificationScope::Tenant("acme".into()),
                "deploys",
                true,
            )
            .expect("platform principal holding the cap may subscribe");
        reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "hi",
            now(),
        );
        assert_eq!(rx.try_recv().unwrap().payload, "hi");
    }

    #[test]
    fn cross_tenant_denial_message_names_principal_and_target() {
        let err = NotificationError::CrossTenantDenied {
            principal_tenant: Some("acme".into()),
            target: NotificationScope::Tenant("globex".into()),
            channel: "deploys".into(),
        };
        assert_eq!(
            err.to_string(),
            "notification: principal in tenant `acme` is not allowed to address `tenant:globex` channel `deploys` without the `notify:cross-tenant` capability"
        );

        let platform = NotificationError::CrossTenantDenied {
            principal_tenant: None,
            target: NotificationScope::Global,
            channel: "platform".into(),
        };
        assert!(platform.to_string().contains("`<platform>`"));
    }

    #[test]
    fn channel_is_reaped_when_last_receiver_drops() {
        let reg = NotificationRegistry::new();
        {
            let _rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
            assert_eq!(reg.channel_count(), 1);
        }
        // Receiver dropped. The channel record itself is reaped
        // on the next publish to that key.
        reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "noop",
            now(),
        );
        assert_eq!(reg.channel_count(), 0);
    }

    #[test]
    fn resubscribe_after_reap_receives_new_notifications() {
        let reg = NotificationRegistry::new();
        {
            let _rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        }
        reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "reap",
            now(),
        );
        assert_eq!(reg.channel_count(), 0);

        // A fresh subscribe recreates the channel and sees later publishes.
        let mut rx = reg.subscribe(NotificationScope::Tenant("acme".into()), "deploys");
        assert_eq!(reg.channel_count(), 1);
        let delivered = reg.publish(
            NotificationScope::Tenant("acme".into()),
            "deploys",
            "fresh",
            now(),
        );
        assert_eq!(delivered, 1);
        assert_eq!(rx.try_recv().unwrap().payload, "fresh");
    }

    #[test]
    fn scope_labels_are_stable() {
        assert_eq!(
            NotificationScope::Tenant("acme".into()).label(),
            "tenant:acme"
        );
        assert_eq!(NotificationScope::Global.label(), "global");
    }

    #[test]
    fn from_principal_tenant_maps_correctly() {
        assert_eq!(
            NotificationScope::from_principal_tenant(Some("acme")),
            NotificationScope::Tenant("acme".into())
        );
        assert_eq!(
            NotificationScope::from_principal_tenant(None),
            NotificationScope::Global
        );
    }
}