// reddb_server/streams.rs

1//! Durable stream primitive (issue #721, PRD #718).
2//!
3//! Tenant-scoped, append-only event log with monotonic per-consumer
4//! offsets. Streams are the third leg of ADR 0028's split — queues
5//! own per-message delivery state (ACK/NACK/DLQ), ephemeral
6//! notifications own no state at all, and streams own an immutable
7//! ordered log plus a small per-consumer offset bookkeeping table.
8//! Reading a stream never creates pending delivery state and never
9//! requires ACK or NACK; advancing the saved offset is the only
10//! "I'm done with this prefix" signal.
11//!
12//! ## Contract surface
13//!
14//! - [`StreamRegistry::create_stream`] — declare a new stream with a
15//!   retention contract. The stream becomes discoverable via
16//!   [`StreamRegistry::list_streams`].
17//! - [`StreamRegistry::append`] / [`StreamRegistry::append_authorized`]
18//!   — append an event payload with an optional stream-identity key.
19//!   Returns the assigned offset (`u64`, sequence within the stream).
20//! - [`StreamRegistry::read_since`] /
21//!   [`StreamRegistry::read_since_authorized`] — read up to `limit`
22//!   events with offset `>= from`. Read does NOT consume, lease, or
23//!   leave pending state behind, and does NOT advance the consumer's
24//!   saved offset — that is the caller's explicit responsibility via
25//!   `save_offset`.
26//! - [`StreamRegistry::save_offset`] /
27//!   [`StreamRegistry::get_offset`] — persist a consumer's progress.
28//!   Saving is **monotonic**: a smaller or equal offset is dropped
29//!   silently and the previously-saved value is returned. This makes
30//!   the operation safe to retry on duplicate or stale acks without
31//!   rewinding a consumer past events it already finished.
32//!
33//! ## Retention contract (first cut)
34//!
35//! Each stream carries a [`StreamRetention`] describing how the
36//! engine prunes old events. The first cut supports two independent
37//! caps that compose by AND (the stricter wins):
38//!
39//! * `max_events: Option<usize>` — drop the oldest events so the log
40//!   never exceeds N entries.
41//! * `max_age_ms: Option<u64>` — drop events older than `now -
42//!   max_age_ms`.
43//!
//! Retention is applied at append time. A retention pass never
//! rewrites the offset of surviving events — they keep the offsets
//! they were assigned, so once the head moves forward the retained
//! log simply starts at a higher offset. Consumers whose saved
//! offset has fallen below the current head skip the truncated
//! prefix the next time they call `read_since`; the engine does not
//! raise an error for "consumer lagged past retention". Operators
//! who care about that condition can compare `get_offset(consumer)`
//! against the descriptor's `head_offset` themselves.
52//!
53//! ## Authorization model
54//!
55//! Mirrors the [`crate::notifications`] pattern: the registry never
56//! consults policies directly. Transports evaluate the `stream`
57//! action (and `stream:cross-tenant` for cross-tenant addressing)
58//! against the principal's effective policies and pass the resulting
59//! `has_cross_tenant_cap: bool` into the `_authorized` entry points.
60//! Same-scope operations succeed without the extra capability;
61//! everything else returns [`StreamError::CrossTenantDenied`] with
62//! the principal / target / stream triple preserved for audit.
63//!
64//! ## CDC compatibility
65//!
66//! [`StreamEvent`] carries `key` and `payload` as opaque UTF-8
67//! strings, plus the engine-assigned `offset` and `appended_at_ms`.
68//! That shape is intentionally the standard change-data-capture log
69//! shape: a later materialized-CDC slice can populate the same event
70//! type from a table's mutation tail, with `key` becoming the row
71//! primary key and `payload` the row JSON. Nothing in this module
72//! commits the engine to a specific CDC strategy — the contract is
73//! deliberately open on that axis.
74//!
75//! ## Durability
76//!
77//! The first slice of the primitive is an in-process append-only
78//! log. The registry is `Send + Sync` and intended to live behind
79//! an `Arc` on the runtime; persistence to disk-backed storage is a
80//! follow-up slice tracked under the same PRD (#718) — it does not
81//! change the public contract above, only where the bytes live.
82
83use std::collections::HashMap;
84use std::sync::Arc;
85
86use parking_lot::Mutex;
87
/// Namespace a stream lives in. Streams are tenant-isolated unless
/// they are placed in the global namespace. Mirrors
/// [`crate::notifications::NotificationScope`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StreamScope {
    /// Stream owned by a single tenant; carries the tenant name.
    Tenant(String),
    /// Cross-tenant / platform-global namespace.
    Global,
}

