use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StreamScope {
Tenant(String),
Global,
}
impl StreamScope {
pub fn from_principal_tenant(tenant: Option<&str>) -> Self {
match tenant {
Some(t) => StreamScope::Tenant(t.to_string()),
None => StreamScope::Global,
}
}
pub fn label(&self) -> String {
match self {
StreamScope::Tenant(t) => format!("tenant:{t}"),
StreamScope::Global => "global".to_string(),
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamRetention {
pub max_events: Option<usize>,
pub max_age_ms: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamEvent {
pub scope: StreamScope,
pub stream: String,
pub key: Option<String>,
pub payload: String,
pub offset: u64,
pub appended_at_ms: u128,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamDescriptor {
pub scope: StreamScope,
pub name: String,
pub retention: StreamRetention,
pub head_offset: u64,
pub tail_offset: u64,
pub event_count: usize,
}
#[derive(Debug, PartialEq, Eq)]
pub enum StreamError {
AlreadyExists { scope: StreamScope, name: String },
NotFound { scope: StreamScope, name: String },
CrossTenantDenied {
principal_tenant: Option<String>,
target: StreamScope,
stream: String,
},
}
impl std::fmt::Display for StreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamError::AlreadyExists { scope, name } => {
write!(f, "stream: `{}/{}` already exists", scope.label(), name)
}
StreamError::NotFound { scope, name } => {
write!(f, "stream: `{}/{}` not found", scope.label(), name)
}
StreamError::CrossTenantDenied {
principal_tenant,
target,
stream,
} => {
let from = principal_tenant.as_deref().unwrap_or("<platform>");
write!(
f,
"stream: principal in tenant `{}` is not allowed to address `{}` stream `{}` without the `stream:cross-tenant` capability",
from,
target.label(),
stream
)
}
}
}
}
impl std::error::Error for StreamError {}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StreamKey {
scope: StreamScope,
name: String,
}
#[derive(Debug)]
struct DurableStream {
retention: StreamRetention,
events: Vec<StreamEvent>,
next_offset: u64,
consumer_offsets: HashMap<String, u64>,
}
impl DurableStream {
fn new(retention: StreamRetention) -> Self {
Self {
retention,
events: Vec::new(),
next_offset: 1,
consumer_offsets: HashMap::new(),
}
}
fn descriptor(&self, scope: StreamScope, name: String) -> StreamDescriptor {
let head_offset = self.events.first().map(|e| e.offset).unwrap_or(0);
let tail_offset = self.events.last().map(|e| e.offset).unwrap_or(0);
StreamDescriptor {
scope,
name,
retention: self.retention.clone(),
head_offset,
tail_offset,
event_count: self.events.len(),
}
}
fn apply_retention(&mut self, now_ms: u128) {
if let Some(max_events) = self.retention.max_events {
while self.events.len() > max_events {
self.events.remove(0);
}
}
if let Some(max_age_ms) = self.retention.max_age_ms {
let cutoff = now_ms.saturating_sub(max_age_ms as u128);
while let Some(first) = self.events.first() {
if first.appended_at_ms < cutoff {
self.events.remove(0);
} else {
break;
}
}
}
}
}
#[derive(Default, Clone)]
pub struct StreamRegistry {
inner: Arc<Mutex<HashMap<StreamKey, DurableStream>>>,
}
impl StreamRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn create_stream(
&self,
scope: StreamScope,
name: impl Into<String>,
retention: StreamRetention,
) -> Result<(), StreamError> {
let name = name.into();
let key = StreamKey {
scope: scope.clone(),
name: name.clone(),
};
let mut guard = self.inner.lock();
if guard.contains_key(&key) {
return Err(StreamError::AlreadyExists { scope, name });
}
guard.insert(key, DurableStream::new(retention));
Ok(())
}
pub fn exists(&self, scope: &StreamScope, name: &str) -> bool {
let key = StreamKey {
scope: scope.clone(),
name: name.to_string(),
};
self.inner.lock().contains_key(&key)
}
pub fn list_streams(&self, scope: &StreamScope) -> Vec<StreamDescriptor> {
let guard = self.inner.lock();
guard
.iter()
.filter(|(k, _)| &k.scope == scope)
.map(|(k, s)| s.descriptor(k.scope.clone(), k.name.clone()))
.collect()
}
pub fn describe(&self, scope: &StreamScope, name: &str) -> Option<StreamDescriptor> {
let key = StreamKey {
scope: scope.clone(),
name: name.to_string(),
};
let guard = self.inner.lock();
guard.get(&key).map(|s| s.descriptor(key.scope, key.name))
}
pub fn append(
&self,
scope: StreamScope,
name: impl Into<String>,
key: Option<String>,
payload: impl Into<String>,
now_ms: u128,
) -> Result<u64, StreamError> {
let name = name.into();
let lookup_key = StreamKey {
scope: scope.clone(),
name: name.clone(),
};
let mut guard = self.inner.lock();
let stream = guard
.get_mut(&lookup_key)
.ok_or_else(|| StreamError::NotFound {
scope: scope.clone(),
name: name.clone(),
})?;
let offset = stream.next_offset;
stream.next_offset += 1;
stream.events.push(StreamEvent {
scope,
stream: name,
key,
payload: payload.into(),
offset,
appended_at_ms: now_ms,
});
stream.apply_retention(now_ms);
Ok(offset)
}
pub fn append_authorized(
&self,
principal_tenant: Option<&str>,
target: StreamScope,
name: impl Into<String>,
key: Option<String>,
payload: impl Into<String>,
has_cross_tenant_cap: bool,
now_ms: u128,
) -> Result<u64, StreamError> {
let name = name.into();
Self::authorize(principal_tenant, &target, &name, has_cross_tenant_cap)?;
self.append(target, name, key, payload, now_ms)
}
pub fn read_since(
&self,
scope: &StreamScope,
name: &str,
from: u64,
limit: usize,
) -> Result<Vec<StreamEvent>, StreamError> {
let key = StreamKey {
scope: scope.clone(),
name: name.to_string(),
};
let guard = self.inner.lock();
let stream = guard.get(&key).ok_or_else(|| StreamError::NotFound {
scope: scope.clone(),
name: name.to_string(),
})?;
Ok(stream
.events
.iter()
.filter(|e| e.offset >= from)
.take(limit)
.cloned()
.collect())
}
pub fn read_since_authorized(
&self,
principal_tenant: Option<&str>,
target: StreamScope,
name: impl Into<String>,
from: u64,
limit: usize,
has_cross_tenant_cap: bool,
) -> Result<Vec<StreamEvent>, StreamError> {
let name = name.into();
Self::authorize(principal_tenant, &target, &name, has_cross_tenant_cap)?;
self.read_since(&target, &name, from, limit)
}
pub fn save_offset(
&self,
scope: &StreamScope,
name: &str,
consumer: &str,
offset: u64,
) -> Result<u64, StreamError> {
let key = StreamKey {
scope: scope.clone(),
name: name.to_string(),
};
let mut guard = self.inner.lock();
let stream = guard.get_mut(&key).ok_or_else(|| StreamError::NotFound {
scope: scope.clone(),
name: name.to_string(),
})?;
let entry = stream
.consumer_offsets
.entry(consumer.to_string())
.or_insert(0);
if offset > *entry {
*entry = offset;
}
Ok(*entry)
}
pub fn get_offset(
&self,
scope: &StreamScope,
name: &str,
consumer: &str,
) -> Result<u64, StreamError> {
let key = StreamKey {
scope: scope.clone(),
name: name.to_string(),
};
let guard = self.inner.lock();
let stream = guard.get(&key).ok_or_else(|| StreamError::NotFound {
scope: scope.clone(),
name: name.to_string(),
})?;
Ok(stream.consumer_offsets.get(consumer).copied().unwrap_or(0))
}
fn authorize(
principal_tenant: Option<&str>,
target: &StreamScope,
stream: &str,
has_cross_tenant_cap: bool,
) -> Result<(), StreamError> {
let same_scope = match (principal_tenant, target) {
(Some(pt), StreamScope::Tenant(tt)) => pt == tt,
(None, StreamScope::Global) => true,
_ => false,
};
if same_scope || has_cross_tenant_cap {
return Ok(());
}
Err(StreamError::CrossTenantDenied {
principal_tenant: principal_tenant.map(str::to_string),
target: target.clone(),
stream: stream.to_string(),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
fn t(name: &str) -> StreamScope {
StreamScope::Tenant(name.into())
}
#[test]
fn create_then_discover_via_list() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
let listed = reg.list_streams(&t("acme"));
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].name, "orders");
assert_eq!(listed[0].event_count, 0);
assert_eq!(listed[0].head_offset, 0);
assert_eq!(listed[0].tail_offset, 0);
assert!(reg.exists(&t("acme"), "orders"));
assert!(reg.describe(&t("acme"), "orders").is_some());
}
#[test]
fn duplicate_create_rejected() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
let err = reg
.create_stream(t("acme"), "orders", StreamRetention::default())
.expect_err("dup create must fail");
assert!(matches!(err, StreamError::AlreadyExists { .. }));
}
#[test]
fn append_assigns_monotonic_offsets() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
let o1 = reg.append(t("acme"), "orders", None, "a", 100).unwrap();
let o2 = reg
.append(t("acme"), "orders", Some("k".into()), "b", 101)
.unwrap();
let o3 = reg.append(t("acme"), "orders", None, "c", 102).unwrap();
assert_eq!((o1, o2, o3), (1, 2, 3));
let desc = reg.describe(&t("acme"), "orders").unwrap();
assert_eq!(desc.head_offset, 1);
assert_eq!(desc.tail_offset, 3);
assert_eq!(desc.event_count, 3);
}
#[test]
fn append_on_unknown_stream_errors() {
let reg = StreamRegistry::new();
let err = reg
.append(t("acme"), "missing", None, "x", 0)
.expect_err("append on unknown stream must error");
assert!(matches!(err, StreamError::NotFound { .. }));
}
#[test]
fn read_since_returns_events_from_offset() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
for (i, payload) in ["a", "b", "c", "d"].iter().enumerate() {
reg.append(t("acme"), "orders", None, *payload, 100 + i as u128)
.unwrap();
}
let from_start = reg.read_since(&t("acme"), "orders", 0, 100).unwrap();
assert_eq!(from_start.len(), 4);
assert_eq!(from_start[0].offset, 1);
assert_eq!(from_start[3].payload, "d");
let from_middle = reg.read_since(&t("acme"), "orders", 3, 100).unwrap();
assert_eq!(from_middle.len(), 2);
assert_eq!(from_middle[0].offset, 3);
assert_eq!(from_middle[1].offset, 4);
let bounded = reg.read_since(&t("acme"), "orders", 0, 2).unwrap();
assert_eq!(bounded.len(), 2);
assert_eq!(bounded[1].offset, 2);
}
#[test]
fn read_does_not_advance_consumer_offset_no_pending_state() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
for i in 0..3 {
reg.append(t("acme"), "orders", None, "x", i).unwrap();
}
for _ in 0..3 {
let events = reg.read_since(&t("acme"), "orders", 0, 100).unwrap();
assert_eq!(events.len(), 3);
}
assert_eq!(reg.get_offset(&t("acme"), "orders", "c1").unwrap(), 0);
}
#[test]
fn save_offset_is_monotonic() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
for i in 0..5 {
reg.append(t("acme"), "orders", None, "x", i).unwrap();
}
assert_eq!(reg.save_offset(&t("acme"), "orders", "c1", 3).unwrap(), 3);
assert_eq!(
reg.save_offset(&t("acme"), "orders", "c1", 1).unwrap(),
3,
"stale save must not rewind",
);
assert_eq!(reg.save_offset(&t("acme"), "orders", "c1", 3).unwrap(), 3,);
assert_eq!(reg.save_offset(&t("acme"), "orders", "c1", 5).unwrap(), 5,);
assert_eq!(reg.get_offset(&t("acme"), "orders", "c1").unwrap(), 5);
}
#[test]
fn get_offset_defaults_to_zero_for_new_consumer() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
assert_eq!(reg.get_offset(&t("acme"), "orders", "fresh").unwrap(), 0);
}
#[test]
fn consumer_offsets_are_isolated_per_consumer() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
reg.append(t("acme"), "orders", None, "x", 0).unwrap();
reg.save_offset(&t("acme"), "orders", "c1", 1).unwrap();
assert_eq!(reg.get_offset(&t("acme"), "orders", "c1").unwrap(), 1);
assert_eq!(reg.get_offset(&t("acme"), "orders", "c2").unwrap(), 0);
}
#[test]
fn streams_are_tenant_isolated() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
reg.create_stream(t("globex"), "orders", StreamRetention::default())
.unwrap();
reg.append(t("acme"), "orders", None, "acme-only", 0)
.unwrap();
let globex_events = reg.read_since(&t("globex"), "orders", 0, 100).unwrap();
assert!(
globex_events.is_empty(),
"globex must not see acme's events"
);
assert_eq!(reg.list_streams(&t("acme")).len(), 1);
assert_eq!(reg.list_streams(&t("globex")).len(), 1);
}
#[test]
fn retention_max_events_drops_oldest() {
let reg = StreamRegistry::new();
reg.create_stream(
t("acme"),
"orders",
StreamRetention {
max_events: Some(3),
max_age_ms: None,
},
)
.unwrap();
for i in 0..5 {
reg.append(t("acme"), "orders", None, "x", 100 + i as u128)
.unwrap();
}
let desc = reg.describe(&t("acme"), "orders").unwrap();
assert_eq!(desc.event_count, 3);
assert_eq!(desc.head_offset, 3);
assert_eq!(desc.tail_offset, 5);
let events = reg.read_since(&t("acme"), "orders", 0, 100).unwrap();
assert_eq!(
events.iter().map(|e| e.offset).collect::<Vec<_>>(),
vec![3, 4, 5],
);
}
#[test]
fn retention_max_age_drops_old_events() {
let reg = StreamRegistry::new();
reg.create_stream(
t("acme"),
"orders",
StreamRetention {
max_events: None,
max_age_ms: Some(1_000),
},
)
.unwrap();
reg.append(t("acme"), "orders", None, "old", 0).unwrap();
reg.append(t("acme"), "orders", None, "old2", 500).unwrap();
reg.append(t("acme"), "orders", None, "fresh", 10_000)
.unwrap();
let events = reg.read_since(&t("acme"), "orders", 0, 100).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].payload, "fresh");
assert_eq!(events[0].offset, 3, "retention must not rewrite offsets");
}
#[test]
fn consumer_lagged_past_retention_does_not_error() {
let reg = StreamRegistry::new();
reg.create_stream(
t("acme"),
"orders",
StreamRetention {
max_events: Some(2),
max_age_ms: None,
},
)
.unwrap();
for i in 0..5 {
reg.append(t("acme"), "orders", None, "x", i).unwrap();
}
let events = reg.read_since(&t("acme"), "orders", 2, 100).unwrap();
assert_eq!(
events.iter().map(|e| e.offset).collect::<Vec<_>>(),
vec![4, 5],
);
}
#[test]
fn same_tenant_append_does_not_require_cross_tenant_cap() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "orders", StreamRetention::default())
.unwrap();
let offset = reg
.append_authorized(Some("acme"), t("acme"), "orders", None, "x", false, 0)
.expect("same-tenant append must succeed without cross-tenant cap");
assert_eq!(offset, 1);
reg.read_since_authorized(Some("acme"), t("acme"), "orders", 0, 100, false)
.expect("same-tenant read must succeed without cross-tenant cap");
}
#[test]
fn cross_tenant_append_denied_without_cap() {
let reg = StreamRegistry::new();
reg.create_stream(t("globex"), "orders", StreamRetention::default())
.unwrap();
let err = reg
.append_authorized(Some("acme"), t("globex"), "orders", None, "leak", false, 0)
.expect_err("cross-tenant append must be denied without cap");
match err {
StreamError::CrossTenantDenied {
principal_tenant,
target,
stream,
} => {
assert_eq!(principal_tenant.as_deref(), Some("acme"));
assert_eq!(target, t("globex"));
assert_eq!(stream, "orders");
}
other => panic!("unexpected error: {other:?}"),
}
}
#[test]
fn cross_tenant_read_denied_without_cap() {
let reg = StreamRegistry::new();
reg.create_stream(t("globex"), "orders", StreamRetention::default())
.unwrap();
let err = reg
.read_since_authorized(Some("acme"), t("globex"), "orders", 0, 100, false)
.expect_err("cross-tenant read must be denied without cap");
assert!(matches!(err, StreamError::CrossTenantDenied { .. }));
}
#[test]
fn cross_tenant_append_allowed_with_cap() {
let reg = StreamRegistry::new();
reg.create_stream(t("globex"), "orders", StreamRetention::default())
.unwrap();
let offset = reg
.append_authorized(
Some("acme"),
t("globex"),
"orders",
None,
"allowed",
true,
0,
)
.expect("append with cross-tenant cap must succeed");
assert_eq!(offset, 1);
}
#[test]
fn global_scope_requires_cross_tenant_cap_for_tenant_principal() {
let reg = StreamRegistry::new();
reg.create_stream(StreamScope::Global, "platform", StreamRetention::default())
.unwrap();
let err = reg
.append_authorized(
Some("acme"),
StreamScope::Global,
"platform",
None,
"leak",
false,
0,
)
.expect_err("tenant principal targeting Global must require cap");
assert!(matches!(err, StreamError::CrossTenantDenied { .. }));
let offset = reg
.append_authorized(None, StreamScope::Global, "platform", None, "ok", false, 0)
.expect("platform principal targeting global is same-scope");
assert_eq!(offset, 1);
}
#[test]
fn from_principal_tenant_maps_correctly() {
assert_eq!(
StreamScope::from_principal_tenant(Some("acme")),
StreamScope::Tenant("acme".into())
);
assert_eq!(
StreamScope::from_principal_tenant(None),
StreamScope::Global
);
}
#[test]
fn event_carries_optional_key_for_future_cdc() {
let reg = StreamRegistry::new();
reg.create_stream(t("acme"), "rows", StreamRetention::default())
.unwrap();
reg.append(t("acme"), "rows", Some("user:42".into()), "{}", 0)
.unwrap();
let events = reg.read_since(&t("acme"), "rows", 0, 100).unwrap();
assert_eq!(events[0].key.as_deref(), Some("user:42"));
}
}