Skip to main content

fastmcp_transport/
event_store.rs

1//! Event store for SSE resumability.
2//!
3//! This module provides an [`EventStore`] that enables SSE polling and
4//! resumability by storing events that can be replayed when clients reconnect.
5//!
6//! # SSE Resumability
7//!
8//! When a client disconnects from an SSE stream, it may miss events. The
9//! `Last-Event-ID` header allows clients to indicate where they left off,
10//! and the server can replay missed events using the EventStore.
11//!
12//! # Features
13//!
14//! - **TTL-based event retention**: Events automatically expire after a configurable duration
15//! - **Per-stream event limits**: Prevents unbounded memory growth
16//! - **Cursor-based resumption**: Replay events from any point using event IDs
17//! - **Thread-safe**: Safe for concurrent access from multiple handlers
18//!
19//! # Example
20//!
21//! ```
22//! use fastmcp_transport::event_store::{EventStore, EventStoreConfig};
23//! use std::time::Duration;
24//!
25//! // Create event store with custom configuration
26//! let store = EventStore::with_config(EventStoreConfig {
27//!     max_events_per_stream: 100,
28//!     ttl: Some(Duration::from_secs(3600)), // 1 hour
29//! });
30//!
31//! // Store an event
32//! let stream_id = "session-123";
33//! let event_id = store.store_event(stream_id, Some(serde_json::json!({"method": "test"})));
34//!
35//! // Replay events after a specific ID
36//! let events = store.get_events_after(stream_id, None); // Get all events
37//! ```
38
39use std::collections::{HashMap, VecDeque};
40use std::sync::atomic::{AtomicU64, Ordering};
41use std::sync::{Arc, RwLock};
42use std::time::{Duration, Instant};
43
44/// Default maximum events per stream.
45pub const DEFAULT_MAX_EVENTS_PER_STREAM: usize = 100;
46
47/// Default TTL for events (1 hour).
48pub const DEFAULT_TTL_SECS: u64 = 3600;
49
50/// Unique identifier for an event.
51pub type EventId = String;
52
53/// Unique identifier for a stream (session).
54pub type StreamId = String;
55
56/// A stored event with metadata.
57#[derive(Debug, Clone)]
58pub struct EventEntry {
59    /// Unique event identifier.
60    pub id: EventId,
61    /// Stream this event belongs to.
62    pub stream_id: StreamId,
63    /// Event data (None for priming events).
64    pub data: Option<serde_json::Value>,
65    /// When this event was stored.
66    pub created_at: Instant,
67}
68
69impl EventEntry {
70    /// Creates a new event entry.
71    fn new(id: EventId, stream_id: StreamId, data: Option<serde_json::Value>) -> Self {
72        Self {
73            id,
74            stream_id,
75            data,
76            created_at: Instant::now(),
77        }
78    }
79
80    /// Returns true if this event has expired based on the given TTL.
81    fn is_expired(&self, ttl: Option<Duration>) -> bool {
82        match ttl {
83            Some(ttl) => self.created_at.elapsed() > ttl,
84            None => false,
85        }
86    }
87}
88
89/// Configuration for the event store.
90#[derive(Debug, Clone)]
91pub struct EventStoreConfig {
92    /// Maximum number of events to retain per stream.
93    pub max_events_per_stream: usize,
94    /// Time-to-live for events. `None` means events never expire.
95    pub ttl: Option<Duration>,
96}
97
98impl Default for EventStoreConfig {
99    fn default() -> Self {
100        Self {
101            max_events_per_stream: DEFAULT_MAX_EVENTS_PER_STREAM,
102            ttl: Some(Duration::from_secs(DEFAULT_TTL_SECS)),
103        }
104    }
105}
106
107impl EventStoreConfig {
108    /// Creates a config with no TTL (events never expire).
109    #[must_use]
110    pub fn no_expiry() -> Self {
111        Self {
112            ttl: None,
113            ..Default::default()
114        }
115    }
116
117    /// Sets the maximum events per stream.
118    #[must_use]
119    pub fn max_events(mut self, max: usize) -> Self {
120        self.max_events_per_stream = max;
121        self
122    }
123
124    /// Sets the TTL for events.
125    #[must_use]
126    pub fn ttl(mut self, ttl: Duration) -> Self {
127        self.ttl = Some(ttl);
128        self
129    }
130
131    /// Disables TTL (events never expire).
132    #[must_use]
133    pub fn no_ttl(mut self) -> Self {
134        self.ttl = None;
135        self
136    }
137}
138
139/// Internal storage for a single stream's events.
140#[derive(Debug)]
141struct StreamEvents {
142    /// Events in insertion order (oldest first).
143    events: VecDeque<EventEntry>,
144    /// Map from event ID to index for fast lookup.
145    index: HashMap<EventId, usize>,
146}
147
148impl StreamEvents {
149    fn new() -> Self {
150        Self {
151            events: VecDeque::new(),
152            index: HashMap::new(),
153        }
154    }
155
156    /// Adds an event, enforcing max events limit.
157    fn push(&mut self, entry: EventEntry, max_events: usize) {
158        // Remove oldest if at capacity
159        while self.events.len() >= max_events {
160            if let Some(oldest) = self.events.pop_front() {
161                self.index.remove(&oldest.id);
162            }
163            // Rebuild index after removal (indices shifted)
164            self.rebuild_index();
165        }
166
167        let idx = self.events.len();
168        self.index.insert(entry.id.clone(), idx);
169        self.events.push_back(entry);
170    }
171
172    /// Removes expired events.
173    fn remove_expired(&mut self, ttl: Option<Duration>) {
174        if ttl.is_none() {
175            return;
176        }
177
178        let mut removed = false;
179        while let Some(front) = self.events.front() {
180            if front.is_expired(ttl) {
181                if let Some(entry) = self.events.pop_front() {
182                    self.index.remove(&entry.id);
183                    removed = true;
184                }
185            } else {
186                break;
187            }
188        }
189
190        if removed {
191            self.rebuild_index();
192        }
193    }
194
195    /// Rebuilds the index after removals.
196    fn rebuild_index(&mut self) {
197        self.index.clear();
198        for (idx, entry) in self.events.iter().enumerate() {
199            self.index.insert(entry.id.clone(), idx);
200        }
201    }
202
203    /// Gets events after the specified ID (exclusive).
204    fn events_after(&self, after_id: Option<&str>) -> Vec<EventEntry> {
205        match after_id {
206            None => self.events.iter().cloned().collect(),
207            Some(id) => {
208                if let Some(&idx) = self.index.get(id) {
209                    self.events.iter().skip(idx + 1).cloned().collect()
210                } else {
211                    // ID not found, return empty (client should reconnect fresh)
212                    Vec::new()
213                }
214            }
215        }
216    }
217
218    /// Finds the stream ID for a given event ID.
219    fn contains(&self, event_id: &str) -> bool {
220        self.index.contains_key(event_id)
221    }
222}
223
224/// Thread-safe event store for SSE resumability.
225///
226/// Stores events per stream with automatic expiration and size limits.
227/// Use this to enable clients to resume SSE streams after disconnection.
228///
229/// # Thread Safety
230///
231/// The EventStore uses `RwLock` internally and is safe for concurrent
232/// access from multiple threads.
233#[derive(Debug)]
234pub struct EventStore {
235    /// Configuration.
236    config: EventStoreConfig,
237    /// Per-stream event storage.
238    streams: RwLock<HashMap<StreamId, StreamEvents>>,
239    /// Counter for generating unique event IDs.
240    event_counter: AtomicU64,
241}
242
243impl Default for EventStore {
244    fn default() -> Self {
245        Self::new()
246    }
247}
248
249impl EventStore {
250    /// Creates a new event store with default configuration.
251    #[must_use]
252    pub fn new() -> Self {
253        Self::with_config(EventStoreConfig::default())
254    }
255
256    /// Creates a new event store with custom configuration.
257    #[must_use]
258    pub fn with_config(config: EventStoreConfig) -> Self {
259        Self {
260            config,
261            streams: RwLock::new(HashMap::new()),
262            event_counter: AtomicU64::new(0),
263        }
264    }
265
266    /// Returns the configuration.
267    #[must_use]
268    pub fn config(&self) -> &EventStoreConfig {
269        &self.config
270    }
271
272    /// Generates a unique event ID.
273    fn generate_event_id(&self) -> EventId {
274        let counter = self.event_counter.fetch_add(1, Ordering::Relaxed);
275        let timestamp = std::time::SystemTime::now()
276            .duration_since(std::time::UNIX_EPOCH)
277            .unwrap_or_default()
278            .as_millis();
279        format!("{timestamp}-{counter}")
280    }
281
282    /// Stores an event and returns its ID.
283    ///
284    /// # Arguments
285    ///
286    /// * `stream_id` - The stream (session) this event belongs to
287    /// * `data` - Event data, or `None` for a priming event
288    ///
289    /// # Returns
290    ///
291    /// The unique event ID that can be used for resumption.
292    pub fn store_event(&self, stream_id: &str, data: Option<serde_json::Value>) -> EventId {
293        let event_id = self.generate_event_id();
294        let entry = EventEntry::new(event_id.clone(), stream_id.to_string(), data);
295
296        let mut streams = self
297            .streams
298            .write()
299            .unwrap_or_else(std::sync::PoisonError::into_inner);
300
301        let stream = streams
302            .entry(stream_id.to_string())
303            .or_insert_with(StreamEvents::new);
304
305        // Clean up expired events first
306        stream.remove_expired(self.config.ttl);
307
308        // Add the new event
309        stream.push(entry, self.config.max_events_per_stream);
310
311        event_id
312    }
313
314    /// Stores a priming event (empty data) for SSE initialization.
315    ///
316    /// Per SSE spec, servers should send an event with just an ID to prime
317    /// the client's `Last-Event-ID` tracking.
318    pub fn store_priming_event(&self, stream_id: &str) -> EventId {
319        self.store_event(stream_id, None)
320    }
321
322    /// Gets events after the specified event ID.
323    ///
324    /// # Arguments
325    ///
326    /// * `stream_id` - The stream to get events from
327    /// * `after_id` - Get events after this ID (exclusive). `None` returns all events.
328    ///
329    /// # Returns
330    ///
331    /// Vector of events in chronological order.
332    #[must_use]
333    pub fn get_events_after(&self, stream_id: &str, after_id: Option<&str>) -> Vec<EventEntry> {
334        let mut streams = self
335            .streams
336            .write()
337            .unwrap_or_else(std::sync::PoisonError::into_inner);
338
339        if let Some(stream) = streams.get_mut(stream_id) {
340            // Clean up expired events first
341            stream.remove_expired(self.config.ttl);
342            stream.events_after(after_id)
343        } else {
344            Vec::new()
345        }
346    }
347
348    /// Replays events after a specific event ID using a callback.
349    ///
350    /// This is the primary method for SSE resumption. When a client reconnects
351    /// with a `Last-Event-ID`, use this to replay missed events.
352    ///
353    /// # Arguments
354    ///
355    /// * `last_event_id` - The client's last received event ID
356    /// * `callback` - Called for each event to replay
357    ///
358    /// # Returns
359    ///
360    /// The stream ID if the event was found, `None` otherwise.
361    pub fn replay_events_after<F>(&self, last_event_id: &str, mut callback: F) -> Option<StreamId>
362    where
363        F: FnMut(&EventEntry),
364    {
365        let streams = self
366            .streams
367            .read()
368            .unwrap_or_else(std::sync::PoisonError::into_inner);
369
370        // Find which stream contains this event
371        for (stream_id, stream) in streams.iter() {
372            if stream.contains(last_event_id) {
373                let events = stream.events_after(Some(last_event_id));
374                for event in events {
375                    callback(&event);
376                }
377                return Some(stream_id.clone());
378            }
379        }
380
381        None
382    }
383
384    /// Looks up the stream ID for a given event ID.
385    ///
386    /// # Returns
387    ///
388    /// The stream ID if the event exists, `None` otherwise.
389    #[must_use]
390    pub fn find_stream_for_event(&self, event_id: &str) -> Option<StreamId> {
391        let streams = self
392            .streams
393            .read()
394            .unwrap_or_else(std::sync::PoisonError::into_inner);
395
396        for (stream_id, stream) in streams.iter() {
397            if stream.contains(event_id) {
398                return Some(stream_id.clone());
399            }
400        }
401
402        None
403    }
404
405    /// Removes all events for a stream.
406    ///
407    /// Call this when a session ends to free memory.
408    pub fn clear_stream(&self, stream_id: &str) {
409        let mut streams = self
410            .streams
411            .write()
412            .unwrap_or_else(std::sync::PoisonError::into_inner);
413        streams.remove(stream_id);
414    }
415
416    /// Removes all expired events across all streams.
417    ///
418    /// This is called automatically during operations, but you can call
419    /// it manually for cleanup.
420    pub fn cleanup_expired(&self) {
421        if self.config.ttl.is_none() {
422            return;
423        }
424
425        let mut streams = self
426            .streams
427            .write()
428            .unwrap_or_else(std::sync::PoisonError::into_inner);
429
430        // Remove expired events from each stream
431        for stream in streams.values_mut() {
432            stream.remove_expired(self.config.ttl);
433        }
434
435        // Remove empty streams
436        streams.retain(|_, stream| !stream.events.is_empty());
437    }
438
439    /// Returns the number of streams currently stored.
440    #[must_use]
441    pub fn stream_count(&self) -> usize {
442        let streams = self
443            .streams
444            .read()
445            .unwrap_or_else(std::sync::PoisonError::into_inner);
446        streams.len()
447    }
448
449    /// Returns the total number of events across all streams.
450    #[must_use]
451    pub fn event_count(&self) -> usize {
452        let streams = self
453            .streams
454            .read()
455            .unwrap_or_else(std::sync::PoisonError::into_inner);
456        streams.values().map(|s| s.events.len()).sum()
457    }
458
459    /// Returns statistics about the event store.
460    #[must_use]
461    pub fn stats(&self) -> EventStoreStats {
462        let streams = self
463            .streams
464            .read()
465            .unwrap_or_else(std::sync::PoisonError::into_inner);
466        let total_events: usize = streams.values().map(|s| s.events.len()).sum();
467
468        EventStoreStats {
469            stream_count: streams.len(),
470            total_events,
471            max_events_per_stream: self.config.max_events_per_stream,
472            ttl: self.config.ttl,
473        }
474    }
475}
476
477/// Statistics about the event store.
478#[derive(Debug, Clone)]
479pub struct EventStoreStats {
480    /// Number of streams.
481    pub stream_count: usize,
482    /// Total events across all streams.
483    pub total_events: usize,
484    /// Configured max events per stream.
485    pub max_events_per_stream: usize,
486    /// Configured TTL.
487    pub ttl: Option<Duration>,
488}
489
490/// A shared event store for use across multiple handlers.
491pub type SharedEventStore = Arc<EventStore>;
492
493/// Creates a shared event store with default configuration.
494#[must_use]
495pub fn create_shared_event_store() -> SharedEventStore {
496    Arc::new(EventStore::new())
497}
498
499/// Creates a shared event store with custom configuration.
500#[must_use]
501pub fn create_shared_event_store_with_config(config: EventStoreConfig) -> SharedEventStore {
502    Arc::new(EventStore::with_config(config))
503}
504
505#[cfg(test)]
506mod tests {
507    use super::*;
508
509    #[test]
510    fn test_store_and_retrieve_event() {
511        let store = EventStore::new();
512
513        let event_id = store.store_event("stream1", Some(serde_json::json!({"test": true})));
514        assert!(!event_id.is_empty());
515
516        let events = store.get_events_after("stream1", None);
517        assert_eq!(events.len(), 1);
518        assert_eq!(events[0].id, event_id);
519        assert!(events[0].data.is_some());
520    }
521
522    #[test]
523    fn test_store_priming_event() {
524        let store = EventStore::new();
525
526        let event_id = store.store_priming_event("stream1");
527        assert!(!event_id.is_empty());
528
529        let events = store.get_events_after("stream1", None);
530        assert_eq!(events.len(), 1);
531        assert!(events[0].data.is_none());
532    }
533
534    #[test]
535    fn test_events_after_id() {
536        let store = EventStore::new();
537
538        let id1 = store.store_event("stream1", Some(serde_json::json!({"n": 1})));
539        let id2 = store.store_event("stream1", Some(serde_json::json!({"n": 2})));
540        let id3 = store.store_event("stream1", Some(serde_json::json!({"n": 3})));
541
542        // Get events after id1 (should return id2 and id3)
543        let events = store.get_events_after("stream1", Some(&id1));
544        assert_eq!(events.len(), 2);
545        assert_eq!(events[0].id, id2);
546        assert_eq!(events[1].id, id3);
547
548        // Get events after id2 (should return id3)
549        let events = store.get_events_after("stream1", Some(&id2));
550        assert_eq!(events.len(), 1);
551        assert_eq!(events[0].id, id3);
552
553        // Get events after id3 (should return nothing)
554        let events = store.get_events_after("stream1", Some(&id3));
555        assert!(events.is_empty());
556    }
557
558    #[test]
559    fn test_multiple_streams() {
560        let store = EventStore::new();
561
562        let id1 = store.store_event("stream1", Some(serde_json::json!({"stream": 1})));
563        let id2 = store.store_event("stream2", Some(serde_json::json!({"stream": 2})));
564
565        let events1 = store.get_events_after("stream1", None);
566        let events2 = store.get_events_after("stream2", None);
567
568        assert_eq!(events1.len(), 1);
569        assert_eq!(events1[0].id, id1);
570
571        assert_eq!(events2.len(), 1);
572        assert_eq!(events2[0].id, id2);
573    }
574
575    #[test]
576    fn test_max_events_limit() {
577        let config = EventStoreConfig::default().max_events(3);
578        let store = EventStore::with_config(config);
579
580        let _id1 = store.store_event("stream1", Some(serde_json::json!({"n": 1})));
581        let _id2 = store.store_event("stream1", Some(serde_json::json!({"n": 2})));
582        let id3 = store.store_event("stream1", Some(serde_json::json!({"n": 3})));
583        let id4 = store.store_event("stream1", Some(serde_json::json!({"n": 4})));
584
585        // Should only have 3 events (oldest removed)
586        let events = store.get_events_after("stream1", None);
587        assert_eq!(events.len(), 3);
588
589        // First event should be id2 (id1 was evicted)
590        // Actually first should be id2 since we push id4 after id3
591        // With max 3: after adding id4, we have id2, id3, id4
592        assert_eq!(events[1].id, id3);
593        assert_eq!(events[2].id, id4);
594    }
595
596    #[test]
597    fn test_replay_events() {
598        let store = EventStore::new();
599
600        let id1 = store.store_event("stream1", Some(serde_json::json!({"n": 1})));
601        let id2 = store.store_event("stream1", Some(serde_json::json!({"n": 2})));
602        let id3 = store.store_event("stream1", Some(serde_json::json!({"n": 3})));
603
604        let mut replayed = Vec::new();
605        let stream_id = store.replay_events_after(&id1, |event| {
606            replayed.push(event.id.clone());
607        });
608
609        assert_eq!(stream_id, Some("stream1".to_string()));
610        assert_eq!(replayed, vec![id2, id3]);
611    }
612
613    #[test]
614    fn test_replay_unknown_event_id() {
615        let store = EventStore::new();
616        store.store_event("stream1", Some(serde_json::json!({})));
617
618        let mut replayed = Vec::new();
619        let stream_id = store.replay_events_after("nonexistent", |event| {
620            replayed.push(event.id.clone());
621        });
622
623        assert!(stream_id.is_none());
624        assert!(replayed.is_empty());
625    }
626
627    #[test]
628    fn test_find_stream_for_event() {
629        let store = EventStore::new();
630
631        let id1 = store.store_event("stream1", Some(serde_json::json!({})));
632        let id2 = store.store_event("stream2", Some(serde_json::json!({})));
633
634        assert_eq!(
635            store.find_stream_for_event(&id1),
636            Some("stream1".to_string())
637        );
638        assert_eq!(
639            store.find_stream_for_event(&id2),
640            Some("stream2".to_string())
641        );
642        assert_eq!(store.find_stream_for_event("nonexistent"), None);
643    }
644
645    #[test]
646    fn test_clear_stream() {
647        let store = EventStore::new();
648
649        store.store_event("stream1", Some(serde_json::json!({})));
650        store.store_event("stream2", Some(serde_json::json!({})));
651
652        assert_eq!(store.stream_count(), 2);
653
654        store.clear_stream("stream1");
655
656        assert_eq!(store.stream_count(), 1);
657        assert!(store.get_events_after("stream1", None).is_empty());
658    }
659
660    #[test]
661    fn test_event_expiration() {
662        let config = EventStoreConfig {
663            max_events_per_stream: 100,
664            ttl: Some(Duration::from_millis(10)),
665        };
666        let store = EventStore::with_config(config);
667
668        store.store_event("stream1", Some(serde_json::json!({})));
669
670        // Events should exist initially
671        assert_eq!(store.get_events_after("stream1", None).len(), 1);
672
673        // Wait for expiration
674        std::thread::sleep(Duration::from_millis(20));
675
676        // Events should be gone after cleanup
677        store.cleanup_expired();
678        assert!(store.get_events_after("stream1", None).is_empty());
679    }
680
681    #[test]
682    fn test_no_expiration() {
683        let config = EventStoreConfig::no_expiry();
684        let store = EventStore::with_config(config);
685
686        store.store_event("stream1", Some(serde_json::json!({})));
687
688        // Even after cleanup, events should remain
689        store.cleanup_expired();
690        assert_eq!(store.get_events_after("stream1", None).len(), 1);
691    }
692
693    #[test]
694    fn test_stats() {
695        let store = EventStore::new();
696
697        store.store_event("stream1", Some(serde_json::json!({})));
698        store.store_event("stream1", Some(serde_json::json!({})));
699        store.store_event("stream2", Some(serde_json::json!({})));
700
701        let stats = store.stats();
702        assert_eq!(stats.stream_count, 2);
703        assert_eq!(stats.total_events, 3);
704    }
705
706    #[test]
707    fn test_shared_event_store() {
708        let store = create_shared_event_store();
709
710        // Clone for multiple "handlers"
711        let store1 = Arc::clone(&store);
712        let store2 = Arc::clone(&store);
713
714        store1.store_event("stream1", Some(serde_json::json!({"from": 1})));
715        store2.store_event("stream1", Some(serde_json::json!({"from": 2})));
716
717        assert_eq!(store.event_count(), 2);
718    }
719
720    #[test]
721    fn test_unique_event_ids() {
722        let store = EventStore::new();
723
724        let id1 = store.store_event("stream1", None);
725        let id2 = store.store_event("stream1", None);
726        let id3 = store.store_event("stream2", None);
727
728        // All IDs should be unique
729        assert_ne!(id1, id2);
730        assert_ne!(id2, id3);
731        assert_ne!(id1, id3);
732    }
733
734    #[test]
735    fn test_config_builder() {
736        let config = EventStoreConfig::default()
737            .max_events(50)
738            .ttl(Duration::from_secs(300));
739
740        assert_eq!(config.max_events_per_stream, 50);
741        assert_eq!(config.ttl, Some(Duration::from_secs(300)));
742
743        let config = config.no_ttl();
744        assert!(config.ttl.is_none());
745    }
746
747    // =========================================================================
748    // Additional coverage tests (bd-3dn2)
749    // =========================================================================
750
751    #[test]
752    fn event_store_config_default_values() {
753        let config = EventStoreConfig::default();
754        assert_eq!(config.max_events_per_stream, DEFAULT_MAX_EVENTS_PER_STREAM);
755        assert_eq!(config.ttl, Some(Duration::from_secs(DEFAULT_TTL_SECS)));
756    }
757
758    #[test]
759    fn event_store_default_trait() {
760        let store = EventStore::default();
761        assert_eq!(store.stream_count(), 0);
762        assert_eq!(store.event_count(), 0);
763        assert_eq!(
764            store.config().max_events_per_stream,
765            DEFAULT_MAX_EVENTS_PER_STREAM
766        );
767    }
768
769    #[test]
770    fn event_store_config_accessor() {
771        let config = EventStoreConfig::no_expiry().max_events(42);
772        let store = EventStore::with_config(config);
773        assert_eq!(store.config().max_events_per_stream, 42);
774        assert!(store.config().ttl.is_none());
775    }
776
777    #[test]
778    fn get_events_after_nonexistent_stream_returns_empty() {
779        let store = EventStore::new();
780        store.store_event("stream1", Some(serde_json::json!({})));
781        let events = store.get_events_after("no-such-stream", None);
782        assert!(events.is_empty());
783    }
784
785    #[test]
786    fn get_events_after_unknown_id_returns_empty() {
787        let store = EventStore::new();
788        store.store_event("stream1", Some(serde_json::json!({})));
789        // An unknown after_id returns empty per SSE spec (client should reconnect fresh)
790        let events = store.get_events_after("stream1", Some("bogus-id"));
791        assert!(events.is_empty());
792    }
793
794    #[test]
795    fn create_shared_event_store_with_config_works() {
796        let config = EventStoreConfig::no_expiry().max_events(5);
797        let store = create_shared_event_store_with_config(config);
798        assert_eq!(store.config().max_events_per_stream, 5);
799        assert!(store.config().ttl.is_none());
800    }
801
802    #[test]
803    fn cleanup_expired_removes_empty_streams() {
804        let config = EventStoreConfig {
805            max_events_per_stream: 100,
806            ttl: Some(Duration::from_millis(10)),
807        };
808        let store = EventStore::with_config(config);
809
810        store.store_event("stream1", Some(serde_json::json!({})));
811        store.store_event("stream2", Some(serde_json::json!({})));
812        assert_eq!(store.stream_count(), 2);
813
814        std::thread::sleep(Duration::from_millis(20));
815        store.cleanup_expired();
816
817        // Streams whose events all expired should be removed entirely
818        assert_eq!(store.stream_count(), 0);
819        assert_eq!(store.event_count(), 0);
820    }
821
822    #[test]
823    fn event_store_stats_includes_config_fields() {
824        let config = EventStoreConfig::no_expiry().max_events(77);
825        let store = EventStore::with_config(config);
826        store.store_event("s1", None);
827
828        let stats = store.stats();
829        assert_eq!(stats.stream_count, 1);
830        assert_eq!(stats.total_events, 1);
831        assert_eq!(stats.max_events_per_stream, 77);
832        assert!(stats.ttl.is_none());
833    }
834
835    // =========================================================================
836    // Additional coverage tests (bd-1w9n)
837    // =========================================================================
838
839    #[test]
840    fn event_entry_debug_and_clone() {
841        let entry = EventEntry::new("ev1".into(), "s1".into(), Some(serde_json::json!(42)));
842        let debug = format!("{entry:?}");
843        assert!(debug.contains("ev1"));
844        assert!(debug.contains("s1"));
845
846        let cloned = entry.clone();
847        assert_eq!(cloned.id, "ev1");
848        assert_eq!(cloned.stream_id, "s1");
849    }
850
851    #[test]
852    fn event_entry_is_expired_not_expired() {
853        let entry = EventEntry::new("ev1".into(), "s1".into(), None);
854        // Just created → not expired with 1h TTL
855        assert!(!entry.is_expired(Some(Duration::from_secs(3600))));
856        // Never expires with None TTL
857        assert!(!entry.is_expired(None));
858    }
859
860    #[test]
861    fn event_store_config_debug_and_clone() {
862        let config = EventStoreConfig::default().max_events(10);
863        let debug = format!("{config:?}");
864        assert!(debug.contains("10"));
865
866        let cloned = config.clone();
867        assert_eq!(cloned.max_events_per_stream, 10);
868    }
869
870    #[test]
871    fn event_store_stats_debug_and_clone() {
872        let store = EventStore::new();
873        store.store_event("s1", None);
874        let stats = store.stats();
875        let debug = format!("{stats:?}");
876        assert!(debug.contains("EventStoreStats"));
877
878        let cloned = stats.clone();
879        assert_eq!(cloned.stream_count, 1);
880    }
881
882    #[test]
883    fn event_store_debug() {
884        let store = EventStore::new();
885        let debug = format!("{store:?}");
886        assert!(debug.contains("EventStore"));
887    }
888
889    #[test]
890    fn cleanup_expired_noop_with_no_ttl() {
891        let config = EventStoreConfig::no_expiry();
892        let store = EventStore::with_config(config);
893        store.store_event("s1", Some(serde_json::json!(1)));
894        store.store_event("s2", Some(serde_json::json!(2)));
895        store.cleanup_expired();
896        // Nothing should be removed
897        assert_eq!(store.event_count(), 2);
898        assert_eq!(store.stream_count(), 2);
899    }
900
901    #[test]
902    fn clear_stream_nonexistent_is_noop() {
903        let store = EventStore::new();
904        store.store_event("s1", None);
905        store.clear_stream("no-such-stream");
906        assert_eq!(store.stream_count(), 1);
907    }
908
909    #[test]
910    fn event_id_format_contains_dash() {
911        let store = EventStore::new();
912        let id = store.store_event("s1", None);
913        assert!(id.contains('-'), "event ID should contain a dash: {id}");
914    }
915}