impl StreamScope {
    /// Derive a scope from a principal's tenant binding.
    ///
    /// A bound tenant (`Some("acme")`) becomes `Tenant("acme")`; an
    /// unbound principal (`None`) becomes `Global`. This is the same
    /// mapping as
    /// [`crate::notifications::NotificationScope::from_principal_tenant`]
    /// so a future transport can reuse the resolver.
    pub fn from_principal_tenant(tenant: Option<&str>) -> Self {
        tenant.map_or(StreamScope::Global, |bound| {
            StreamScope::Tenant(bound.to_owned())
        })
    }

    /// Stable string identifier used in audit events:
    /// `tenant:<name>` for a tenant scope, `global` otherwise.
    pub fn label(&self) -> String {
        if let StreamScope::Tenant(owner) = self {
            return format!("tenant:{owner}");
        }
        String::from("global")
    }
}
120
/// Pruning policy attached to a single stream. The two caps are
/// independent and each may be left unset;
/// `StreamRetention::default()` leaves both as `None`, i.e. an
/// unbounded stream with no retention pruning.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamRetention {
    /// Cap on the number of retained events. Oldest events beyond
    /// the cap are dropped on append. `None` disables the cap.
    pub max_events: Option<usize>,
    /// Cap on event age, in milliseconds. Events older than
    /// `now - max_age_ms` are dropped on append. `None` disables
    /// the cap.
    pub max_age_ms: Option<u64>,
}
133
134/// A single event in the stream log. `offset` is engine-assigned
135/// and monotonically increasing within `(scope, stream)`; retention
136/// pruning may advance the head past low offsets but never reuses
137/// or rewrites them.
138#[derive(Debug, Clone, PartialEq, Eq)]
139pub struct StreamEvent {
140    pub scope: StreamScope,
141    pub stream: String,
142    /// Optional stream-identity key. Carries the caller's
143    /// partition / row-key hint; the engine does not interpret it
144    /// in this slice. CDC-materialization slices may use it as the
145    /// source row's primary key.
146    pub key: Option<String>,
147    /// Opaque UTF-8 payload — typically a JSON document. The engine
148    /// does not parse or validate it.
149    pub payload: String,
150    /// Engine-assigned monotonic sequence number within
151    /// `(scope, stream)`. The first event has offset `1`; offset
152    /// `0` is reserved as the "no progress yet" sentinel returned
153    /// by [`StreamRegistry::get_offset`] when a consumer has never
154    /// saved.
155    pub offset: u64,
156    pub appended_at_ms: u128,
157}
158
159/// Public-facing descriptor for stream discovery — what an
160/// introspection surface (e.g. a future `red.streams` virtual
161/// table) would emit per stream.
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct StreamDescriptor {
164    pub scope: StreamScope,
165    pub name: String,
166    pub retention: StreamRetention,
167    /// Offset of the oldest event still retained, or `0` if the
168    /// stream is empty.
169    pub head_offset: u64,
170    /// Offset of the most recent event, or `0` if the stream is
171    /// empty. The next append will receive `tail_offset + 1`.
172    pub tail_offset: u64,
173    pub event_count: usize,
174}
175
176/// Errors surfaced by the stream registry.
177#[derive(Debug, PartialEq, Eq)]
178pub enum StreamError {
179    /// `create_stream` was called for a `(scope, name)` pair that
180    /// already has a stream.
181    AlreadyExists { scope: StreamScope, name: String },
182    /// An op targeted a stream that has not been created.
183    NotFound { scope: StreamScope, name: String },
184    /// The principal tried to address a stream outside their own
185    /// tenant without the `stream:cross-tenant` capability.
186    CrossTenantDenied {
187        principal_tenant: Option<String>,
188        target: StreamScope,
189        stream: String,
190    },
191}
192
193impl std::fmt::Display for StreamError {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        match self {
196            StreamError::AlreadyExists { scope, name } => {
197                write!(f, "stream: `{}/{}` already exists", scope.label(), name)
198            }
199            StreamError::NotFound { scope, name } => {
200                write!(f, "stream: `{}/{}` not found", scope.label(), name)
201            }
202            StreamError::CrossTenantDenied {
203                principal_tenant,
204                target,
205                stream,
206            } => {
207                let from = principal_tenant.as_deref().unwrap_or("<platform>");
208                write!(
209                    f,
210                    "stream: principal in tenant `{}` is not allowed to address `{}` stream `{}` without the `stream:cross-tenant` capability",
211                    from,
212                    target.label(),
213                    stream
214                )
215            }
216        }
217    }
218}
219
220impl std::error::Error for StreamError {}
221
222#[derive(Debug, Clone, PartialEq, Eq, Hash)]
223struct StreamKey {
224    scope: StreamScope,
225    name: String,
226}
227
228#[derive(Debug)]
229struct DurableStream {
230    retention: StreamRetention,
231    /// Append-only event log. Sorted by offset ascending.
232    /// Retention drops from the front so `events[0].offset` is the
233    /// current head.
234    events: Vec<StreamEvent>,
235    /// Next offset to assign. Starts at `1`; never decreases.
236    next_offset: u64,
237    /// Per-consumer saved offset. Always monotonic — see
238    /// [`StreamRegistry::save_offset`].
239    consumer_offsets: HashMap<String, u64>,
240}
241
242impl DurableStream {
243    fn new(retention: StreamRetention) -> Self {
244        Self {
245            retention,
246            events: Vec::new(),
247            next_offset: 1,
248            consumer_offsets: HashMap::new(),
249        }
250    }
251
252    fn descriptor(&self, scope: StreamScope, name: String) -> StreamDescriptor {
253        let head_offset = self.events.first().map(|e| e.offset).unwrap_or(0);
254        let tail_offset = self.events.last().map(|e| e.offset).unwrap_or(0);
255        StreamDescriptor {
256            scope,
257            name,
258            retention: self.retention.clone(),
259            head_offset,
260            tail_offset,
261            event_count: self.events.len(),
262        }
263    }
264
265    fn apply_retention(&mut self, now_ms: u128) {
266        if let Some(max_events) = self.retention.max_events {
267            while self.events.len() > max_events {
268                self.events.remove(0);
269            }
270        }
271        if let Some(max_age_ms) = self.retention.max_age_ms {
272            let cutoff = now_ms.saturating_sub(max_age_ms as u128);
273            while let Some(first) = self.events.first() {
274                if first.appended_at_ms < cutoff {
275                    self.events.remove(0);
276                } else {
277                    break;
278                }
279            }
280        }
281    }
282}
283
284/// In-memory registry of durable streams.
285#[derive(Default, Clone)]
286pub struct StreamRegistry {
287    inner: Arc<Mutex<HashMap<StreamKey, DurableStream>>>,
288}
289
290impl StreamRegistry {
291    pub fn new() -> Self {
292        Self::default()
293    }
294
295    /// Declare a new stream. Returns
296    /// [`StreamError::AlreadyExists`] if `(scope, name)` is already
297    /// registered. The new stream is immediately discoverable via
298    /// [`Self::list_streams`].
299    pub fn create_stream(
300        &self,
301        scope: StreamScope,
302        name: impl Into<String>,
303        retention: StreamRetention,
304    ) -> Result<(), StreamError> {
305        let name = name.into();
306        let key = StreamKey {
307            scope: scope.clone(),
308            name: name.clone(),
309        };
310        let mut guard = self.inner.lock();
311        if guard.contains_key(&key) {
312            return Err(StreamError::AlreadyExists { scope, name });
313        }
314        guard.insert(key, DurableStream::new(retention));
315        Ok(())
316    }
317
318    /// Whether `(scope, name)` is registered.
319    pub fn exists(&self, scope: &StreamScope, name: &str) -> bool {
320        let key = StreamKey {
321            scope: scope.clone(),
322            name: name.to_string(),
323        };
324        self.inner.lock().contains_key(&key)
325    }
326
327    /// Snapshot every stream in `scope`. Used by introspection
328    /// surfaces (e.g. a future `red.streams` virtual table). Order
329    /// is unspecified; callers that need stable order should sort
330    /// on `name`.
331    pub fn list_streams(&self, scope: &StreamScope) -> Vec<StreamDescriptor> {
332        let guard = self.inner.lock();
333        guard
334            .iter()
335            .filter(|(k, _)| &k.scope == scope)
336            .map(|(k, s)| s.descriptor(k.scope.clone(), k.name.clone()))
337            .collect()
338    }
339
340    /// Describe a single stream, or `None` if not registered.
341    pub fn describe(&self, scope: &StreamScope, name: &str) -> Option<StreamDescriptor> {
342        let key = StreamKey {
343            scope: scope.clone(),
344            name: name.to_string(),
345        };
346        let guard = self.inner.lock();
347        guard.get(&key).map(|s| s.descriptor(key.scope, key.name))
348    }
349
350    /// Append an event. Returns the engine-assigned offset.
351    /// Retention pruning runs after the append, so the new event
352    /// is always retained even if it pushes the head past the cap
353    /// — only older events are dropped.
354    pub fn append(
355        &self,
356        scope: StreamScope,
357        name: impl Into<String>,
358        key: Option<String>,
359        payload: impl Into<String>,
360        now_ms: u128,
361    ) -> Result<u64, StreamError> {
362        let name = name.into();
363        let lookup_key = StreamKey {
364            scope: scope.clone(),
365            name: name.clone(),
366        };
367        let mut guard = self.inner.lock();
368        let stream = guard
369            .get_mut(&lookup_key)
370            .ok_or_else(|| StreamError::NotFound {
371                scope: scope.clone(),
372                name: name.clone(),
373            })?;
374        let offset = stream.next_offset;
375        stream.next_offset += 1;
376        stream.events.push(StreamEvent {
377            scope,
378            stream: name,
379            key,
380            payload: payload.into(),
381            offset,
382            appended_at_ms: now_ms,
383        });
384        stream.apply_retention(now_ms);
385        Ok(offset)
386    }
387
388    /// Authorization-gated [`Self::append`].
389    pub fn append_authorized(
390        &self,
391        principal_tenant: Option<&str>,
392        target: StreamScope,
393        name: impl Into<String>,
394        key: Option<String>,
395        payload: impl Into<String>,
396        has_cross_tenant_cap: bool,
397        now_ms: u128,
398    ) -> Result<u64, StreamError> {
399        let name = name.into();
400        Self::authorize(principal_tenant, &target, &name, has_cross_tenant_cap)?;
401        self.append(target, name, key, payload, now_ms)
402    }
403
404    /// Read up to `limit` events with offset `>= from`. Pure read —
405    /// does not create pending delivery state, does not advance
406    /// any consumer's saved offset, and does not require ACK/NACK.
407    /// If `from` is below the current head (because retention has
408    /// pruned older events), the returned slice simply starts at
409    /// the head with no error.
410    pub fn read_since(
411        &self,
412        scope: &StreamScope,
413        name: &str,
414        from: u64,
415        limit: usize,
416    ) -> Result<Vec<StreamEvent>, StreamError> {
417        let key = StreamKey {
418            scope: scope.clone(),
419            name: name.to_string(),
420        };
421        let guard = self.inner.lock();
422        let stream = guard.get(&key).ok_or_else(|| StreamError::NotFound {
423            scope: scope.clone(),
424            name: name.to_string(),
425        })?;
426        Ok(stream
427            .events
428            .iter()
429            .filter(|e| e.offset >= from)
430            .take(limit)
431            .cloned()
432            .collect())
433    }
434
435    /// Authorization-gated [`Self::read_since`].
436    pub fn read_since_authorized(
437        &self,
438        principal_tenant: Option<&str>,
439        target: StreamScope,
440        name: impl Into<String>,
441        from: u64,
442        limit: usize,
443        has_cross_tenant_cap: bool,
444    ) -> Result<Vec<StreamEvent>, StreamError> {
445        let name = name.into();
446        Self::authorize(principal_tenant, &target, &name, has_cross_tenant_cap)?;
447        self.read_since(&target, &name, from, limit)
448    }
449
450    /// Persist a consumer's offset on `(scope, name)`. Monotonic:
451    /// if `offset` is less than or equal to the currently saved
452    /// value, the save is a no-op and the existing value is
453    /// returned. Otherwise the new value is stored and returned.
454    /// This makes the operation safe to retry on duplicate or
455    /// stale "I'm done with offset N" notifications — a consumer
456    /// can never rewind past events it already finished.
457    pub fn save_offset(
458        &self,
459        scope: &StreamScope,
460        name: &str,
461        consumer: &str,
462        offset: u64,
463    ) -> Result<u64, StreamError> {
464        let key = StreamKey {
465            scope: scope.clone(),
466            name: name.to_string(),
467        };
468        let mut guard = self.inner.lock();
469        let stream = guard.get_mut(&key).ok_or_else(|| StreamError::NotFound {
470            scope: scope.clone(),
471            name: name.to_string(),
472        })?;
473        let entry = stream
474            .consumer_offsets
475            .entry(consumer.to_string())
476            .or_insert(0);
477        if offset > *entry {
478            *entry = offset;
479        }
480        Ok(*entry)
481    }
482
483    /// Retrieve a consumer's saved offset for `(scope, name)`.
484    /// Returns `0` for consumers that have never saved — `0` is
485    /// the reserved "no progress yet" sentinel since the first
486    /// real event is at offset `1`.
487    pub fn get_offset(
488        &self,
489        scope: &StreamScope,
490        name: &str,
491        consumer: &str,
492    ) -> Result<u64, StreamError> {
493        let key = StreamKey {
494            scope: scope.clone(),
495            name: name.to_string(),
496        };
497        let guard = self.inner.lock();
498        let stream = guard.get(&key).ok_or_else(|| StreamError::NotFound {
499            scope: scope.clone(),
500            name: name.to_string(),
501        })?;
502        Ok(stream.consumer_offsets.get(consumer).copied().unwrap_or(0))
503    }
504
505    fn authorize(
506        principal_tenant: Option<&str>,
507        target: &StreamScope,
508        stream: &str,
509        has_cross_tenant_cap: bool,
510    ) -> Result<(), StreamError> {
511        let same_scope = match (principal_tenant, target) {
512            (Some(pt), StreamScope::Tenant(tt)) => pt == tt,
513            // Platform principal (tenant=None) addressing Global is
514            // same-scope and needs no extra cap, matching the
515            // notifications precedent.
516            (None, StreamScope::Global) => true,
517            _ => false,
518        };
519        if same_scope || has_cross_tenant_cap {
520            return Ok(());
521        }
522        Err(StreamError::CrossTenantDenied {
523            principal_tenant: principal_tenant.map(str::to_string),
524            target: target.clone(),
525            stream: stream.to_string(),
526        })
527    }
528}
529
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a tenant scope.
    fn tenant(name: &str) -> StreamScope {
        StreamScope::Tenant(name.to_string())
    }

    /// Fresh registry holding exactly one stream.
    fn registry_with(scope: StreamScope, name: &str, retention: StreamRetention) -> StreamRegistry {
        let reg = StreamRegistry::new();
        reg.create_stream(scope, name, retention).unwrap();
        reg
    }

    /// Fresh registry holding an unbounded `acme/orders` stream.
    fn acme_orders() -> StreamRegistry {
        registry_with(tenant("acme"), "orders", StreamRetention::default())
    }

    /// Offsets of `events`, in order.
    fn offsets_of(events: &[StreamEvent]) -> Vec<u64> {
        events.iter().map(|e| e.offset).collect()
    }

    #[test]
    fn create_then_discover_via_list() {
        let reg = acme_orders();
        let acme = tenant("acme");
        let listed = reg.list_streams(&acme);
        assert_eq!(listed.len(), 1);
        let only = &listed[0];
        assert_eq!(only.name, "orders");
        assert_eq!(only.event_count, 0);
        assert_eq!(only.head_offset, 0);
        assert_eq!(only.tail_offset, 0);
        assert!(reg.exists(&acme, "orders"));
        assert!(reg.describe(&acme, "orders").is_some());
    }

    #[test]
    fn duplicate_create_rejected() {
        let reg = acme_orders();
        let err = reg
            .create_stream(tenant("acme"), "orders", StreamRetention::default())
            .expect_err("dup create must fail");
        assert!(matches!(err, StreamError::AlreadyExists { .. }));
    }

    #[test]
    fn append_assigns_monotonic_offsets() {
        let reg = acme_orders();
        let first = reg.append(tenant("acme"), "orders", None, "a", 100).unwrap();
        let second = reg
            .append(tenant("acme"), "orders", Some("k".into()), "b", 101)
            .unwrap();
        let third = reg.append(tenant("acme"), "orders", None, "c", 102).unwrap();
        assert_eq!([first, second, third], [1, 2, 3]);
        let desc = reg.describe(&tenant("acme"), "orders").unwrap();
        assert_eq!(desc.head_offset, 1);
        assert_eq!(desc.tail_offset, 3);
        assert_eq!(desc.event_count, 3);
    }

    #[test]
    fn append_on_unknown_stream_errors() {
        let reg = StreamRegistry::new();
        let err = reg
            .append(tenant("acme"), "missing", None, "x", 0)
            .expect_err("append on unknown stream must error");
        assert!(matches!(err, StreamError::NotFound { .. }));
    }

    #[test]
    fn read_since_returns_events_from_offset() {
        let reg = acme_orders();
        let acme = tenant("acme");
        let mut now: u128 = 100;
        for payload in ["a", "b", "c", "d"].iter() {
            reg.append(acme.clone(), "orders", None, *payload, now)
                .unwrap();
            now += 1;
        }

        let from_start = reg.read_since(&acme, "orders", 0, 100).unwrap();
        assert_eq!(from_start.len(), 4);
        assert_eq!(from_start[0].offset, 1);
        assert_eq!(from_start[3].payload, "d");

        let from_middle = reg.read_since(&acme, "orders", 3, 100).unwrap();
        assert_eq!(from_middle.len(), 2);
        assert_eq!(from_middle[0].offset, 3);
        assert_eq!(from_middle[1].offset, 4);

        let bounded = reg.read_since(&acme, "orders", 0, 2).unwrap();
        assert_eq!(bounded.len(), 2);
        assert_eq!(bounded[1].offset, 2);
    }

    #[test]
    fn read_does_not_advance_consumer_offset_no_pending_state() {
        let reg = acme_orders();
        let acme = tenant("acme");
        for now in 0..3u128 {
            reg.append(acme.clone(), "orders", None, "x", now).unwrap();
        }
        // Repeated full reads must all see the same three events,
        // and the consumer's saved offset must stay at 0: reading
        // leaves no pending delivery state behind and there is no
        // ACK/NACK in the API.
        for _ in 0..3 {
            let events = reg.read_since(&acme, "orders", 0, 100).unwrap();
            assert_eq!(events.len(), 3);
        }
        assert_eq!(reg.get_offset(&acme, "orders", "c1").unwrap(), 0);
    }

    #[test]
    fn save_offset_is_monotonic() {
        let reg = acme_orders();
        let acme = tenant("acme");
        for now in 0..5u128 {
            reg.append(acme.clone(), "orders", None, "x", now).unwrap();
        }
        assert_eq!(reg.save_offset(&acme, "orders", "c1", 3).unwrap(), 3);
        // A stale (smaller) save is a no-op.
        assert_eq!(
            reg.save_offset(&acme, "orders", "c1", 1).unwrap(),
            3,
            "stale save must not rewind",
        );
        // An equal save is a no-op (idempotent retry).
        assert_eq!(reg.save_offset(&acme, "orders", "c1", 3).unwrap(), 3);
        // A larger save advances.
        assert_eq!(reg.save_offset(&acme, "orders", "c1", 5).unwrap(), 5);
        assert_eq!(reg.get_offset(&acme, "orders", "c1").unwrap(), 5);
    }

    #[test]
    fn get_offset_defaults_to_zero_for_new_consumer() {
        let reg = acme_orders();
        assert_eq!(
            reg.get_offset(&tenant("acme"), "orders", "fresh").unwrap(),
            0
        );
    }

    #[test]
    fn consumer_offsets_are_isolated_per_consumer() {
        let reg = acme_orders();
        let acme = tenant("acme");
        reg.append(acme.clone(), "orders", None, "x", 0).unwrap();
        reg.save_offset(&acme, "orders", "c1", 1).unwrap();
        assert_eq!(reg.get_offset(&acme, "orders", "c1").unwrap(), 1);
        assert_eq!(reg.get_offset(&acme, "orders", "c2").unwrap(), 0);
    }

    #[test]
    fn streams_are_tenant_isolated() {
        let reg = acme_orders();
        reg.create_stream(tenant("globex"), "orders", StreamRetention::default())
            .unwrap();
        reg.append(tenant("acme"), "orders", None, "acme-only", 0)
            .unwrap();
        let seen_by_globex = reg.read_since(&tenant("globex"), "orders", 0, 100).unwrap();
        assert!(
            seen_by_globex.is_empty(),
            "globex must not see acme's events"
        );
        // The same name in different scopes resolves to different
        // streams — listing filters by scope.
        assert_eq!(reg.list_streams(&tenant("acme")).len(), 1);
        assert_eq!(reg.list_streams(&tenant("globex")).len(), 1);
    }

    #[test]
    fn retention_max_events_drops_oldest() {
        let reg = registry_with(
            tenant("acme"),
            "orders",
            StreamRetention {
                max_events: Some(3),
                max_age_ms: None,
            },
        );
        for now in 100..105u128 {
            reg.append(tenant("acme"), "orders", None, "x", now).unwrap();
        }
        let desc = reg.describe(&tenant("acme"), "orders").unwrap();
        // The newest 3 survive (offsets 3, 4, 5), so the head is 3.
        assert_eq!(desc.event_count, 3);
        assert_eq!(desc.head_offset, 3);
        assert_eq!(desc.tail_offset, 5);
        let events = reg.read_since(&tenant("acme"), "orders", 0, 100).unwrap();
        assert_eq!(offsets_of(&events), vec![3, 4, 5]);
    }

    #[test]
    fn retention_max_age_drops_old_events() {
        let reg = registry_with(
            tenant("acme"),
            "orders",
            StreamRetention {
                max_events: None,
                max_age_ms: Some(1_000),
            },
        );
        reg.append(tenant("acme"), "orders", None, "old", 0).unwrap();
        reg.append(tenant("acme"), "orders", None, "old2", 500)
            .unwrap();
        // The retention pass triggered by this append uses its
        // `now_ms`: events with appended_at_ms < (10_000 - 1_000) =
        // 9_000 are dropped.
        reg.append(tenant("acme"), "orders", None, "fresh", 10_000)
            .unwrap();
        let events = reg.read_since(&tenant("acme"), "orders", 0, 100).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].payload, "fresh");
        assert_eq!(events[0].offset, 3, "retention must not rewrite offsets");
    }

    #[test]
    fn consumer_lagged_past_retention_does_not_error() {
        let reg = registry_with(
            tenant("acme"),
            "orders",
            StreamRetention {
                max_events: Some(2),
                max_age_ms: None,
            },
        );
        for now in 0..5u128 {
            reg.append(tenant("acme"), "orders", None, "x", now).unwrap();
        }
        // The read starts at offset 2, but retention has already
        // advanced the head to 4. read_since must return the current
        // window rather than erroring — operators detect lag by
        // comparing get_offset against descriptor.head_offset.
        let events = reg.read_since(&tenant("acme"), "orders", 2, 100).unwrap();
        assert_eq!(offsets_of(&events), vec![4, 5]);
    }

    #[test]
    fn same_tenant_append_does_not_require_cross_tenant_cap() {
        let reg = acme_orders();
        let offset = reg
            .append_authorized(Some("acme"), tenant("acme"), "orders", None, "x", false, 0)
            .expect("same-tenant append must succeed without cross-tenant cap");
        assert_eq!(offset, 1);

        reg.read_since_authorized(Some("acme"), tenant("acme"), "orders", 0, 100, false)
            .expect("same-tenant read must succeed without cross-tenant cap");
    }

    #[test]
    fn cross_tenant_append_denied_without_cap() {
        let reg = registry_with(tenant("globex"), "orders", StreamRetention::default());
        let err = reg
            .append_authorized(
                Some("acme"),
                tenant("globex"),
                "orders",
                None,
                "leak",
                false,
                0,
            )
            .expect_err("cross-tenant append must be denied without cap");
        // The denial must preserve the principal / target / stream
        // triple.
        assert_eq!(
            err,
            StreamError::CrossTenantDenied {
                principal_tenant: Some("acme".to_string()),
                target: tenant("globex"),
                stream: "orders".to_string(),
            }
        );
    }

    #[test]
    fn cross_tenant_read_denied_without_cap() {
        let reg = registry_with(tenant("globex"), "orders", StreamRetention::default());
        let err = reg
            .read_since_authorized(Some("acme"), tenant("globex"), "orders", 0, 100, false)
            .expect_err("cross-tenant read must be denied without cap");
        assert!(matches!(err, StreamError::CrossTenantDenied { .. }));
    }

    #[test]
    fn cross_tenant_append_allowed_with_cap() {
        let reg = registry_with(tenant("globex"), "orders", StreamRetention::default());
        let offset = reg
            .append_authorized(
                Some("acme"),
                tenant("globex"),
                "orders",
                None,
                "allowed",
                true,
                0,
            )
            .expect("append with cross-tenant cap must succeed");
        assert_eq!(offset, 1);
    }

    #[test]
    fn global_scope_requires_cross_tenant_cap_for_tenant_principal() {
        let reg = registry_with(StreamScope::Global, "platform", StreamRetention::default());
        let err = reg
            .append_authorized(
                Some("acme"),
                StreamScope::Global,
                "platform",
                None,
                "leak",
                false,
                0,
            )
            .expect_err("tenant principal targeting Global must require cap");
        assert!(matches!(err, StreamError::CrossTenantDenied { .. }));

        // A platform principal (tenant=None) targeting Global is
        // same-scope and needs no extra cap.
        let offset = reg
            .append_authorized(None, StreamScope::Global, "platform", None, "ok", false, 0)
            .expect("platform principal targeting global is same-scope");
        assert_eq!(offset, 1);
    }

    #[test]
    fn from_principal_tenant_maps_correctly() {
        assert_eq!(
            StreamScope::from_principal_tenant(Some("acme")),
            StreamScope::Tenant("acme".into())
        );
        assert_eq!(
            StreamScope::from_principal_tenant(None),
            StreamScope::Global
        );
    }

    #[test]
    fn event_carries_optional_key_for_future_cdc() {
        let reg = registry_with(tenant("acme"), "rows", StreamRetention::default());
        reg.append(tenant("acme"), "rows", Some("user:42".into()), "{}", 0)
            .unwrap();
        let events = reg.read_since(&tenant("acme"), "rows", 0, 100).unwrap();
        assert_eq!(events[0].key.as_deref(), Some("user:42"));
    }
}