1use std::collections::HashMap;
84use std::sync::Arc;
85
86use parking_lot::Mutex;
87
/// Namespace a stream lives in: a single tenant's space or the shared,
/// non-tenant space.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StreamScope {
    /// Scope of the tenant with the given identifier.
    Tenant(String),
    /// The scope that is not tied to any tenant.
    Global,
}

impl StreamScope {
    /// Maps a principal's optional tenant onto a scope.
    ///
    /// `Some(id)` becomes [`StreamScope::Tenant`] holding an owned copy of
    /// `id`; `None` becomes [`StreamScope::Global`].
    pub fn from_principal_tenant(tenant: Option<&str>) -> Self {
        tenant.map_or(Self::Global, |id| Self::Tenant(id.to_owned()))
    }

    /// Renders the scope as `tenant:<id>` for a tenant scope or `global`
    /// otherwise.
    pub fn label(&self) -> String {
        if let Self::Tenant(id) = self {
            format!("tenant:{id}")
        } else {
            String::from("global")
        }
    }
}
120
/// Retention limits for a stream.
///
/// A `None` field disables that limit; the `Default` value disables both.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamRetention {
    /// Upper bound on the number of retained events.
    pub max_events: Option<usize>,
    /// Upper bound on the age of retained events, in milliseconds.
    pub max_age_ms: Option<u64>,
}
133
134#[derive(Debug, Clone, PartialEq, Eq)]
139pub struct StreamEvent {
140 pub scope: StreamScope,
141 pub stream: String,
142 pub key: Option<String>,
147 pub payload: String,
150 pub offset: u64,
156 pub appended_at_ms: u128,
157}
158
159#[derive(Debug, Clone, PartialEq, Eq)]
163pub struct StreamDescriptor {
164 pub scope: StreamScope,
165 pub name: String,
166 pub retention: StreamRetention,
167 pub head_offset: u64,
170 pub tail_offset: u64,
173 pub event_count: usize,
174}
175
176#[derive(Debug, PartialEq, Eq)]
178pub enum StreamError {
179 AlreadyExists { scope: StreamScope, name: String },
182 NotFound { scope: StreamScope, name: String },
184 CrossTenantDenied {
187 principal_tenant: Option<String>,
188 target: StreamScope,
189 stream: String,
190 },
191}
192
193impl std::fmt::Display for StreamError {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 match self {
196 StreamError::AlreadyExists { scope, name } => {
197 write!(f, "stream: `{}/{}` already exists", scope.label(), name)
198 }
199 StreamError::NotFound { scope, name } => {
200 write!(f, "stream: `{}/{}` not found", scope.label(), name)
201 }
202 StreamError::CrossTenantDenied {
203 principal_tenant,
204 target,
205 stream,
206 } => {
207 let from = principal_tenant.as_deref().unwrap_or("<platform>");
208 write!(
209 f,
210 "stream: principal in tenant `{}` is not allowed to address `{}` stream `{}` without the `stream:cross-tenant` capability",
211 from,
212 target.label(),
213 stream
214 )
215 }
216 }
217 }
218}
219
220impl std::error::Error for StreamError {}
221
222#[derive(Debug, Clone, PartialEq, Eq, Hash)]
223struct StreamKey {
224 scope: StreamScope,
225 name: String,
226}
227
228#[derive(Debug)]
229struct DurableStream {
230 retention: StreamRetention,
231 events: Vec<StreamEvent>,
235 next_offset: u64,
237 consumer_offsets: HashMap<String, u64>,
240}
241
242impl DurableStream {
243 fn new(retention: StreamRetention) -> Self {
244 Self {
245 retention,
246 events: Vec::new(),
247 next_offset: 1,
248 consumer_offsets: HashMap::new(),
249 }
250 }
251
252 fn descriptor(&self, scope: StreamScope, name: String) -> StreamDescriptor {
253 let head_offset = self.events.first().map(|e| e.offset).unwrap_or(0);
254 let tail_offset = self.events.last().map(|e| e.offset).unwrap_or(0);
255 StreamDescriptor {
256 scope,
257 name,
258 retention: self.retention.clone(),
259 head_offset,
260 tail_offset,
261 event_count: self.events.len(),
262 }
263 }
264
265 fn apply_retention(&mut self, now_ms: u128) {
266 if let Some(max_events) = self.retention.max_events {
267 while self.events.len() > max_events {
268 self.events.remove(0);
269 }
270 }
271 if let Some(max_age_ms) = self.retention.max_age_ms {
272 let cutoff = now_ms.saturating_sub(max_age_ms as u128);
273 while let Some(first) = self.events.first() {
274 if first.appended_at_ms < cutoff {
275 self.events.remove(0);
276 } else {
277 break;
278 }
279 }
280 }
281 }
282}
283
284#[derive(Default, Clone)]
286pub struct StreamRegistry {
287 inner: Arc<Mutex<HashMap<StreamKey, DurableStream>>>,
288}
289
290impl StreamRegistry {
291 pub fn new() -> Self {
292 Self::default()
293 }
294
295 pub fn create_stream(
300 &self,
301 scope: StreamScope,
302 name: impl Into<String>,
303 retention: StreamRetention,
304 ) -> Result<(), StreamError> {
305 let name = name.into();
306 let key = StreamKey {
307 scope: scope.clone(),
308 name: name.clone(),
309 };
310 let mut guard = self.inner.lock();
311 if guard.contains_key(&key) {
312 return Err(StreamError::AlreadyExists { scope, name });
313 }
314 guard.insert(key, DurableStream::new(retention));
315 Ok(())
316 }
317
318 pub fn exists(&self, scope: &StreamScope, name: &str) -> bool {
320 let key = StreamKey {
321 scope: scope.clone(),
322 name: name.to_string(),
323 };
324 self.inner.lock().contains_key(&key)
325 }
326
327 pub fn list_streams(&self, scope: &StreamScope) -> Vec<StreamDescriptor> {
332 let guard = self.inner.lock();
333 guard
334 .iter()
335 .filter(|(k, _)| &k.scope == scope)
336 .map(|(k, s)| s.descriptor(k.scope.clone(), k.name.clone()))
337 .collect()
338 }
339
340 pub fn describe(&self, scope: &StreamScope, name: &str) -> Option<StreamDescriptor> {
342 let key = StreamKey {
343 scope: scope.clone(),
344 name: name.to_string(),
345 };
346 let guard = self.inner.lock();
347 guard.get(&key).map(|s| s.descriptor(key.scope, key.name))
348 }
349
350 pub fn append(
355 &self,
356 scope: StreamScope,
357 name: impl Into<String>,
358 key: Option<String>,
359 payload: impl Into<String>,
360 now_ms: u128,
361 ) -> Result<u64, StreamError> {
362 let name = name.into();
363 let lookup_key = StreamKey {
364 scope: scope.clone(),
365 name: name.clone(),
366 };
367 let mut guard = self.inner.lock();
368 let stream = guard
369 .get_mut(&lookup_key)
370 .ok_or_else(|| StreamError::NotFound {
371 scope: scope.clone(),
372 name: name.clone(),
373 })?;
374 let offset = stream.next_offset;
375 stream.next_offset += 1;
376 stream.events.push(StreamEvent {
377 scope,
378 stream: name,
379 key,
380 payload: payload.into(),
381 offset,
382 appended_at_ms: now_ms,
383 });
384 stream.apply_retention(now_ms);
385 Ok(offset)
386 }
387
388 pub fn append_authorized(
390 &self,
391 principal_tenant: Option<&str>,
392 target: StreamScope,
393 name: impl Into<String>,
394 key: Option<String>,
395 payload: impl Into<String>,
396 has_cross_tenant_cap: bool,
397 now_ms: u128,
398 ) -> Result<u64, StreamError> {
399 let name = name.into();
400 Self::authorize(principal_tenant, &target, &name, has_cross_tenant_cap)?;
401 self.append(target, name, key, payload, now_ms)
402 }
403
404 pub fn read_since(
411 &self,
412 scope: &StreamScope,
413 name: &str,
414 from: u64,
415 limit: usize,
416 ) -> Result<Vec<StreamEvent>, StreamError> {
417 let key = StreamKey {
418 scope: scope.clone(),
419 name: name.to_string(),
420 };
421 let guard = self.inner.lock();
422 let stream = guard.get(&key).ok_or_else(|| StreamError::NotFound {
423 scope: scope.clone(),
424 name: name.to_string(),
425 })?;
426 Ok(stream
427 .events
428 .iter()
429 .filter(|e| e.offset >= from)
430 .take(limit)
431 .cloned()
432 .collect())
433 }
434
435 pub fn read_since_authorized(
437 &self,
438 principal_tenant: Option<&str>,
439 target: StreamScope,
440 name: impl Into<String>,
441 from: u64,
442 limit: usize,
443 has_cross_tenant_cap: bool,
444 ) -> Result<Vec<StreamEvent>, StreamError> {
445 let name = name.into();
446 Self::authorize(principal_tenant, &target, &name, has_cross_tenant_cap)?;
447 self.read_since(&target, &name, from, limit)
448 }
449
450 pub fn save_offset(
458 &self,
459 scope: &StreamScope,
460 name: &str,
461 consumer: &str,
462 offset: u64,
463 ) -> Result<u64, StreamError> {
464 let key = StreamKey {
465 scope: scope.clone(),
466 name: name.to_string(),
467 };
468 let mut guard = self.inner.lock();
469 let stream = guard.get_mut(&key).ok_or_else(|| StreamError::NotFound {
470 scope: scope.clone(),
471 name: name.to_string(),
472 })?;
473 let entry = stream
474 .consumer_offsets
475 .entry(consumer.to_string())
476 .or_insert(0);
477 if offset > *entry {
478 *entry = offset;
479 }
480 Ok(*entry)
481 }
482
483 pub fn get_offset(
488 &self,
489 scope: &StreamScope,
490 name: &str,
491 consumer: &str,
492 ) -> Result<u64, StreamError> {
493 let key = StreamKey {
494 scope: scope.clone(),
495 name: name.to_string(),
496 };
497 let guard = self.inner.lock();
498 let stream = guard.get(&key).ok_or_else(|| StreamError::NotFound {
499 scope: scope.clone(),
500 name: name.to_string(),
501 })?;
502 Ok(stream.consumer_offsets.get(consumer).copied().unwrap_or(0))
503 }
504
505 fn authorize(
506 principal_tenant: Option<&str>,
507 target: &StreamScope,
508 stream: &str,
509 has_cross_tenant_cap: bool,
510 ) -> Result<(), StreamError> {
511 let same_scope = match (principal_tenant, target) {
512 (Some(pt), StreamScope::Tenant(tt)) => pt == tt,
513 (None, StreamScope::Global) => true,
517 _ => false,
518 };
519 if same_scope || has_cross_tenant_cap {
520 return Ok(());
521 }
522 Err(StreamError::CrossTenantDenied {
523 principal_tenant: principal_tenant.map(str::to_string),
524 target: target.clone(),
525 stream: stream.to_string(),
526 })
527 }
528}
529
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a tenant scope.
    fn t(name: &str) -> StreamScope {
        StreamScope::Tenant(name.to_string())
    }

    /// Registry that already holds one stream with the given retention.
    fn registry_with(scope: StreamScope, name: &str, retention: StreamRetention) -> StreamRegistry {
        let reg = StreamRegistry::new();
        reg.create_stream(scope, name, retention).unwrap();
        reg
    }

    /// Registry that holds `acme/orders` with no retention limits.
    fn acme_orders() -> StreamRegistry {
        registry_with(t("acme"), "orders", StreamRetention::default())
    }

    /// Registry that holds `globex/orders` with no retention limits.
    fn globex_orders() -> StreamRegistry {
        registry_with(t("globex"), "orders", StreamRetention::default())
    }

    /// Offsets of `events`, in the order given.
    fn offsets_of(events: &[StreamEvent]) -> Vec<u64> {
        events.iter().map(|event| event.offset).collect()
    }

    #[test]
    fn create_then_discover_via_list() {
        let reg = acme_orders();
        let acme = t("acme");

        let listed = reg.list_streams(&acme);
        assert_eq!(listed.len(), 1);
        let only = &listed[0];
        assert_eq!(only.name, "orders");
        assert_eq!(only.event_count, 0);
        assert_eq!(only.head_offset, 0);
        assert_eq!(only.tail_offset, 0);

        assert!(reg.exists(&acme, "orders"));
        assert!(reg.describe(&acme, "orders").is_some());
    }

    #[test]
    fn duplicate_create_rejected() {
        let reg = acme_orders();
        let second = reg.create_stream(t("acme"), "orders", StreamRetention::default());
        let err = second.expect_err("dup create must fail");
        assert!(matches!(err, StreamError::AlreadyExists { .. }));
    }

    #[test]
    fn append_assigns_monotonic_offsets() {
        let reg = acme_orders();
        let first = reg.append(t("acme"), "orders", None, "a", 100).unwrap();
        let second = reg
            .append(t("acme"), "orders", Some(String::from("k")), "b", 101)
            .unwrap();
        let third = reg.append(t("acme"), "orders", None, "c", 102).unwrap();
        assert_eq!((first, second, third), (1, 2, 3));

        let desc = reg.describe(&t("acme"), "orders").unwrap();
        assert_eq!(desc.head_offset, 1);
        assert_eq!(desc.tail_offset, 3);
        assert_eq!(desc.event_count, 3);
    }

    #[test]
    fn append_on_unknown_stream_errors() {
        let reg = StreamRegistry::new();
        let outcome = reg.append(t("acme"), "missing", None, "x", 0);
        let err = outcome.expect_err("append on unknown stream must error");
        assert!(matches!(err, StreamError::NotFound { .. }));
    }

    #[test]
    fn read_since_returns_events_from_offset() {
        let reg = acme_orders();
        let acme = t("acme");
        for (payload, now_ms) in ["a", "b", "c", "d"].iter().zip(100u128..) {
            reg.append(acme.clone(), "orders", None, *payload, now_ms)
                .unwrap();
        }

        let from_start = reg.read_since(&acme, "orders", 0, 100).unwrap();
        assert_eq!(from_start.len(), 4);
        assert_eq!(from_start[0].offset, 1);
        assert_eq!(from_start[3].payload, "d");

        let from_middle = reg.read_since(&acme, "orders", 3, 100).unwrap();
        assert_eq!(from_middle.len(), 2);
        assert_eq!(from_middle[0].offset, 3);
        assert_eq!(from_middle[1].offset, 4);

        let bounded = reg.read_since(&acme, "orders", 0, 2).unwrap();
        assert_eq!(bounded.len(), 2);
        assert_eq!(bounded[1].offset, 2);
    }

    #[test]
    fn read_does_not_advance_consumer_offset_no_pending_state() {
        let reg = acme_orders();
        let acme = t("acme");
        for now_ms in 0..3u128 {
            reg.append(acme.clone(), "orders", None, "x", now_ms).unwrap();
        }
        // Repeated reads from the same position must keep returning the same
        // events.
        for _ in 0..3 {
            let events = reg.read_since(&acme, "orders", 0, 100).unwrap();
            assert_eq!(events.len(), 3);
        }
        assert_eq!(reg.get_offset(&acme, "orders", "c1").unwrap(), 0);
    }

    #[test]
    fn save_offset_is_monotonic() {
        let reg = acme_orders();
        let acme = t("acme");
        for now_ms in 0..5u128 {
            reg.append(acme.clone(), "orders", None, "x", now_ms).unwrap();
        }
        let save = |offset: u64| reg.save_offset(&acme, "orders", "c1", offset).unwrap();

        assert_eq!(save(3), 3);
        assert_eq!(save(1), 3, "stale save must not rewind");
        assert_eq!(save(3), 3);
        assert_eq!(save(5), 5);
        assert_eq!(reg.get_offset(&acme, "orders", "c1").unwrap(), 5);
    }

    #[test]
    fn get_offset_defaults_to_zero_for_new_consumer() {
        let reg = acme_orders();
        let saved = reg.get_offset(&t("acme"), "orders", "fresh").unwrap();
        assert_eq!(saved, 0);
    }

    #[test]
    fn consumer_offsets_are_isolated_per_consumer() {
        let reg = acme_orders();
        let acme = t("acme");
        reg.append(acme.clone(), "orders", None, "x", 0).unwrap();
        reg.save_offset(&acme, "orders", "c1", 1).unwrap();

        assert_eq!(reg.get_offset(&acme, "orders", "c1").unwrap(), 1);
        assert_eq!(reg.get_offset(&acme, "orders", "c2").unwrap(), 0);
    }

    #[test]
    fn streams_are_tenant_isolated() {
        let reg = acme_orders();
        reg.create_stream(t("globex"), "orders", StreamRetention::default())
            .unwrap();
        reg.append(t("acme"), "orders", None, "acme-only", 0)
            .unwrap();

        let globex_events = reg.read_since(&t("globex"), "orders", 0, 100).unwrap();
        assert!(
            globex_events.is_empty(),
            "globex must not see acme's events"
        );
        assert_eq!(reg.list_streams(&t("acme")).len(), 1);
        assert_eq!(reg.list_streams(&t("globex")).len(), 1);
    }

    #[test]
    fn retention_max_events_drops_oldest() {
        let keep_three = StreamRetention {
            max_events: Some(3),
            max_age_ms: None,
        };
        let reg = registry_with(t("acme"), "orders", keep_three);
        for now_ms in 100..105u128 {
            reg.append(t("acme"), "orders", None, "x", now_ms).unwrap();
        }

        let desc = reg.describe(&t("acme"), "orders").unwrap();
        assert_eq!(desc.event_count, 3);
        assert_eq!(desc.head_offset, 3);
        assert_eq!(desc.tail_offset, 5);

        let events = reg.read_since(&t("acme"), "orders", 0, 100).unwrap();
        assert_eq!(offsets_of(&events), vec![3, 4, 5]);
    }

    #[test]
    fn retention_max_age_drops_old_events() {
        let one_second = StreamRetention {
            max_events: None,
            max_age_ms: Some(1_000),
        };
        let reg = registry_with(t("acme"), "orders", one_second);
        let timeline: [(&str, u128); 3] = [("old", 0), ("old2", 500), ("fresh", 10_000)];
        for (payload, now_ms) in timeline.iter() {
            reg.append(t("acme"), "orders", None, *payload, *now_ms)
                .unwrap();
        }

        let events = reg.read_since(&t("acme"), "orders", 0, 100).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].payload, "fresh");
        assert_eq!(events[0].offset, 3, "retention must not rewrite offsets");
    }

    #[test]
    fn consumer_lagged_past_retention_does_not_error() {
        let keep_two = StreamRetention {
            max_events: Some(2),
            max_age_ms: None,
        };
        let reg = registry_with(t("acme"), "orders", keep_two);
        for now_ms in 0..5u128 {
            reg.append(t("acme"), "orders", None, "x", now_ms).unwrap();
        }

        // Offset 2 has already been dropped; the read resumes at the oldest
        // retained event.
        let events = reg.read_since(&t("acme"), "orders", 2, 100).unwrap();
        assert_eq!(offsets_of(&events), vec![4, 5]);
    }

    #[test]
    fn same_tenant_append_does_not_require_cross_tenant_cap() {
        let reg = acme_orders();
        let appended =
            reg.append_authorized(Some("acme"), t("acme"), "orders", None, "x", false, 0);
        let offset = appended.expect("same-tenant append must succeed without cross-tenant cap");
        assert_eq!(offset, 1);

        let read = reg.read_since_authorized(Some("acme"), t("acme"), "orders", 0, 100, false);
        read.expect("same-tenant read must succeed without cross-tenant cap");
    }

    #[test]
    fn cross_tenant_append_denied_without_cap() {
        let reg = globex_orders();
        let outcome =
            reg.append_authorized(Some("acme"), t("globex"), "orders", None, "leak", false, 0);
        let err = outcome.expect_err("cross-tenant append must be denied without cap");
        match err {
            StreamError::CrossTenantDenied {
                principal_tenant,
                target,
                stream,
            } => {
                assert_eq!(principal_tenant.as_deref(), Some("acme"));
                assert_eq!(target, t("globex"));
                assert_eq!(stream, "orders");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn cross_tenant_read_denied_without_cap() {
        let reg = globex_orders();
        let outcome = reg.read_since_authorized(Some("acme"), t("globex"), "orders", 0, 100, false);
        let err = outcome.expect_err("cross-tenant read must be denied without cap");
        assert!(matches!(err, StreamError::CrossTenantDenied { .. }));
    }

    #[test]
    fn cross_tenant_append_allowed_with_cap() {
        let reg = globex_orders();
        let outcome =
            reg.append_authorized(Some("acme"), t("globex"), "orders", None, "allowed", true, 0);
        let offset = outcome.expect("append with cross-tenant cap must succeed");
        assert_eq!(offset, 1);
    }

    #[test]
    fn global_scope_requires_cross_tenant_cap_for_tenant_principal() {
        let reg = registry_with(StreamScope::Global, "platform", StreamRetention::default());

        let from_tenant = reg.append_authorized(
            Some("acme"),
            StreamScope::Global,
            "platform",
            None,
            "leak",
            false,
            0,
        );
        let err = from_tenant.expect_err("tenant principal targeting Global must require cap");
        assert!(matches!(err, StreamError::CrossTenantDenied { .. }));

        let from_platform =
            reg.append_authorized(None, StreamScope::Global, "platform", None, "ok", false, 0);
        let offset = from_platform.expect("platform principal targeting global is same-scope");
        assert_eq!(offset, 1);
    }

    #[test]
    fn from_principal_tenant_maps_correctly() {
        let tenant_scope = StreamScope::from_principal_tenant(Some("acme"));
        assert_eq!(tenant_scope, StreamScope::Tenant(String::from("acme")));

        let global_scope = StreamScope::from_principal_tenant(None);
        assert_eq!(global_scope, StreamScope::Global);
    }

    #[test]
    fn event_carries_optional_key_for_future_cdc() {
        let reg = registry_with(t("acme"), "rows", StreamRetention::default());
        reg.append(t("acme"), "rows", Some(String::from("user:42")), "{}", 0)
            .unwrap();

        let events = reg.read_since(&t("acme"), "rows", 0, 100).unwrap();
        assert_eq!(events[0].key.as_deref(), Some("user:42"));
    }
}