1use std::collections::{HashMap, VecDeque};
40use std::sync::atomic::{AtomicU64, Ordering};
41use std::sync::{Arc, RwLock};
42use std::time::{Duration, Instant};
43
/// Default cap on the number of events retained per stream.
pub const DEFAULT_MAX_EVENTS_PER_STREAM: usize = 100;

/// Default event time-to-live, in seconds (one hour).
pub const DEFAULT_TTL_SECS: u64 = 3600;

/// Identifier of a stored event.
pub type EventId = String;

/// Identifier of an event stream.
pub type StreamId = String;
55
56#[derive(Debug, Clone)]
58pub struct EventEntry {
59 pub id: EventId,
61 pub stream_id: StreamId,
63 pub data: Option<serde_json::Value>,
65 pub created_at: Instant,
67}
68
69impl EventEntry {
70 fn new(id: EventId, stream_id: StreamId, data: Option<serde_json::Value>) -> Self {
72 Self {
73 id,
74 stream_id,
75 data,
76 created_at: Instant::now(),
77 }
78 }
79
80 fn is_expired(&self, ttl: Option<Duration>) -> bool {
82 match ttl {
83 Some(ttl) => self.created_at.elapsed() > ttl,
84 None => false,
85 }
86 }
87}
88
89#[derive(Debug, Clone)]
91pub struct EventStoreConfig {
92 pub max_events_per_stream: usize,
94 pub ttl: Option<Duration>,
96}
97
98impl Default for EventStoreConfig {
99 fn default() -> Self {
100 Self {
101 max_events_per_stream: DEFAULT_MAX_EVENTS_PER_STREAM,
102 ttl: Some(Duration::from_secs(DEFAULT_TTL_SECS)),
103 }
104 }
105}
106
107impl EventStoreConfig {
108 #[must_use]
110 pub fn no_expiry() -> Self {
111 Self {
112 ttl: None,
113 ..Default::default()
114 }
115 }
116
117 #[must_use]
119 pub fn max_events(mut self, max: usize) -> Self {
120 self.max_events_per_stream = max;
121 self
122 }
123
124 #[must_use]
126 pub fn ttl(mut self, ttl: Duration) -> Self {
127 self.ttl = Some(ttl);
128 self
129 }
130
131 #[must_use]
133 pub fn no_ttl(mut self) -> Self {
134 self.ttl = None;
135 self
136 }
137}
138
139#[derive(Debug)]
141struct StreamEvents {
142 events: VecDeque<EventEntry>,
144 index: HashMap<EventId, usize>,
146}
147
148impl StreamEvents {
149 fn new() -> Self {
150 Self {
151 events: VecDeque::new(),
152 index: HashMap::new(),
153 }
154 }
155
156 fn push(&mut self, entry: EventEntry, max_events: usize) {
158 while self.events.len() >= max_events {
160 if let Some(oldest) = self.events.pop_front() {
161 self.index.remove(&oldest.id);
162 }
163 self.rebuild_index();
165 }
166
167 let idx = self.events.len();
168 self.index.insert(entry.id.clone(), idx);
169 self.events.push_back(entry);
170 }
171
172 fn remove_expired(&mut self, ttl: Option<Duration>) {
174 if ttl.is_none() {
175 return;
176 }
177
178 let mut removed = false;
179 while let Some(front) = self.events.front() {
180 if front.is_expired(ttl) {
181 if let Some(entry) = self.events.pop_front() {
182 self.index.remove(&entry.id);
183 removed = true;
184 }
185 } else {
186 break;
187 }
188 }
189
190 if removed {
191 self.rebuild_index();
192 }
193 }
194
195 fn rebuild_index(&mut self) {
197 self.index.clear();
198 for (idx, entry) in self.events.iter().enumerate() {
199 self.index.insert(entry.id.clone(), idx);
200 }
201 }
202
203 fn events_after(&self, after_id: Option<&str>) -> Vec<EventEntry> {
205 match after_id {
206 None => self.events.iter().cloned().collect(),
207 Some(id) => {
208 if let Some(&idx) = self.index.get(id) {
209 self.events.iter().skip(idx + 1).cloned().collect()
210 } else {
211 Vec::new()
213 }
214 }
215 }
216 }
217
218 fn contains(&self, event_id: &str) -> bool {
220 self.index.contains_key(event_id)
221 }
222}
223
224#[derive(Debug)]
234pub struct EventStore {
235 config: EventStoreConfig,
237 streams: RwLock<HashMap<StreamId, StreamEvents>>,
239 event_counter: AtomicU64,
241}
242
243impl Default for EventStore {
244 fn default() -> Self {
245 Self::new()
246 }
247}
248
249impl EventStore {
250 #[must_use]
252 pub fn new() -> Self {
253 Self::with_config(EventStoreConfig::default())
254 }
255
256 #[must_use]
258 pub fn with_config(config: EventStoreConfig) -> Self {
259 Self {
260 config,
261 streams: RwLock::new(HashMap::new()),
262 event_counter: AtomicU64::new(0),
263 }
264 }
265
266 #[must_use]
268 pub fn config(&self) -> &EventStoreConfig {
269 &self.config
270 }
271
272 fn generate_event_id(&self) -> EventId {
274 let counter = self.event_counter.fetch_add(1, Ordering::Relaxed);
275 let timestamp = std::time::SystemTime::now()
276 .duration_since(std::time::UNIX_EPOCH)
277 .unwrap_or_default()
278 .as_millis();
279 format!("{timestamp}-{counter}")
280 }
281
282 pub fn store_event(&self, stream_id: &str, data: Option<serde_json::Value>) -> EventId {
293 let event_id = self.generate_event_id();
294 let entry = EventEntry::new(event_id.clone(), stream_id.to_string(), data);
295
296 let mut streams = self
297 .streams
298 .write()
299 .unwrap_or_else(std::sync::PoisonError::into_inner);
300
301 let stream = streams
302 .entry(stream_id.to_string())
303 .or_insert_with(StreamEvents::new);
304
305 stream.remove_expired(self.config.ttl);
307
308 stream.push(entry, self.config.max_events_per_stream);
310
311 event_id
312 }
313
314 pub fn store_priming_event(&self, stream_id: &str) -> EventId {
319 self.store_event(stream_id, None)
320 }
321
322 #[must_use]
333 pub fn get_events_after(&self, stream_id: &str, after_id: Option<&str>) -> Vec<EventEntry> {
334 let mut streams = self
335 .streams
336 .write()
337 .unwrap_or_else(std::sync::PoisonError::into_inner);
338
339 if let Some(stream) = streams.get_mut(stream_id) {
340 stream.remove_expired(self.config.ttl);
342 stream.events_after(after_id)
343 } else {
344 Vec::new()
345 }
346 }
347
348 pub fn replay_events_after<F>(&self, last_event_id: &str, mut callback: F) -> Option<StreamId>
362 where
363 F: FnMut(&EventEntry),
364 {
365 let streams = self
366 .streams
367 .read()
368 .unwrap_or_else(std::sync::PoisonError::into_inner);
369
370 for (stream_id, stream) in streams.iter() {
372 if stream.contains(last_event_id) {
373 let events = stream.events_after(Some(last_event_id));
374 for event in events {
375 callback(&event);
376 }
377 return Some(stream_id.clone());
378 }
379 }
380
381 None
382 }
383
384 #[must_use]
390 pub fn find_stream_for_event(&self, event_id: &str) -> Option<StreamId> {
391 let streams = self
392 .streams
393 .read()
394 .unwrap_or_else(std::sync::PoisonError::into_inner);
395
396 for (stream_id, stream) in streams.iter() {
397 if stream.contains(event_id) {
398 return Some(stream_id.clone());
399 }
400 }
401
402 None
403 }
404
405 pub fn clear_stream(&self, stream_id: &str) {
409 let mut streams = self
410 .streams
411 .write()
412 .unwrap_or_else(std::sync::PoisonError::into_inner);
413 streams.remove(stream_id);
414 }
415
416 pub fn cleanup_expired(&self) {
421 if self.config.ttl.is_none() {
422 return;
423 }
424
425 let mut streams = self
426 .streams
427 .write()
428 .unwrap_or_else(std::sync::PoisonError::into_inner);
429
430 for stream in streams.values_mut() {
432 stream.remove_expired(self.config.ttl);
433 }
434
435 streams.retain(|_, stream| !stream.events.is_empty());
437 }
438
439 #[must_use]
441 pub fn stream_count(&self) -> usize {
442 let streams = self
443 .streams
444 .read()
445 .unwrap_or_else(std::sync::PoisonError::into_inner);
446 streams.len()
447 }
448
449 #[must_use]
451 pub fn event_count(&self) -> usize {
452 let streams = self
453 .streams
454 .read()
455 .unwrap_or_else(std::sync::PoisonError::into_inner);
456 streams.values().map(|s| s.events.len()).sum()
457 }
458
459 #[must_use]
461 pub fn stats(&self) -> EventStoreStats {
462 let streams = self
463 .streams
464 .read()
465 .unwrap_or_else(std::sync::PoisonError::into_inner);
466 let total_events: usize = streams.values().map(|s| s.events.len()).sum();
467
468 EventStoreStats {
469 stream_count: streams.len(),
470 total_events,
471 max_events_per_stream: self.config.max_events_per_stream,
472 ttl: self.config.ttl,
473 }
474 }
475}
476
/// Counts and configured limits reported by `EventStore::stats`.
#[derive(Debug, Clone)]
pub struct EventStoreStats {
    /// Number of streams held by the store.
    pub stream_count: usize,
    /// Number of events summed over all streams.
    pub total_events: usize,
    /// Configured per-stream event cap.
    pub max_events_per_stream: usize,
    /// Configured time-to-live, or `None` when expiry is disabled.
    pub ttl: Option<Duration>,
}
489
490pub type SharedEventStore = Arc<EventStore>;
492
493#[must_use]
495pub fn create_shared_event_store() -> SharedEventStore {
496 Arc::new(EventStore::new())
497}
498
499#[must_use]
501pub fn create_shared_event_store_with_config(config: EventStoreConfig) -> SharedEventStore {
502 Arc::new(EventStore::with_config(config))
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    // ---- storing and reading back ----

    #[test]
    fn test_store_and_retrieve_event() {
        let store = EventStore::new();

        let event_id = store.store_event("stream1", Some(serde_json::json!({"test": true})));
        assert!(!event_id.is_empty());

        let events = store.get_events_after("stream1", None);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, event_id);
        assert!(events[0].data.is_some());
    }

    #[test]
    fn test_store_priming_event() {
        let store = EventStore::new();

        let event_id = store.store_priming_event("stream1");
        assert!(!event_id.is_empty());

        // A priming event carries no payload.
        let events = store.get_events_after("stream1", None);
        assert_eq!(events.len(), 1);
        assert!(events[0].data.is_none());
    }

    #[test]
    fn test_events_after_id() {
        let store = EventStore::new();

        let id1 = store.store_event("stream1", Some(serde_json::json!({"n": 1})));
        let id2 = store.store_event("stream1", Some(serde_json::json!({"n": 2})));
        let id3 = store.store_event("stream1", Some(serde_json::json!({"n": 3})));

        let events = store.get_events_after("stream1", Some(&id1));
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].id, id2);
        assert_eq!(events[1].id, id3);

        let events = store.get_events_after("stream1", Some(&id2));
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, id3);

        // Nothing comes after the newest event.
        let events = store.get_events_after("stream1", Some(&id3));
        assert!(events.is_empty());
    }

    #[test]
    fn test_multiple_streams() {
        let store = EventStore::new();

        let id1 = store.store_event("stream1", Some(serde_json::json!({"stream": 1})));
        let id2 = store.store_event("stream2", Some(serde_json::json!({"stream": 2})));

        let events1 = store.get_events_after("stream1", None);
        let events2 = store.get_events_after("stream2", None);

        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].id, id1);

        assert_eq!(events2.len(), 1);
        assert_eq!(events2[0].id, id2);
    }

    // ---- capacity and eviction ----

    #[test]
    fn test_max_events_limit() {
        let config = EventStoreConfig::default().max_events(3);
        let store = EventStore::with_config(config);

        let id1 = store.store_event("stream1", Some(serde_json::json!({"n": 1})));
        let id2 = store.store_event("stream1", Some(serde_json::json!({"n": 2})));
        let id3 = store.store_event("stream1", Some(serde_json::json!({"n": 3})));
        let id4 = store.store_event("stream1", Some(serde_json::json!({"n": 4})));

        let events = store.get_events_after("stream1", None);
        assert_eq!(events.len(), 3);

        // The oldest event was evicted; the survivors keep their order.
        assert_eq!(events[0].id, id2);
        assert_eq!(events[1].id, id3);
        assert_eq!(events[2].id, id4);
        assert_eq!(store.find_stream_for_event(&id1), None);
    }

    #[test]
    fn eviction_keeps_after_id_lookups_consistent() {
        let config = EventStoreConfig::no_expiry().max_events(2);
        let store = EventStore::with_config(config);

        let id1 = store.store_event("stream1", None);
        let id2 = store.store_event("stream1", None);
        let id3 = store.store_event("stream1", None);
        let id4 = store.store_event("stream1", None);

        // Only id3 and id4 survive two evictions.
        let events = store.get_events_after("stream1", Some(&id3));
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, id4);

        assert!(store.get_events_after("stream1", Some(&id4)).is_empty());

        // Evicted ids are unknown to the stream.
        assert!(store.get_events_after("stream1", Some(&id1)).is_empty());
        assert!(store.get_events_after("stream1", Some(&id2)).is_empty());
        assert_eq!(
            store.find_stream_for_event(&id3),
            Some("stream1".to_string())
        );
    }

    #[test]
    fn replay_after_evicted_id_returns_none() {
        let config = EventStoreConfig::no_expiry().max_events(2);
        let store = EventStore::with_config(config);

        let id1 = store.store_event("stream1", None);
        let id2 = store.store_event("stream1", None);
        let id3 = store.store_event("stream1", None);

        let mut replayed = Vec::new();
        let stream_id = store.replay_events_after(&id1, |event| {
            replayed.push(event.id.clone());
        });
        assert!(stream_id.is_none());
        assert!(replayed.is_empty());

        let stream_id = store.replay_events_after(&id2, |event| {
            replayed.push(event.id.clone());
        });
        assert_eq!(stream_id, Some("stream1".to_string()));
        assert_eq!(replayed, vec![id3]);
    }

    // ---- replay and lookup ----

    #[test]
    fn test_replay_events() {
        let store = EventStore::new();

        let id1 = store.store_event("stream1", Some(serde_json::json!({"n": 1})));
        let id2 = store.store_event("stream1", Some(serde_json::json!({"n": 2})));
        let id3 = store.store_event("stream1", Some(serde_json::json!({"n": 3})));

        let mut replayed = Vec::new();
        let stream_id = store.replay_events_after(&id1, |event| {
            replayed.push(event.id.clone());
        });

        assert_eq!(stream_id, Some("stream1".to_string()));
        assert_eq!(replayed, vec![id2, id3]);
    }

    #[test]
    fn test_replay_unknown_event_id() {
        let store = EventStore::new();
        store.store_event("stream1", Some(serde_json::json!({})));

        let mut replayed = Vec::new();
        let stream_id = store.replay_events_after("nonexistent", |event| {
            replayed.push(event.id.clone());
        });

        assert!(stream_id.is_none());
        assert!(replayed.is_empty());
    }

    #[test]
    fn test_find_stream_for_event() {
        let store = EventStore::new();

        let id1 = store.store_event("stream1", Some(serde_json::json!({})));
        let id2 = store.store_event("stream2", Some(serde_json::json!({})));

        assert_eq!(
            store.find_stream_for_event(&id1),
            Some("stream1".to_string())
        );
        assert_eq!(
            store.find_stream_for_event(&id2),
            Some("stream2".to_string())
        );
        assert_eq!(store.find_stream_for_event("nonexistent"), None);
    }

    #[test]
    fn test_clear_stream() {
        let store = EventStore::new();

        store.store_event("stream1", Some(serde_json::json!({})));
        store.store_event("stream2", Some(serde_json::json!({})));

        assert_eq!(store.stream_count(), 2);

        store.clear_stream("stream1");

        assert_eq!(store.stream_count(), 1);
        assert!(store.get_events_after("stream1", None).is_empty());
    }

    // ---- expiry ----

    #[test]
    fn test_event_expiration() {
        let config = EventStoreConfig {
            max_events_per_stream: 100,
            ttl: Some(Duration::from_millis(10)),
        };
        let store = EventStore::with_config(config);

        store.store_event("stream1", Some(serde_json::json!({})));

        // Still fresh right after being stored.
        assert_eq!(store.get_events_after("stream1", None).len(), 1);

        // Wait until the event is older than the TTL.
        std::thread::sleep(Duration::from_millis(20));

        store.cleanup_expired();
        assert!(store.get_events_after("stream1", None).is_empty());
    }

    #[test]
    fn test_no_expiration() {
        let config = EventStoreConfig::no_expiry();
        let store = EventStore::with_config(config);

        store.store_event("stream1", Some(serde_json::json!({})));

        store.cleanup_expired();
        assert_eq!(store.get_events_after("stream1", None).len(), 1);
    }

    // ---- stats, sharing, ids, configuration ----

    #[test]
    fn test_stats() {
        let store = EventStore::new();

        store.store_event("stream1", Some(serde_json::json!({})));
        store.store_event("stream1", Some(serde_json::json!({})));
        store.store_event("stream2", Some(serde_json::json!({})));

        let stats = store.stats();
        assert_eq!(stats.stream_count, 2);
        assert_eq!(stats.total_events, 3);
    }

    #[test]
    fn test_shared_event_store() {
        let store = create_shared_event_store();

        let store1 = Arc::clone(&store);
        let store2 = Arc::clone(&store);

        store1.store_event("stream1", Some(serde_json::json!({"from": 1})));
        store2.store_event("stream1", Some(serde_json::json!({"from": 2})));

        // Both handles write into the same underlying store.
        assert_eq!(store.event_count(), 2);
    }

    #[test]
    fn test_unique_event_ids() {
        let store = EventStore::new();

        let id1 = store.store_event("stream1", None);
        let id2 = store.store_event("stream1", None);
        let id3 = store.store_event("stream2", None);

        assert_ne!(id1, id2);
        assert_ne!(id2, id3);
        assert_ne!(id1, id3);
    }

    #[test]
    fn test_config_builder() {
        let config = EventStoreConfig::default()
            .max_events(50)
            .ttl(Duration::from_secs(300));

        assert_eq!(config.max_events_per_stream, 50);
        assert_eq!(config.ttl, Some(Duration::from_secs(300)));

        let config = config.no_ttl();
        assert!(config.ttl.is_none());
    }

    #[test]
    fn event_store_config_default_values() {
        let config = EventStoreConfig::default();
        assert_eq!(config.max_events_per_stream, DEFAULT_MAX_EVENTS_PER_STREAM);
        assert_eq!(config.ttl, Some(Duration::from_secs(DEFAULT_TTL_SECS)));
    }

    #[test]
    fn event_store_default_trait() {
        let store = EventStore::default();
        assert_eq!(store.stream_count(), 0);
        assert_eq!(store.event_count(), 0);
        assert_eq!(
            store.config().max_events_per_stream,
            DEFAULT_MAX_EVENTS_PER_STREAM
        );
    }

    #[test]
    fn event_store_config_accessor() {
        let config = EventStoreConfig::no_expiry().max_events(42);
        let store = EventStore::with_config(config);
        assert_eq!(store.config().max_events_per_stream, 42);
        assert!(store.config().ttl.is_none());
    }

    #[test]
    fn get_events_after_nonexistent_stream_returns_empty() {
        let store = EventStore::new();
        store.store_event("stream1", Some(serde_json::json!({})));
        let events = store.get_events_after("no-such-stream", None);
        assert!(events.is_empty());
    }

    #[test]
    fn get_events_after_unknown_id_returns_empty() {
        let store = EventStore::new();
        store.store_event("stream1", Some(serde_json::json!({})));
        let events = store.get_events_after("stream1", Some("bogus-id"));
        assert!(events.is_empty());
    }

    #[test]
    fn create_shared_event_store_with_config_works() {
        let config = EventStoreConfig::no_expiry().max_events(5);
        let store = create_shared_event_store_with_config(config);
        assert_eq!(store.config().max_events_per_stream, 5);
        assert!(store.config().ttl.is_none());
    }

    #[test]
    fn cleanup_expired_removes_empty_streams() {
        let config = EventStoreConfig {
            max_events_per_stream: 100,
            ttl: Some(Duration::from_millis(10)),
        };
        let store = EventStore::with_config(config);

        store.store_event("stream1", Some(serde_json::json!({})));
        store.store_event("stream2", Some(serde_json::json!({})));
        assert_eq!(store.stream_count(), 2);

        std::thread::sleep(Duration::from_millis(20));
        store.cleanup_expired();

        // Streams whose events all expired are dropped entirely.
        assert_eq!(store.stream_count(), 0);
        assert_eq!(store.event_count(), 0);
    }

    #[test]
    fn event_store_stats_includes_config_fields() {
        let config = EventStoreConfig::no_expiry().max_events(77);
        let store = EventStore::with_config(config);
        store.store_event("s1", None);

        let stats = store.stats();
        assert_eq!(stats.stream_count, 1);
        assert_eq!(stats.total_events, 1);
        assert_eq!(stats.max_events_per_stream, 77);
        assert!(stats.ttl.is_none());
    }

    // ---- derived traits and small helpers ----

    #[test]
    fn event_entry_debug_and_clone() {
        let entry = EventEntry::new("ev1".into(), "s1".into(), Some(serde_json::json!(42)));
        let debug = format!("{entry:?}");
        assert!(debug.contains("ev1"));
        assert!(debug.contains("s1"));

        let cloned = entry.clone();
        assert_eq!(cloned.id, "ev1");
        assert_eq!(cloned.stream_id, "s1");
    }

    #[test]
    fn event_entry_is_expired_not_expired() {
        let entry = EventEntry::new("ev1".into(), "s1".into(), None);
        // Far from a one-hour TTL.
        assert!(!entry.is_expired(Some(Duration::from_secs(3600))));
        // Without a TTL nothing ever expires.
        assert!(!entry.is_expired(None));
    }

    #[test]
    fn event_store_config_debug_and_clone() {
        let config = EventStoreConfig::default().max_events(10);
        let debug = format!("{config:?}");
        assert!(debug.contains("10"));

        let cloned = config.clone();
        assert_eq!(cloned.max_events_per_stream, 10);
    }

    #[test]
    fn event_store_stats_debug_and_clone() {
        let store = EventStore::new();
        store.store_event("s1", None);
        let stats = store.stats();
        let debug = format!("{stats:?}");
        assert!(debug.contains("EventStoreStats"));

        let cloned = stats.clone();
        assert_eq!(cloned.stream_count, 1);
    }

    #[test]
    fn event_store_debug() {
        let store = EventStore::new();
        let debug = format!("{store:?}");
        assert!(debug.contains("EventStore"));
    }

    #[test]
    fn cleanup_expired_noop_with_no_ttl() {
        let config = EventStoreConfig::no_expiry();
        let store = EventStore::with_config(config);
        store.store_event("s1", Some(serde_json::json!(1)));
        store.store_event("s2", Some(serde_json::json!(2)));
        store.cleanup_expired();
        assert_eq!(store.event_count(), 2);
        assert_eq!(store.stream_count(), 2);
    }

    #[test]
    fn clear_stream_nonexistent_is_noop() {
        let store = EventStore::new();
        store.store_event("s1", None);
        store.clear_stream("no-such-stream");
        assert_eq!(store.stream_count(), 1);
    }

    #[test]
    fn event_id_format_contains_dash() {
        let store = EventStore::new();
        let id = store.store_event("s1", None);
        assert!(id.contains('-'), "event ID should contain a dash: {id}");
    }
}