Durable stream primitive (issue #721, PRD #718).
Tenant-scoped, append-only event log with monotonic per-consumer offsets. Streams are the third leg of ADR 0028’s split — queues own per-message delivery state (ACK/NACK/DLQ), ephemeral notifications own no state at all, and streams own an immutable ordered log plus a small per-consumer offset bookkeeping table. Reading a stream never creates pending delivery state and never requires ACK or NACK; advancing the saved offset is the only “I’m done with this prefix” signal.
§Contract surface
- StreamRegistry::create_stream - declare a new stream with a retention contract. The stream becomes discoverable via StreamRegistry::list_streams.
- StreamRegistry::append / StreamRegistry::append_authorized - append an event payload with an optional stream-identity key. Returns the assigned offset (u64, sequence within the stream).
- StreamRegistry::read_since / StreamRegistry::read_since_authorized - read up to limit events with offset >= from. Read does NOT consume, lease, or leave pending state behind, and does NOT advance the consumer’s saved offset; that is the caller’s explicit responsibility via save_offset.
- StreamRegistry::save_offset / StreamRegistry::get_offset - persist a consumer’s progress. Saving is monotonic: a smaller or equal offset is dropped silently and the previously-saved value is returned. This makes the operation safe to retry on duplicate or stale acks without rewinding a consumer past events it already finished.
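The read / save cycle described above can be sketched with a toy in-memory model. This is not the real StreamRegistry: ToyStream, its fields, and its method signatures are hypothetical stand-ins, offsets are assumed to start at 0, and the saved offset is assumed to mean "next offset to read". The sketch only illustrates that reading leaves no state behind and that saving is monotonic.

```rust
use std::collections::HashMap;

/// Hypothetical event shape for this sketch (not the crate's StreamEvent).
#[derive(Clone, Debug, PartialEq)]
struct Event {
    offset: u64,
    payload: String,
}

/// Hypothetical single-stream model: an ordered log plus a per-consumer offset table.
#[derive(Default)]
struct ToyStream {
    events: Vec<Event>,
    next_offset: u64,
    saved: HashMap<String, u64>,
}

impl ToyStream {
    /// Append a payload and return the offset assigned to it.
    fn append(&mut self, payload: &str) -> u64 {
        let offset = self.next_offset;
        self.events.push(Event { offset, payload: payload.to_string() });
        self.next_offset += 1;
        offset
    }

    /// Return up to `limit` events with offset >= `from`.
    /// Takes `&self`: reading cannot consume events or touch saved offsets.
    fn read_since(&self, from: u64, limit: usize) -> Vec<Event> {
        self.events
            .iter()
            .filter(|e| e.offset >= from)
            .take(limit)
            .cloned()
            .collect()
    }

    /// Monotonic save: a smaller or equal offset is ignored.
    /// Returns the offset in effect after the call.
    fn save_offset(&mut self, consumer: &str, offset: u64) -> u64 {
        let slot = self.saved.entry(consumer.to_string()).or_insert(offset);
        if offset > *slot {
            *slot = offset;
        }
        *slot
    }

    /// Look up the saved offset for a consumer, if any.
    fn get_offset(&self, consumer: &str) -> Option<u64> {
        self.saved.get(consumer).copied()
    }
}
```

In this model a consumer would call read_since(saved, limit), process the batch, then call save_offset with the last processed offset plus one. Retrying that save with a stale value is harmless because the stored offset never moves backwards.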
§Retention contract (first cut)
Each stream carries a StreamRetention describing how the
engine prunes old events. The first cut supports two independent
caps; when both are set, an event survives only if it satisfies both (the stricter wins):
- max_events: Option<usize> - drop the oldest events so the log never exceeds N entries.
- max_age_ms: Option<u64> - drop events older than (now - max_age_ms).
Retention is applied at append time. A retention pass never
rewrites the offset of surviving events — offsets remain sparse
once the head moves forward. Consumers whose saved offset has
fallen below the current head simply skip the truncated prefix
the next time they call read_since; the engine does not raise
an error for “consumer lagged past retention”. Operators who care
about that condition can compare get_offset(consumer) against
the descriptor’s head_offset themselves.
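A minimal sketch of append-time retention and the operator-side lag check, under stated assumptions: ToyLog, Retention, head_offset(), and has_lagged() are hypothetical names for illustration, the clock is passed in explicitly so the example is deterministic, and head_offset is assumed to mean the offset of the oldest surviving event. The real engine may differ in all of these details.

```rust
use std::collections::VecDeque;

/// Hypothetical mirror of the two retention caps described above.
#[derive(Clone, Copy, Default)]
struct Retention {
    max_events: Option<usize>,
    max_age_ms: Option<u64>,
}

struct Entry {
    offset: u64,
    appended_at_ms: u64,
}

struct ToyLog {
    retention: Retention,
    entries: VecDeque<Entry>,
    next_offset: u64,
}

impl ToyLog {
    fn new(retention: Retention) -> Self {
        ToyLog { retention, entries: VecDeque::new(), next_offset: 0 }
    }

    /// Append an event stamped with `now_ms`, then run the retention pass.
    fn append(&mut self, now_ms: u64) -> u64 {
        let offset = self.next_offset;
        self.next_offset += 1;
        self.entries.push_back(Entry { offset, appended_at_ms: now_ms });
        self.prune(now_ms);
        offset
    }

    /// Drop from the front only; surviving entries keep their offsets.
    fn prune(&mut self, now_ms: u64) {
        if let Some(max_age) = self.retention.max_age_ms {
            let cutoff = now_ms.saturating_sub(max_age);
            while self.entries.front().map_or(false, |e| e.appended_at_ms < cutoff) {
                self.entries.pop_front();
            }
        }
        if let Some(max) = self.retention.max_events {
            while self.entries.len() > max {
                self.entries.pop_front();
            }
        }
    }

    /// Offset of the oldest surviving event, or the next offset if the log is empty.
    fn head_offset(&self) -> u64 {
        self.entries.front().map_or(self.next_offset, |e| e.offset)
    }

    /// Operator-side check: has this saved offset fallen behind the head?
    fn has_lagged(&self, saved_offset: u64) -> bool {
        saved_offset < self.head_offset()
    }
}
```

Because pruning only pops from the front, a consumer that reads from a saved offset below the head simply receives events starting at the head, which matches the "skip the truncated prefix" behaviour described above.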
§Authorization model
Mirrors the crate::notifications pattern: the registry never
consults policies directly. Transports evaluate the stream
action (and stream:cross-tenant for cross-tenant addressing)
against the principal’s effective policies and pass the resulting
has_cross_tenant_cap: bool into the _authorized entry points.
Same-scope operations succeed without the extra capability;
everything else returns StreamError::CrossTenantDenied with
the principal / target / stream triple preserved for audit.
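The scope check can be sketched as follows. This is an illustration only: Tenant, ToyStreamError, and check_scope are hypothetical, and the field layout of the real StreamError::CrossTenantDenied is not shown in this documentation. The point is that the registry-side check takes a precomputed boolean rather than evaluating policies itself.

```rust
/// Hypothetical tenant identifier for this sketch.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Tenant(String);

/// Hypothetical error type; the real StreamError variant may carry different fields.
#[derive(Debug, PartialEq, Eq)]
enum ToyStreamError {
    CrossTenantDenied {
        principal: Tenant,
        target: Tenant,
        stream: String,
    },
}

/// Same-scope access always passes; cross-tenant access needs the capability
/// that the transport resolved from the principal's policies beforehand.
fn check_scope(
    principal: &Tenant,
    target: &Tenant,
    stream: &str,
    has_cross_tenant_cap: bool,
) -> Result<(), ToyStreamError> {
    if principal == target || has_cross_tenant_cap {
        Ok(())
    } else {
        // Keep the full triple so the denial can be audited later.
        Err(ToyStreamError::CrossTenantDenied {
            principal: principal.clone(),
            target: target.clone(),
            stream: stream.to_string(),
        })
    }
}
```

Passing the capability in as a bool keeps policy evaluation in the transport layer, so the registry stays free of any dependency on the policy engine.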
§CDC compatibility
StreamEvent carries key and payload as opaque UTF-8
strings, plus the engine-assigned offset and appended_at_ms.
That shape is intentionally the standard change-data-capture log
shape: a later materialized-CDC slice can populate the same event
type from a table’s mutation tail, with key becoming the row
primary key and payload the row JSON. Nothing in this module
commits the engine to a specific CDC strategy — the contract is
deliberately open on that axis.
§Durability
The first slice of the primitive is an in-process append-only
log. The registry is Send + Sync and intended to live behind
an Arc on the runtime; persistence to disk-backed storage is a
follow-up slice tracked under the same PRD (#718) — it does not
change the public contract above, only where the bytes live.
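To illustrate what "Send + Sync behind an Arc" permits, here is a toy registry shared across writer threads. ToyRegistry and append_from_threads are hypothetical, and using a Mutex for interior mutability is an assumption of this sketch; the real registry's locking strategy is not described here.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-process log guarded by a mutex (an assumption of this sketch).
#[derive(Default)]
struct ToyRegistry {
    log: Mutex<Vec<String>>,
}

impl ToyRegistry {
    /// Append through a shared reference; the lock serialises offset assignment.
    fn append(&self, payload: &str) -> u64 {
        let mut log = self.log.lock().unwrap();
        log.push(payload.to_string());
        (log.len() - 1) as u64
    }
}

/// Spawn `writers` threads that each append `per_writer` events to one shared
/// registry, then return every assigned offset in sorted order.
fn append_from_threads(writers: usize, per_writer: usize) -> Vec<u64> {
    let registry = Arc::new(ToyRegistry::default());
    let handles: Vec<_> = (0..writers)
        .map(|w| {
            let registry = Arc::clone(&registry);
            thread::spawn(move || {
                (0..per_writer)
                    .map(|i| registry.append(&format!("w{w}-e{i}")))
                    .collect::<Vec<u64>>()
            })
        })
        .collect();
    let mut offsets: Vec<u64> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    offsets.sort_unstable();
    offsets
}
```

Every append receives a distinct offset regardless of thread interleaving, so the sorted result is a gap-free sequence starting at 0.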
Structs§
- StreamDescriptor - Public-facing descriptor for stream discovery: what an introspection surface (e.g. a future red.streams virtual table) would emit per stream.
- StreamEvent - A single event in the stream log. offset is engine-assigned and monotonically increasing within (scope, stream); retention pruning may advance the head past low offsets but never reuses or rewrites them.
- StreamRegistry - In-memory registry of durable streams.
- StreamRetention - Retention contract for a single stream. Both fields are independently optional; pass StreamRetention::default() for an unbounded stream (no retention pruning).
Enums§
- StreamError - Errors surfaced by the stream registry.
- StreamScope - Scope of a stream, tenant-isolated by default. Mirrors crate::notifications::NotificationScope.