tiny_counter/store/
mod.rs

1pub mod builder;
2pub(crate) mod inner;
3pub mod limiter;
4pub mod query;
5
6use std::collections::{HashMap, HashSet};
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Duration, Utc};
10use dashmap::DashMap;
11
12use crate::config_converter::convert_if_needed;
13use crate::{
14    Clock, DeltaQuery, Error, EventCounterConfig, Formatter, MultiQuery, Query, RatioQuery, Result,
15    SingleEventCounter, Storage, SystemClock, TimeUnit,
16};
17
18use self::inner::EventStoreInner;
19use self::limiter::Limiter;
20
21/// Top-level store for tracking multiple events with automatic counter creation.
22///
23/// EventStore provides a high-level API for recording and querying events.
24/// Each event is tracked independently with its own SingleEventCounter.
25/// Most methods use &self and internal locking for thread safety.
26///
27/// # Thread Safety and Sharing
28///
29/// EventStore is NOT Clone. To share across threads:
30/// - Use `Arc<EventStore>` for read-heavy workloads (record, query)
31/// - Use `Arc<Mutex<EventStore>>` when calling `compact()` (&mut self methods)
32///
33/// Auto-persistence is configured via the builder and managed internally.
34///
35/// # Drop and Synchronous I/O Behavior
36///
37/// **IMPORTANT**: When an EventStore is dropped, it performs **synchronous I/O** by calling
38/// [`persist()`](Self::persist) if any events are dirty. This ensures data is not lost, but has implications:
39///
40/// ## When Drop I/O Matters
41///
42/// - **Async contexts**: Drop runs synchronously and will block the executor. This can cause
43///   performance issues or even deadlocks in async code.
44/// - **Performance-sensitive code**: Drop may take time proportional to the number of dirty events
45///   and the storage backend's performance.
46/// - **Panic unwinding**: If a panic occurs, Drop still runs, but persistence errors are silently
47///   ignored (they cannot be returned during unwinding).
48///
49/// ## Best Practices for Cleanup
50///
51/// **Option 1 - Use `close()`** (recommended for clarity):
52///
53/// ```rust
54/// # #[cfg(feature = "serde")]
55/// # {
56/// use tiny_counter::{EventStore, storage::MemoryStorage};
57///
58/// # async fn example() {
59/// let store = EventStore::builder()
60///     .with_storage(MemoryStorage::new())
61///     .build()
62///     .unwrap();
63///
64/// store.record("user_action");
65///
66/// // Explicitly close with error handling
67/// store.close().expect("Failed to close store");
68/// // Store is now consumed
69/// # }
70/// # }
71/// ```
72///
73/// **Option 2 - Persist then drop**:
74///
75/// ```rust
76/// # #[cfg(feature = "serde")]
77/// # {
78/// use tiny_counter::{EventStore, storage::MemoryStorage};
79///
80/// # async fn example() {
81/// let store = EventStore::builder()
82///     .with_storage(MemoryStorage::new())
83///     .build()
84///     .unwrap();
85///
86/// store.record("user_action");
87///
88/// // Explicitly persist before drop in async context
89/// store.persist().expect("Failed to persist events");
90/// // Now drop is safe - either no dirty data, or we already handled errors
91/// drop(store);
92/// # }
93/// # }
94/// ```
95///
96/// **With auto-persist**: If you configured auto-persist via the builder, Drop still performs
97/// a final persist to catch any events recorded after the last auto-persist cycle:
98///
99/// ```rust
100/// # #[cfg(all(feature = "tokio", feature = "serde"))]
101/// # {
102/// use tiny_counter::{EventStore, storage::MemoryStorage};
103/// use chrono::Duration;
104///
105/// # async fn example() {
106/// let store = EventStore::builder()
107///     .with_storage(MemoryStorage::new())
108///     .auto_persist(Duration::seconds(60))  // Background task every 60s
109///     .build()
110///     .unwrap();
111///
112/// store.record("event1");
113/// // Auto-persist will eventually save this
114///
115/// tokio::time::sleep(std::time::Duration::from_millis(100)).await;
116///
117/// store.record("event2");
118/// // This might not be saved yet by auto-persist!
119///
120/// // Option 1: Explicit persist before drop (recommended for critical data)
121/// store.persist().expect("Failed to persist");
122/// drop(store);
123///
124/// // Option 2: Let Drop handle it (blocks executor, errors ignored)
125/// // drop(store);  // Will call persist() synchronously
126/// # }
127/// # }
128/// ```
129///
130/// ## Tradeoffs
131///
132/// - **Drop with persist**: Convenient, prevents data loss, but blocks and may fail silently
133/// - **Explicit persist**: More verbose, but allows error handling and avoids blocking Drop
134/// - **Auto-persist**: Reduces manual persist calls, but Drop still needed for final flush
135///
136/// ## Alternative: No-Op Drop
137///
138/// If you don't configure storage (no `with_storage()` in builder), Drop is a no-op and has
139/// no I/O implications.
pub struct EventStore {
    // Shared state (counters, clock, storage, formatter, config). Wrapped in
    // an Arc because queries (see `query`) and the auto-persist task each
    // clone their own handle to it.
    pub(crate) inner: Arc<EventStoreInner>,
    // Handle to the background auto-persist task, if one was spawned by the
    // builder; None otherwise.
    #[cfg(feature = "tokio")]
    pub(crate) auto_persist_handle: Option<tokio::task::JoinHandle<()>>,
}
145
146impl EventStore {
147    /// Creates a new EventStore with default configuration.
148    ///
149    /// Default configuration includes 6 time units totaling 256 buckets:
150    /// - 60 Minutes
151    /// - 72 Hours
152    /// - 56 Days
153    /// - 52 Weeks
154    /// - 12 Months
155    /// - 4 Years
156    pub fn new() -> Self {
157        Self::from_parts(
158            SystemClock::new(),
159            None,
160            None,
161            EventCounterConfig::default(),
162        )
163    }
164
    /// Gets the current time from the store's configured clock.
    ///
    /// Delegates to the inner store's clock, which is injectable via
    /// [`from_parts`](Self::from_parts) (e.g. a mock clock for tests).
    pub fn clock_now(&self) -> DateTime<Utc> {
        self.inner.clock_now()
    }
169
170    /// Creates an EventStore from individual components.
171    ///
172    /// This is an internal constructor used by the builder.
173    pub(crate) fn from_parts(
174        clock: Arc<dyn Clock>,
175        storage: Option<Box<dyn Storage>>,
176        formatter: Option<Arc<dyn crate::Formatter>>,
177        config: EventCounterConfig,
178    ) -> Self {
179        Self {
180            inner: Arc::new(EventStoreInner {
181                events: DashMap::new(),
182                clock,
183                storage: storage.map(|s| Arc::new(Mutex::new(s))),
184                formatter,
185                config,
186            }),
187            #[cfg(feature = "tokio")]
188            auto_persist_handle: None,
189        }
190    }
191
192    /// Records a single event at the current time.
193    ///
194    /// # Examples
195    ///
196    /// ```
197    /// use tiny_counter::EventStore;
198    ///
199    /// let store = EventStore::new();
200    /// store.record("app_launch");
201    /// store.record("button_click");
202    /// ```
203    pub fn record(&self, event_id: impl EventId) {
204        self.record_count(event_id, 1);
205    }
206
207    /// Records multiple events at the current time.
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// use tiny_counter::EventStore;
213    ///
214    /// let store = EventStore::new();
215    /// store.record_count("api_call", 5);
216    /// store.record_count("page_view", 10);
217    /// ```
218    pub fn record_count(&self, event_id: impl EventId, count: u32) {
219        let counter = self.inner.get_counter_for_record(event_id.as_ref());
220        let now = self.inner.clock.now();
221        let mut counter = counter.lock().unwrap();
222        counter.advance_if_needed(now);
223        counter.record(count);
224        counter.mark_dirty();
225    }
226
227    /// Records a single event at a specific time.
228    ///
229    /// # Examples
230    ///
231    /// ```
232    /// use tiny_counter::EventStore;
233    /// use chrono::{Duration, Utc};
234    ///
235    /// let store = EventStore::new();
236    /// let two_days_ago = Utc::now() - Duration::days(2);
237    /// store.record_at("feature_used", two_days_ago).unwrap();
238    /// ```
239    ///
240    /// # Errors
241    ///
242    /// Returns `Error::FutureEvent` if the timestamp is in the future.
243    ///
244    /// **Note on old events**: Events beyond the tracking window are silently dropped with no error.
245    /// This is intentional - the library uses fixed-size rotating buckets, and old data falls off
246    /// as new data arrives. This represents a loss of granularity (e.g., "25 hours ago" becomes
247    /// "1 day ago") rather than complete data loss. For production use, prefer `record()` for
248    /// real-time events. Use `record_at()` and `record_ago()` primarily for testing and backfilling.
249    pub fn record_at(&self, event_id: impl EventId, time: DateTime<Utc>) -> Result<()> {
250        self.record_count_at(event_id, 1, time)
251    }
252
253    /// Records multiple events at a specific time.
254    ///
255    /// # Examples
256    ///
257    /// ```
258    /// use tiny_counter::EventStore;
259    /// use chrono::{Duration, Utc};
260    ///
261    /// let store = EventStore::new();
262    /// let yesterday = Utc::now() - Duration::days(1);
263    /// store.record_count_at("sync_event", 3, yesterday).unwrap();
264    /// ```
265    ///
266    /// # Errors
267    ///
268    /// Returns `Error::FutureEvent` if the timestamp is in the future.
269    ///
270    /// **Note on old events**: Events beyond the tracking window are silently dropped with no error.
271    /// This is intentional - the library uses fixed-size rotating buckets, and old data falls off
272    /// as new data arrives. This represents a loss of granularity (e.g., "25 hours ago" becomes
273    /// "1 day ago") rather than complete data loss. For production use, prefer `record()` for
274    /// real-time events. Use `record_count_at()` and `record_count_ago()` primarily for testing and backfilling.
275    pub fn record_count_at(
276        &self,
277        event_id: impl EventId,
278        count: u32,
279        time: DateTime<Utc>,
280    ) -> Result<()> {
281        let counter = self.inner.get_counter_for_record(event_id.as_ref());
282        let now = self.inner.clock.now();
283        let mut counter = counter.lock().unwrap();
284        counter.advance_if_needed(now);
285        counter.record_at(count, time)?;
286        counter.mark_dirty();
287        Ok(())
288    }
289
290    /// Records a single event that occurred a duration ago.
291    ///
292    /// **Important**: Events outside the tracking window are silently dropped with no error or warning.
293    /// The tracking window depends on your configuration. With default settings (256 buckets across
294    /// 6 time units), events older than approximately 4 years are dropped.
295    ///
296    /// This represents a loss of granularity (e.g., "25 hours ago" becomes "1 day ago"), not complete
297    /// data loss. The library uses fixed-size rotating buckets - old data falls off as new data arrives.
298    ///
299    /// **Recommendation**: Use `record()` for production real-time events. Use `record_ago()` primarily
300    /// for testing queries or backfilling recent historical data within the tracking window.
301    ///
302    /// This method never returns an error. Use [`record_at`](Self::record_at) if you need
303    /// to detect when events fall outside the tracking window.
304    ///
305    /// # Examples
306    ///
307    /// ```
308    /// use tiny_counter::EventStore;
309    /// use chrono::Duration;
310    ///
311    /// let store = EventStore::new();
312    /// store.record_ago("sync", Duration::hours(3));
313    ///
314    /// // Events too old are dropped silently
315    /// store.record_ago("ancient", Duration::days(365 * 10));
316    /// let sum = store.query("ancient").ever().sum();
317    /// assert_eq!(sum, Some(0)); // Event was dropped
318    /// ```
319    pub fn record_ago(&self, event_id: impl EventId, duration: Duration) {
320        self.record_count_ago(event_id, 1, duration);
321    }
322
323    /// Records multiple events that occurred a duration ago.
324    ///
325    /// **Important**: Events outside the tracking window are silently dropped with no error or warning.
326    /// The tracking window depends on your configuration. With default settings (256 buckets across
327    /// 6 time units), events older than approximately 4 years are dropped.
328    ///
329    /// This represents a loss of granularity (e.g., "25 hours ago" becomes "1 day ago"), not complete
330    /// data loss. The library uses fixed-size rotating buckets - old data falls off as new data arrives.
331    ///
332    /// **Recommendation**: Use `record_count()` for production real-time events. Use `record_count_ago()`
333    /// primarily for testing queries or backfilling recent historical data within the tracking window.
334    ///
335    /// This method never returns an error. Use [`record_count_at`](Self::record_count_at) if you need
336    /// to detect when events fall outside the tracking window.
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// use tiny_counter::EventStore;
342    /// use chrono::Duration;
343    ///
344    /// let store = EventStore::new();
345    /// store.record_count_ago("batch_process", 5, Duration::days(1));
346    ///
347    /// // Events too old are dropped silently
348    /// store.record_count_ago("ancient_batch", 100, Duration::days(365 * 10));
349    /// let sum = store.query("ancient_batch").ever().sum();
350    /// assert_eq!(sum, Some(0)); // Events were dropped
351    /// ```
352    pub fn record_count_ago(&self, event_id: impl EventId, count: u32, duration: Duration) {
353        let now = self.inner.clock.now();
354        let time = now - duration;
355        // Events outside tracking window are silently dropped.
356        // This matches the API design where record_ago methods are infallible.
357        let _ = self.record_count_at(event_id, count, time);
358    }
359
360    /// Creates a query builder for a single event.
361    ///
362    /// # Examples
363    ///
364    /// ```
365    /// use tiny_counter::EventStore;
366    ///
367    /// let store = EventStore::new();
368    /// store.record("app_launch");
369    ///
370    /// let count = store.query("app_launch").last_days(7).sum();
371    /// assert_eq!(count, Some(1));
372    /// ```
373    pub fn query(&self, event_id: impl EventId) -> Query {
374        Query::new(self.inner.clone(), event_id.as_ref().to_string())
375    }
376
377    /// Creates a query builder for multiple events.
378    ///
379    /// Combines counts from multiple events into a single sum.
380    ///
381    /// # Examples
382    ///
383    /// ```
384    /// use tiny_counter::EventStore;
385    ///
386    /// let store = EventStore::new();
387    /// store.record("app_launch");
388    /// store.record("app_resume");
389    ///
390    /// let total_opens = store
391    ///     .query_many(&["app_launch", "app_resume"])
392    ///     .last_days(7)
393    ///     .sum();
394    ///
395    /// assert_eq!(total_opens, Some(2));
396    /// ```
397    pub fn query_many(&self, event_ids: &[impl EventId]) -> MultiQuery {
398        let event_ids_owned: Vec<String> =
399            event_ids.iter().map(|s| s.as_ref().to_string()).collect();
400        MultiQuery::new(self.inner.clone(), event_ids_owned)
401    }
402
403    /// Creates a ratio query builder for two events.
404    ///
405    /// Calculates the ratio of numerator to denominator events.
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// use tiny_counter::EventStore;
411    ///
412    /// let store = EventStore::new();
413    /// store.record_count("conversions", 25);
414    /// store.record_count("visits", 100);
415    ///
416    /// let conversion_rate = store
417    ///     .query_ratio("conversions", "visits")
418    ///     .last_days(7);
419    ///
420    /// assert_eq!(conversion_rate, Some(0.25));
421    /// ```
422    pub fn query_ratio(&self, numerator: impl EventId, denominator: impl EventId) -> RatioQuery {
423        RatioQuery::new(
424            self.inner.clone(),
425            numerator.as_ref().to_string(),
426            denominator.as_ref().to_string(),
427        )
428    }
429
430    /// Creates a delta query builder for two events.
431    ///
432    /// Calculates the net change (positive - negative) between two events.
433    ///
434    /// # Examples
435    ///
436    /// ```
437    /// use tiny_counter::EventStore;
438    ///
439    /// let store = EventStore::new();
440    /// store.record_count("items_added", 10);
441    /// store.record_count("items_removed", 3);
442    ///
443    /// let inventory_change = store
444    ///     .query_delta("items_added", "items_removed")
445    ///     .last_days(7)
446    ///     .sum();
447    ///
448    /// assert_eq!(inventory_change, 7);
449    /// ```
450    pub fn query_delta(&self, positive: impl EventId, negative: impl EventId) -> DeltaQuery {
451        DeltaQuery::new(
452            self.inner.clone(),
453            positive.as_ref().to_string(),
454            negative.as_ref().to_string(),
455        )
456    }
457
458    /// Returns whether any events have been modified since the last persist.
459    pub fn is_dirty(&self) -> bool {
460        for entry in self.inner.events.iter() {
461            let counter = entry.value().lock().unwrap();
462            if counter.is_dirty() {
463                return true;
464            }
465        }
466        false
467    }
468
469    /// Creates a rate limiter builder for checking constraints.
470    ///
471    /// Use this to create complex rate limiting rules with multiple constraints.
472    ///
473    /// # Examples
474    ///
475    /// ```
476    /// use tiny_counter::{EventStore, TimeUnit};
477    ///
478    /// let store = EventStore::new();
479    ///
480    /// let result = store
481    ///     .limit()
482    ///     .at_most("api_call", 10, TimeUnit::Minutes)
483    ///     .at_most("api_call", 100, TimeUnit::Hours)
484    ///     .check_and_record("api_call");
485    ///
486    /// assert!(result.is_ok());
487    /// ```
488    pub fn limit(&self) -> Limiter {
489        Limiter::new(self.inner.clone())
490    }
491
492    /// Reconciles the delta between two events and records to balance them.
493    ///
494    /// This method calculates the all-time delta (positive - negative) and:
495    /// - If delta > 0: records delta to the negative event
496    /// - If delta < 0: records |delta| to the positive event
497    /// - If delta == 0: does nothing
498    ///
499    /// This is useful for tracking net changes like credits/debits or joins/leaves.
500    ///
501    /// # Examples
502    ///
503    /// ```
504    /// use tiny_counter::EventStore;
505    ///
506    /// let store = EventStore::new();
507    /// store.record_count("credits", 100);
508    /// store.record_count("debits", 30);
509    ///
510    /// // Balance adds 70 to debits to equalize
511    /// store.balance_delta("credits", "debits").unwrap();
512    ///
513    /// let delta = store.query_delta("credits", "debits").ever().sum();
514    /// assert_eq!(delta, 0);
515    /// ```
516    pub fn balance_delta(&self, positive: impl EventId, negative: impl EventId) -> Result<()> {
517        let positive_str = positive.as_ref();
518        let negative_str = negative.as_ref();
519
520        // Query all-time delta
521        let delta = self.query_delta(positive_str, negative_str).ever().sum();
522
523        if delta > 0 {
524            // Positive delta: record to negative event to balance
525            self.record_count(negative_str, delta.min(u32::MAX as i64) as u32);
526        } else if delta < 0 {
527            // Negative delta: record |delta| to positive event to balance
528            self.record_count(positive_str, (-delta).min(u32::MAX as i64) as u32);
529        }
530        // delta == 0: no-op
531
532        Ok(())
533    }
534
    /// Persists only dirty (modified) events to storage.
    ///
    /// This method performs synchronous I/O to save modified events to the configured storage
    /// backend. It's automatically called by the Drop implementation when the EventStore is
    /// dropped, but **explicit calls are recommended** in async contexts and for error handling.
    ///
    /// Returns an error if no storage is configured or if serialization/storage fails.
    /// Requires a formatter - either a built-in formatter (serde-bincode, serde-json) or a custom implementation.
    ///
    /// # Best Practices
    ///
    /// - **Async contexts**: Call `persist()` explicitly before the store goes out of scope
    ///   to avoid blocking the executor during Drop
    /// - **Error handling**: Explicit calls allow you to handle persistence errors, while
    ///   Drop silently ignores errors
    /// - **With auto-persist**: Still call `persist()` before drop to ensure the final batch
    ///   of events is saved and to catch any errors
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use tiny_counter::{EventStore, storage::MemoryStorage};
    ///
    /// let store = EventStore::builder()
    ///     .with_storage(MemoryStorage::new())
    ///     .build()
    ///     .unwrap();
    ///
    /// store.record("event");
    /// assert!(store.is_dirty());
    ///
    /// // Explicit persist with error handling
    /// store.persist().expect("Failed to persist");
    /// assert!(!store.is_dirty());
    /// # }
    /// ```
    pub fn persist(&self) -> Result<()> {
        // force_dirty = false: write only counters whose dirty flag is set.
        self.persist_if_dirty(false)
    }
576
    /// Explicitly persist and close the event store.
    ///
    /// This is a convenience method that calls [`persist()`](Self::persist) and then consumes
    /// the EventStore. It's semantically clearer than calling `persist()` + `drop()` and makes
    /// the intent of cleanup explicit.
    ///
    /// **Preferred over relying on Drop** in production code, especially in:
    /// - Async contexts where Drop would block the executor
    /// - Code where error handling is important
    /// - Shutdown sequences where explicit cleanup is desired
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use tiny_counter::{EventStore, storage::MemoryStorage};
    ///
    /// let store = EventStore::builder()
    ///     .with_storage(MemoryStorage::new())
    ///     .build()
    ///     .unwrap();
    ///
    /// store.record("event");
    ///
    /// // Explicit cleanup with error handling
    /// store.close().expect("Failed to close store");
    /// // Store is now consumed and dropped
    /// # }
    /// ```
    pub fn close(self) -> Result<()> {
        self.persist()
        // self is dropped here after successful persist; if persist returned
        // an error, Drop still runs on unwind/return but finds no new work.
    }
611
    /// Shared persistence path for `persist()` and `persist_all()`.
    ///
    /// When `force_dirty` is true every counter is written regardless of its
    /// dirty flag; otherwise only dirty counters are written. All writes
    /// happen inside a single storage transaction so a mid-way failure rolls
    /// back and leaves storage unchanged.
    ///
    /// Errors if no storage or no formatter is configured, or if
    /// serialization / storage operations fail.
    fn persist_if_dirty(&self, force_dirty: bool) -> Result<()> {
        let storage = self
            .inner
            .storage
            .as_ref()
            .ok_or_else(|| Error::Storage("No storage configured".to_string()))?;

        // Hold the storage lock for the whole transaction so no other
        // persist can interleave writes.
        let mut storage = storage.lock().unwrap();

        let formatter = self
            .inner
            .formatter
            .as_ref()
            .ok_or_else(|| Error::Serialization("No formatter configured".to_string()))?;

        // Begin transaction for atomic multi-event persistence
        storage.begin_transaction()?;

        let longest_time_unit = self.inner.config.specified_time_unit(TimeUnit::Ever);
        // Immediately-invoked closure lets `?` short-circuit the loop while
        // still reaching the commit/rollback decision below.
        let persist_result = (|| {
            for entry in self.inner.events.iter() {
                let counter = entry.value();
                let event_id = entry.key();
                self.persist_counter(
                    &mut **storage,
                    &**formatter,
                    event_id,
                    counter,
                    longest_time_unit,
                    force_dirty,
                )?;
            }
            Ok(())
        })();

        // Commit or rollback based on result
        match persist_result {
            Ok(()) => {
                storage.commit_transaction()?;
                Ok(())
            }
            Err(e) => {
                // Attempt rollback, but return original error
                let _ = storage.rollback_transaction();
                Err(e)
            }
        }
    }
660
    /// Persists all events to storage, regardless of dirty status.
    ///
    /// Returns an error if no storage is configured or if serialization/storage fails.
    pub fn persist_all(&self) -> Result<()> {
        // force_dirty = true: write every counter, even clean ones.
        self.persist_if_dirty(true)
    }
667
668    /// Clears the dirty flag on all events without persisting.
669    ///
670    /// Use this when you want to mark all events as clean without saving.
671    pub fn reset_dirty(&self) {
672        for entry in self.inner.events.iter() {
673            let mut counter = entry.value().lock().unwrap();
674            counter.reset_dirty();
675        }
676    }
677
678    /// Compacts storage by loading all events, advancing to current time,
679    /// persisting back to storage, and clearing memory.
680    ///
681    /// This method:
682    /// - Loads all events from storage into memory (triggers convert_if_needed)
683    /// - Advances all counters to current time
684    /// - Persists all counters (saves non-empty, deletes empty)
685    /// - Clears in-memory cache (events will be lazy-loaded as needed)
686    ///
687    /// # Thread Safety
688    ///
689    /// Requires `&mut self` for exclusive access. When sharing EventStore across
690    /// threads, wrap in `Arc<Mutex<EventStore>>` to safely call this method.
691    pub fn compact(&mut self) -> Result<()> {
692        // Verify storage is configured
693        if self.inner.storage.is_none() {
694            return Err(Error::Storage("No storage configured".to_string()));
695        }
696
697        // List all keys from storage
698        let keys = {
699            let storage = self.inner.storage.as_ref().unwrap();
700            let storage_guard = storage.lock().unwrap();
701            storage_guard.list_keys()?
702        };
703
704        let now = self.inner.clock.now();
705
706        // Load all events into memory and advance them
707        for key in keys {
708            // get_counter_for_query loads from storage if not already in memory
709            // and automatically triggers convert_if_needed
710            if let Some(counter) = self.inner.get_counter_for_query(&key) {
711                let mut counter_guard = counter.lock().unwrap();
712                counter_guard.advance_if_needed(now);
713            }
714        }
715
716        // Persist all in-memory counters (handles save/delete logic)
717        self.persist_all()?;
718
719        // Clear in-memory cache to free memory
720        self.inner.events.clear();
721
722        Ok(())
723    }
724
725    /// Returns the approximate memory usage in bytes for all tracked events.
726    ///
727    /// This includes the bucket storage for all intervals across all events.
728    pub fn memory_usage(&self) -> usize {
729        let mut total: usize = 0;
730        for entry in self.inner.events.iter() {
731            let counter = entry.value().lock().unwrap();
732            // Calculate memory for each interval in the counter
733            total = total.saturating_add(counter.memory_usage())
734        }
735        total
736    }
737
738    /// Returns a list of tracked time units and their bucket counts.
739    ///
740    /// This reflects the default configuration used for new events.
741    pub fn tracked_intervals(&self) -> Vec<(TimeUnit, usize)> {
742        self.inner
743            .config
744            .as_vec()
745            .iter()
746            .map(|config| (config.time_unit(), config.bucket_count()))
747            .collect()
748    }
749
750    /// Exports all event counters.
751    ///
752    /// Returns a HashMap mapping event IDs to their SingleEventCounter instances.
753    /// Useful for serialization, backup, or multi-device sync.
754    ///
755    /// This method queries storage to find all event_ids on disk and loads any
756    /// counters that aren't already in memory, ensuring a complete snapshot.
757    pub fn export_all(&self) -> Result<HashMap<String, SingleEventCounter>> {
758        let mut result = HashMap::new();
759
760        // Collect all event_ids from both memory and storage
761        let mut all_event_ids: HashSet<String> = HashSet::new();
762
763        // Add all event_ids from memory
764        for entry in self.inner.events.iter() {
765            all_event_ids.insert(entry.key().clone());
766        }
767
768        // Add all event_ids from storage (if storage is configured)
769        if let Some(storage) = &self.inner.storage {
770            let storage_guard = storage.lock().unwrap();
771            let storage_keys = storage_guard.list_keys()?;
772            drop(storage_guard); // Release lock before loading counters
773            for key in storage_keys {
774                all_event_ids.insert(key);
775            }
776        }
777
778        // Load each counter (from memory or storage) and add to result
779        for event_id in all_event_ids {
780            if let Some(counter_arc) = self.inner.get_counter_for_query(&event_id) {
781                let counter = counter_arc.lock().unwrap();
782                result.insert(event_id, counter.clone());
783            }
784        }
785
786        Ok(result)
787    }
788
789    /// Exports only dirty (modified) event counters.
790    ///
791    /// Returns a HashMap of event IDs to SingleEventCounter for events that have
792    /// been modified since the last persist or reset_dirty.
793    pub fn export_dirty(&self) -> Result<HashMap<String, SingleEventCounter>> {
794        let mut result = HashMap::new();
795        for entry in self.inner.events.iter() {
796            let counter = entry.value().lock().unwrap();
797            if counter.is_dirty() {
798                result.insert(entry.key().clone(), counter.clone());
799            }
800        }
801        Ok(result)
802    }
803
804    /// Imports a single event counter, creating or replacing it.
805    ///
806    /// If the event already exists, it is replaced entirely (not merged).
807    /// Marks the event as dirty after import.
808    pub fn import_event(&self, event_id: impl EventId, counter: SingleEventCounter) -> Result<()> {
809        let event_id_str = event_id.as_ref();
810        let mut counter = convert_if_needed(counter, &self.inner.config);
811        counter.mark_dirty();
812        self.inner
813            .events
814            .insert(event_id_str.to_string(), Arc::new(Mutex::new(counter)));
815        Ok(())
816    }
817
818    /// Imports multiple event counters, creating or replacing them.
819    ///
820    /// This is a batch version of import_event.
821    pub fn import_all(&self, events: HashMap<String, SingleEventCounter>) -> Result<()> {
822        for (event_id, counter) in events {
823            self.import_event(event_id, counter)?;
824        }
825        Ok(())
826    }
827
    /// Merges a single event counter into the store.
    ///
    /// If the event doesn't exist, creates it with the merged counter.
    /// If it exists, merges the counts using SingleEventCounter::merge.
    /// Marks the event as dirty after merge.
    ///
    /// NOTE(review): unlike `import_event`, the incoming counter is not run
    /// through `convert_if_needed` here — presumably `merge` reconciles (or
    /// rejects) mismatched configurations and the raw insert in the
    /// not-found branch is intended; confirm.
    ///
    /// NOTE(review): the lookup and the insert below are two separate steps,
    /// so a counter created concurrently in between (e.g. by `record`) would
    /// be overwritten rather than merged — confirm whether `merge_event` is
    /// expected to race with writers.
    pub fn merge_event(&self, event_id: impl EventId, counter: SingleEventCounter) -> Result<()> {
        let event_id_str = event_id.as_ref();
        // Existing counter: merge into it under its own mutex.
        if let Some(existing_entry) = self.inner.get_counter_for_query(event_id_str) {
            let mut existing = existing_entry.lock().unwrap();
            existing.merge(counter)?;
            existing.mark_dirty();
        } else {
            // No counter yet: adopt the incoming one as-is, flagged dirty so
            // the next persist writes it.
            let mut new_counter = counter;
            new_counter.mark_dirty();
            self.inner
                .events
                .insert(event_id_str.to_string(), Arc::new(Mutex::new(new_counter)));
        }

        Ok(())
    }
849
850    /// Merges multiple event counters into the store.
851    ///
852    /// This is a batch version of merge_event.
853    pub fn merge_all(&self, events: HashMap<String, SingleEventCounter>) -> Result<()> {
854        for (event_id, counter) in events {
855            self.merge_event(event_id, counter)?;
856        }
857        Ok(())
858    }
859
    /// Merges every counter from `other` into this store, consuming it.
    ///
    /// Equivalent to `self.merge_all(other.export_all()?)`. `export_all`
    /// clones the counters, so `other` keeps its (dirty) state until it is
    /// dropped — NOTE(review): dropping `other` afterwards triggers its
    /// `Drop` impl, which performs a final synchronous persist if it has a
    /// storage backend; confirm that is intended for merged-away stores.
    pub fn merge(&self, other: Self) -> Result<()> {
        let events = other.export_all()?;
        self.merge_all(events)
    }
864
    /// Spawns a background task for automatic persistence.
    ///
    /// This method creates a tokio task that periodically checks if the store
    /// is dirty and persists if needed. The task runs until aborted.
    ///
    /// The loop sleeps first, so the earliest persist happens one full
    /// `interval` after spawning. Persist failures are reported on stderr
    /// and the loop keeps running.
    ///
    /// # Panics
    ///
    /// The *spawned task* (not the caller) panics via `expect` if `interval`
    /// is not a positive duration convertible to `std::time::Duration`.
    ///
    /// NOTE(review): the temporary `EventStore` below owns a clone of
    /// `inner`, so when the task is aborted and its future is dropped, that
    /// store's `Drop` runs — performing a final synchronous persist if the
    /// store is dirty. Presumably that last-chance flush is intended;
    /// confirm.
    ///
    /// Returns a JoinHandle that can be used to abort the task.
    #[cfg(feature = "tokio")]
    #[allow(dead_code)] // Will be used by builder in ev-5e4
    pub(crate) fn spawn_auto_persist(
        inner: Arc<EventStoreInner>,
        interval: chrono::Duration,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            // Convert to std::time::Duration at tokio boundary
            let std_interval = interval
                .to_std()
                .expect("auto_persist interval must be positive");

            // Create a temporary BaseEventStore to call methods
            let store = EventStore {
                inner: inner.clone(),
                auto_persist_handle: None,
            };

            loop {
                tokio::time::sleep(std_interval).await;

                if store.is_dirty() {
                    if let Err(e) = store.persist() {
                        eprintln!("Auto-persist failed: {}", e);
                    }
                }
            }
        })
    }
900
901    fn persist_counter(
902        &self,
903        storage: &mut dyn Storage,
904        formatter: &dyn Formatter,
905        event_id: &str,
906        counter: &Mutex<SingleEventCounter>,
907        longest_time_unit: TimeUnit,
908        force_dirty: bool,
909    ) -> Result<()> {
910        if let Some(data) = {
911            // We get the lock, so we can get the data and serialize it.
912            let mut counter = counter.lock().unwrap();
913            if force_dirty || counter.is_dirty() {
914                // We optimistically mark the event as not dirty
915                if !counter.is_empty(longest_time_unit) {
916                    let data = formatter.serialize(&counter)?;
917                    counter.reset_dirty();
918                    Some(data)
919                } else {
920                    counter.reset_dirty();
921                    None
922                }
923            } else {
924                return Ok(());
925            }
926            // We release the lock here, with the data serialized, and marked as not dirty.
927        } {
928            // We're now out of the lock, and can let the storage take as long as it wants to
929            // store the event.
930            // If another thread now records another event, then dirty will be set to true,
931            // but it will have missed this save, and will be done next time. That's ok.
932            storage.save(event_id, data)
933        } else {
934            storage.delete(event_id)
935        }
936        .map_err(|_e| {
937            // If storage errors, then we should re-mark the event as dirty.
938            // Dirty may already be set to true (if another thread has recorded an event),
939            // but we won't lose that just because we serialized.
940            // We'll just try again later.
941            let mut counter = counter.lock().unwrap();
942            counter.mark_dirty();
943            _e
944        })
945    }
946}
947
/// `Default` delegates to [`EventStore::new`]: an empty store with the
/// default counter configuration and no storage backend.
impl Default for EventStore {
    fn default() -> Self {
        Self::new()
    }
}
953
/// Drop implementation for EventStore.
///
/// **IMPORTANT**: This implementation performs **synchronous I/O** which can block the
/// current thread. See the struct-level documentation for [`EventStore`] for best practices.
///
/// When an EventStore is dropped:
/// 1. The background auto-persist task is aborted (if configured)
/// 2. A final `persist()` is attempted if the store has dirty (unsaved) events
///
/// # Behavior Details
///
/// - **Blocking I/O**: The persist operation blocks until complete. In async contexts,
///   this blocks the executor thread.
/// - **Silent errors**: Any errors during persist are discarded — nothing is logged
///   here, and errors cannot be returned from Drop.
/// - **Panic safety**: Drop runs during panic unwinding, so errors are swallowed to avoid
///   double-panics.
///
/// # Recommendations
///
/// Call [`persist()`](Self::persist) explicitly before dropping to:
/// - Avoid blocking in async code
/// - Handle errors properly
/// - Make cleanup behavior explicit
impl Drop for EventStore {
    fn drop(&mut self) {
        // Abort the background auto-persist task if present.
        // This ensures the task doesn't outlive the EventStore and prevents
        // potential use-after-free of the inner Arc<EventStoreInner>.
        // Note: abort() only requests cancellation; the task is detached,
        // not joined, so it may still be winding down after drop returns.
        #[cfg(feature = "tokio")]
        if let Some(handle) = &self.auto_persist_handle {
            handle.abort();
        }

        // Perform a final synchronous persist if there are unsaved changes.
        // This is a best-effort attempt - errors are silently ignored because:
        // 1. Drop cannot return an error
        // 2. Drop may be called during panic unwinding (can't panic in Drop)
        // 3. For critical data, users should call persist() explicitly before drop
        if self.is_dirty() {
            let _ = self.persist();
            // Errors are intentionally ignored. In production code, call persist()
            // explicitly before dropping the store to handle errors properly.
        }
    }
}
1000
/// A generic event id trait for type-safe event identification.
///
/// This trait enables using custom types (especially enums) as event identifiers,
/// providing compile-time guarantees about which events exist in your system.
/// Blanket implementations are provided for `str`, `String`, `&str`, and
/// `&String`, so plain strings also work directly.
///
/// # Benefits of Type-Safe Event IDs
///
/// Using enums instead of raw strings provides several advantages:
///
/// 1. **Bounded event set**: Compiler enforces a fixed set of events, preventing typos
/// 2. **Memory safety**: Enum variants guarantee a bounded number of event counters
/// 3. **Refactoring safety**: Renaming events shows all usage sites at compile time
/// 4. **Documentation**: Enum definition serves as single source of truth for all events
///
/// # Memory Implications
///
/// Each unique event ID creates a new counter in memory (~2KB per event with default config).
/// Using patterns like `format!("user:{}:event", user_id)` creates unbounded event IDs
/// that grow with your user base, defeating the fixed-memory guarantee.
///
/// **Safe pattern** (bounded events):
/// ```rust
/// use tiny_counter::{EventStore, EventId};
///
/// #[derive(Debug)]
/// enum AppEvent {
///     UserLogin,
///     UserLogout,
///     ApiCall,
/// }
///
/// impl AsRef<str> for AppEvent {
///     fn as_ref(&self) -> &str {
///         match self {
///             AppEvent::UserLogin => "user_login",
///             AppEvent::UserLogout => "user_logout",
///             AppEvent::ApiCall => "api_call",
///         }
///     }
/// }
///
/// impl EventId for AppEvent {}
///
/// let store = EventStore::new();
/// store.record(AppEvent::UserLogin);  // Type-safe, bounded memory
/// ```
///
/// **Unsafe pattern** (unbounded events, avoid this):
/// ```rust,no_run
/// # use tiny_counter::EventStore;
/// let store = EventStore::new();
/// for user_id in 0..1_000_000 {
///     // WARNING: Creates 1M separate counters, ~2GB memory!
///     store.record(format!("user:{}:login", user_id));
/// }
/// ```
///
/// For per-user tracking, use a single event ID and separate EventStore instances per user,
/// or aggregate at the application level rather than in the event store.
pub trait EventId: AsRef<str> {}
1061
// String-like types are usable as event ids out of the box; prefer a
// dedicated enum (see the `EventId` docs) to keep the event set bounded.
impl EventId for str {}
impl EventId for String {}
impl EventId for &str {}
impl EventId for &String {}
1066
1067#[cfg(test)]
1068mod tests {
1069    use super::*;
1070
1071    use chrono::TimeZone;
1072
1073    #[test]
1074    fn test_new_creates_empty_store_with_default_intervals() {
1075        let store = EventStore::new();
1076        assert!(!store.is_dirty());
1077
1078        // Verify default intervals work by querying each time unit
1079        let intervals = store.tracked_intervals();
1080        assert_eq!(intervals.len(), 6);
1081    }
1082
1083    #[test]
1084    fn test_default_configs_have_256_buckets_total() {
1085        let config = EventCounterConfig::default();
1086        let total: usize = config.as_vec().iter().map(|c| c.bucket_count()).sum();
1087        assert_eq!(total, 256);
1088    }
1089
1090    #[test]
1091    fn test_record_creates_counter_on_demand() {
1092        let store = EventStore::new();
1093        store.record("test_event");
1094
1095        // Verify counter was created by querying the event
1096        let sum = store.query("test_event").last_days(1).sum();
1097        assert_eq!(sum, Some(1));
1098    }
1099
1100    #[test]
1101    fn test_record_count_with_count_5() {
1102        let store = EventStore::new();
1103        store.record_count("test_event", 5);
1104
1105        let sum = store.query("test_event").last_days(1).sum();
1106        assert_eq!(sum, Some(5));
1107    }
1108
1109    #[test]
1110    fn test_record_at_with_past_time() {
1111        let store = EventStore::new();
1112        let now = Utc::now();
1113        let past = now - Duration::days(2);
1114
1115        store.record_at("test_event", past).unwrap();
1116
1117        // Verify event was recorded
1118        assert!(store.is_dirty());
1119        let sum = store.query("test_event").last_days(7).sum();
1120        assert_eq!(sum, Some(1));
1121    }
1122
1123    #[test]
1124    fn test_record_ago_with_duration() {
1125        let store = EventStore::new();
1126        store.record_ago("test_event", Duration::hours(3));
1127
1128        // Verify event was recorded
1129        assert!(store.is_dirty());
1130        let sum = store.query("test_event").last_hours(24).sum();
1131        assert_eq!(sum, Some(1));
1132    }
1133
1134    #[test]
1135    fn test_query_returns_query_builder() {
1136        let store = EventStore::new();
1137        store.record("test_event");
1138
1139        let query = store.query("test_event");
1140        let sum = query.last_days(1).sum();
1141        assert_eq!(sum, Some(1));
1142    }
1143
1144    #[test]
1145    fn test_query_nonexistent_event_returns_none() {
1146        let store = EventStore::new();
1147        let query = store.query("nonexistent");
1148        let sum = query.last_days(1).sum();
1149        assert_eq!(sum, None);
1150    }
1151
1152    #[test]
1153    fn test_query_many() {
1154        let store = EventStore::new();
1155        store.record_count("event1", 5);
1156        store.record_count("event2", 3);
1157
1158        let event_ids = &["event1", "event2"];
1159        let query = store.query_many(event_ids);
1160        let sum = query.last_days(1).sum();
1161        assert_eq!(sum, Some(8));
1162    }
1163
1164    #[test]
1165    fn test_query_ratio() {
1166        let store = EventStore::new();
1167        store.record_count("numerator", 10);
1168        store.record_count("denominator", 5);
1169
1170        let ratio = store.query_ratio("numerator", "denominator").last_days(1);
1171        assert_eq!(ratio, Some(2.0));
1172    }
1173
1174    #[test]
1175    fn test_query_delta() {
1176        let store = EventStore::new();
1177        store.record_count("positive", 10);
1178        store.record_count("negative", 3);
1179
1180        let delta = store.query_delta("positive", "negative").last_days(1).sum();
1181        assert_eq!(delta, 7);
1182    }
1183
1184    #[test]
1185    fn test_dirty_tracking_starts_clean() {
1186        let store = EventStore::new();
1187        assert!(!store.is_dirty());
1188    }
1189
1190    #[test]
1191    fn test_dirty_tracking_becomes_dirty_after_record() {
1192        let store = EventStore::new();
1193        store.record("test_event");
1194        assert!(store.is_dirty());
1195    }
1196
1197    #[test]
1198    fn test_query_returns_none_for_nonexistent_event() {
1199        let store = EventStore::new();
1200        let result = store.query("nonexistent").last_days(1).sum();
1201        assert_eq!(result, None);
1202    }
1203
1204    #[test]
1205    fn test_integration_record_multiple_events_query_each() {
1206        let store = EventStore::new();
1207
1208        // Record different events
1209        store.record_count("login", 5);
1210        store.record_count("logout", 3);
1211        store.record_count("error", 1);
1212
1213        // Query each
1214        let login_sum = store.query("login").last_days(1).sum();
1215        let logout_sum = store.query("logout").last_days(1).sum();
1216        let error_sum = store.query("error").last_days(1).sum();
1217
1218        assert_eq!(login_sum, Some(5));
1219        assert_eq!(logout_sum, Some(3));
1220        assert_eq!(error_sum, Some(1));
1221    }
1222
1223    #[test]
1224    fn test_record_creates_counter_with_default_config() {
1225        let store = EventStore::new();
1226        store.record("test_event");
1227
1228        // Verify counter has all 4 default intervals by querying each
1229        assert!(store.query("test_event").last_minutes(1).sum().is_some());
1230        assert!(store.query("test_event").last_hours(1).sum().is_some());
1231        assert!(store.query("test_event").last_days(1).sum().is_some());
1232        assert!(store.query("test_event").last_months(1).sum().is_some());
1233    }
1234
1235    #[test]
1236    fn test_default_trait() {
1237        let store = EventStore::default();
1238        assert!(!store.is_dirty());
1239
1240        // Verify default intervals are configured
1241        let intervals = store.tracked_intervals();
1242        assert_eq!(intervals.len(), 6);
1243    }
1244
1245    #[test]
1246    fn test_default_config_includes_all_six_time_units() {
1247        let store = EventStore::new();
1248        store.record("test_event");
1249
1250        // Verify that all 6 time units are in the default configuration
1251        let intervals = store.tracked_intervals();
1252        let time_units: Vec<TimeUnit> = intervals.iter().map(|(unit, _)| *unit).collect();
1253
1254        assert!(time_units.contains(&TimeUnit::Minutes));
1255        assert!(time_units.contains(&TimeUnit::Hours));
1256        assert!(time_units.contains(&TimeUnit::Days));
1257        assert!(time_units.contains(&TimeUnit::Weeks));
1258        assert!(time_units.contains(&TimeUnit::Months));
1259        assert!(time_units.contains(&TimeUnit::Years));
1260
1261        // Verify that all 6 default time units work
1262        assert!(store.query("test_event").last_minutes(1).sum().is_some());
1263        assert!(store.query("test_event").last_hours(1).sum().is_some());
1264        assert!(store.query("test_event").last_days(1).sum().is_some());
1265        assert!(store.query("test_event").last_weeks(1).sum().is_some());
1266        assert!(store.query("test_event").last_months(1).sum().is_some());
1267        assert!(store.query("test_event").last_years(1).sum().is_some());
1268    }
1269
1270    #[test]
1271    fn test_record_at_with_time_before_creation() {
1272        let store = EventStore::new();
1273        let now = Utc.with_ymd_and_hms(2025, 1, 10, 12, 0, 0).unwrap();
1274
1275        // Mock time by using TestClock would be better, but for this test
1276        // we'll just verify it doesn't panic
1277        let past = now - Duration::days(2);
1278        let result = store.record_at("test_event", past);
1279
1280        // Should succeed (not a future event)
1281        assert!(result.is_ok());
1282    }
1283
1284    #[test]
1285    fn test_multiple_records_to_same_event() {
1286        let store = EventStore::new();
1287        store.record("test_event");
1288        store.record("test_event");
1289        store.record("test_event");
1290
1291        let sum = store.query("test_event").last_days(1).sum();
1292        assert_eq!(sum, Some(3));
1293    }
1294
1295    #[test]
1296    fn test_record_count_ago() {
1297        let store = EventStore::new();
1298        store.record_count_ago("test_event", 5, Duration::hours(2));
1299
1300        // Should have recorded in the past
1301        let sum = store.query("test_event").last_hours(24).sum();
1302        assert_eq!(sum, Some(5));
1303    }
1304
1305    #[test]
1306    fn test_record_ago_outside_tracking_window_silently_drops() {
1307        let store = EventStore::new();
1308
1309        // Default store tracks up to 4 years in the past
1310        // Record an event 10 years ago - should be silently dropped
1311        store.record_ago("ancient_event", Duration::days(365 * 10));
1312
1313        // Query returns Some(0) because event counter exists but event was dropped
1314        let sum = store.query("ancient_event").ever().sum();
1315        assert_eq!(sum, Some(0));
1316    }
1317
1318    #[test]
1319    fn test_record_count_ago_outside_tracking_window_silently_drops() {
1320        let store = EventStore::new();
1321
1322        // Default store tracks up to 4 years in the past
1323        // Record events 10 years ago - should be silently dropped
1324        store.record_count_ago("ancient_event", 100, Duration::days(365 * 10));
1325
1326        // Query returns Some(0) because event counter exists but events were dropped
1327        let sum = store.query("ancient_event").ever().sum();
1328        assert_eq!(sum, Some(0));
1329    }
1330
1331    #[test]
1332    fn test_balance_delta_positive() {
1333        let store = EventStore::new();
1334        store.record_count("credits", 10);
1335        store.record_count("debits", 3);
1336
1337        // Delta = 10 - 3 = 7 (positive), so record 7 to debits
1338        store.balance_delta("credits", "debits").unwrap();
1339
1340        let credits_sum = store.query("credits").ever().sum();
1341        let debits_sum = store.query("debits").ever().sum();
1342
1343        assert_eq!(credits_sum, Some(10));
1344        assert_eq!(debits_sum, Some(10)); // 3 + 7 = 10
1345    }
1346
1347    #[test]
1348    fn test_balance_delta_negative() {
1349        let store = EventStore::new();
1350        store.record_count("credits", 3);
1351        store.record_count("debits", 10);
1352
1353        // Delta = 3 - 10 = -7 (negative), so record 7 to credits
1354        store.balance_delta("credits", "debits").unwrap();
1355
1356        let credits_sum = store.query("credits").ever().sum();
1357        let debits_sum = store.query("debits").ever().sum();
1358
1359        assert_eq!(credits_sum, Some(10)); // 3 + 7 = 10
1360        assert_eq!(debits_sum, Some(10));
1361    }
1362
1363    #[test]
1364    fn test_balance_delta_zero() {
1365        let store = EventStore::new();
1366        store.record_count("credits", 10);
1367        store.record_count("debits", 10);
1368
1369        // Delta = 10 - 10 = 0, so no-op
1370        store.balance_delta("credits", "debits").unwrap();
1371
1372        let credits_sum = store.query("credits").ever().sum();
1373        let debits_sum = store.query("debits").ever().sum();
1374
1375        assert_eq!(credits_sum, Some(10));
1376        assert_eq!(debits_sum, Some(10));
1377    }
1378
1379    #[test]
1380    fn test_reset_dirty() {
1381        let store = EventStore::new();
1382        store.record("test_event");
1383        assert!(store.is_dirty());
1384
1385        store.reset_dirty();
1386        assert!(!store.is_dirty());
1387    }
1388
1389    #[test]
1390    fn test_memory_usage_empty_store() {
1391        let store = EventStore::new();
1392        assert_eq!(store.memory_usage(), 0);
1393    }
1394
1395    #[test]
1396    fn test_memory_usage_with_events() {
1397        let store = EventStore::new();
1398        store.record("test_event");
1399
1400        // Memory usage should be greater than 0
1401        let usage = store.memory_usage();
1402        assert!(usage > 0);
1403    }
1404
1405    #[test]
1406    fn test_tracked_intervals_returns_default_config() {
1407        let store = EventStore::new();
1408        let intervals = store.tracked_intervals();
1409
1410        assert_eq!(intervals.len(), 6);
1411        assert!(intervals.contains(&(TimeUnit::Minutes, 60)));
1412        assert!(intervals.contains(&(TimeUnit::Hours, 72)));
1413        assert!(intervals.contains(&(TimeUnit::Days, 56)));
1414        assert!(intervals.contains(&(TimeUnit::Weeks, 52)));
1415        assert!(intervals.contains(&(TimeUnit::Months, 12)));
1416        assert!(intervals.contains(&(TimeUnit::Years, 4)));
1417    }
1418
1419    #[cfg(feature = "serde-bincode")]
1420    #[test]
1421    fn test_persist_without_storage_returns_error() {
1422        let store = EventStore::new();
1423        store.record("test_event");
1424
1425        let result = store.persist();
1426        assert!(result.is_err());
1427        match result.unwrap_err() {
1428            Error::Storage(_) => (),
1429            _ => panic!("Expected Storage error"),
1430        }
1431    }
1432
1433    #[cfg(feature = "serde-bincode")]
1434    #[test]
1435    fn test_persist_all_without_storage_returns_error() {
1436        let store = EventStore::new();
1437        store.record("test_event");
1438
1439        let result = store.persist_all();
1440        assert!(result.is_err());
1441        match result.unwrap_err() {
1442            Error::Storage(_) => (),
1443            _ => panic!("Expected Storage error"),
1444        }
1445    }
1446
1447    #[cfg(feature = "serde-bincode")]
1448    #[test]
1449    fn test_persist_with_storage() {
1450        use crate::storage::MemoryStorage;
1451        use crate::EventStoreBuilder;
1452
1453        let store = EventStoreBuilder::new()
1454            .with_storage(MemoryStorage::new())
1455            .build()
1456            .unwrap();
1457
1458        store.record("event1");
1459        store.record("event2");
1460        assert!(store.is_dirty());
1461
1462        let result = store.persist();
1463        assert!(result.is_ok());
1464        assert!(!store.is_dirty());
1465    }
1466
1467    #[cfg(feature = "serde-bincode")]
1468    #[test]
1469    fn test_close_persists_and_consumes() {
1470        use crate::storage::MemoryStorage;
1471        use crate::EventStoreBuilder;
1472
1473        let storage = MemoryStorage::new();
1474        let store = EventStoreBuilder::new()
1475            .with_storage(storage)
1476            .build()
1477            .unwrap();
1478
1479        store.record("event1");
1480        store.record("event2");
1481        assert!(store.is_dirty());
1482
1483        // close() should persist and consume the store
1484        let result = store.close();
1485        assert!(result.is_ok());
1486        // store is now consumed and cannot be used
1487    }
1488
1489    #[cfg(feature = "serde-bincode")]
1490    #[test]
1491    fn test_close_returns_error_on_persist_failure() {
1492        use crate::EventStoreBuilder;
1493
1494        // Create store without storage - persist will fail
1495        let store = EventStoreBuilder::new().build().unwrap();
1496
1497        store.record("event1");
1498        assert!(store.is_dirty());
1499
1500        // close() should return error when persist fails
1501        let result = store.close();
1502        assert!(result.is_err());
1503        assert!(result
1504            .unwrap_err()
1505            .to_string()
1506            .contains("No storage configured"));
1507    }
1508
1509    #[cfg(feature = "serde-bincode")]
1510    #[test]
1511    fn test_persist_only_dirty_events() {
1512        use crate::storage::MemoryStorage;
1513        use crate::EventStoreBuilder;
1514
1515        let store = EventStoreBuilder::new()
1516            .with_storage(MemoryStorage::new())
1517            .build()
1518            .unwrap();
1519
1520        // Record and persist event1
1521        store.record("event1");
1522        store.persist().unwrap();
1523        assert!(!store.is_dirty());
1524
1525        // Record event2
1526        store.record("event2");
1527        assert!(store.is_dirty());
1528
1529        // Persist should only write event2
1530        store.persist().unwrap();
1531        assert!(!store.is_dirty());
1532    }
1533
1534    #[cfg(feature = "serde-bincode")]
1535    #[test]
1536    fn test_persist_all() {
1537        use crate::storage::MemoryStorage;
1538        use crate::EventStoreBuilder;
1539
1540        let store = EventStoreBuilder::new()
1541            .with_storage(MemoryStorage::new())
1542            .build()
1543            .unwrap();
1544
1545        store.record("event1");
1546        store.record("event2");
1547        store.record("event3");
1548
1549        let result = store.persist_all();
1550        assert!(result.is_ok());
1551        assert!(!store.is_dirty());
1552    }
1553
1554    #[cfg(feature = "serde-bincode")]
1555    #[test]
1556    fn test_serialization_roundtrip() {
1557        use crate::storage::MemoryStorage;
1558        use crate::EventStoreBuilder;
1559
1560        let store = EventStoreBuilder::new()
1561            .with_storage(MemoryStorage::new())
1562            .build()
1563            .unwrap();
1564
1565        store.record_count("test_event", 42);
1566        store.persist().unwrap();
1567
1568        // Verify the counter has the right value
1569        let sum = store.query("test_event").last_days(1).sum();
1570        assert_eq!(sum, Some(42));
1571    }
1572
1573    #[cfg(feature = "serde-bincode")]
1574    #[test]
1575    fn test_persist_clears_dirty_events() {
1576        use crate::storage::MemoryStorage;
1577        use crate::EventStoreBuilder;
1578
1579        let store = EventStoreBuilder::new()
1580            .with_storage(MemoryStorage::new())
1581            .build()
1582            .unwrap();
1583
1584        store.record("event1");
1585        store.record("event2");
1586        assert!(store.is_dirty());
1587
1588        store.persist().unwrap();
1589        assert!(!store.is_dirty());
1590    }
1591
1592    #[cfg(feature = "serde-bincode")]
1593    #[test]
1594    fn test_compact_advances_and_saves() {
1595        use crate::storage::MemoryStorage;
1596        use crate::EventStoreBuilder;
1597
1598        let mut store = EventStoreBuilder::new()
1599            .track_days(7)
1600            .with_storage(MemoryStorage::new())
1601            .build()
1602            .unwrap();
1603
1604        // Record events
1605        store.record("event1");
1606        store.record("event2");
1607
1608        // Persist both
1609        store.persist_all().unwrap();
1610        assert!(!store.is_dirty());
1611
1612        // Compact should re-save all events after advancing
1613        store.compact().unwrap();
1614
1615        // Events should still be queryable
1616        assert_eq!(store.query("event1").last_days(7).sum(), Some(1));
1617        assert_eq!(store.query("event2").last_days(7).sum(), Some(1));
1618    }
1619
1620    #[cfg(feature = "serde-bincode")]
1621    #[test]
1622    fn test_compact_while_recording() {
1623        use crate::storage::MemoryStorage;
1624        use crate::{EventCounterConfig, SystemClock};
1625        use std::thread;
1626
1627        // Create store with from_parts to avoid Clone issue
1628        let base_store = EventStore::from_parts(
1629            SystemClock::new(),
1630            Some(Box::new(MemoryStorage::new())),
1631            Some(Arc::new(crate::formatter::BincodeFormat)),
1632            EventCounterConfig::default(),
1633        );
1634
1635        // Wrap in Arc+Mutex since compact needs &mut self
1636        let store = Arc::new(Mutex::new(base_store));
1637        let store_writer = Arc::clone(&store);
1638        let store_compactor = Arc::clone(&store);
1639
1640        // Writer thread - continuously record
1641        let writer = thread::spawn(move || {
1642            for _ in 0..100 {
1643                let store = store_writer.lock().unwrap();
1644                store.record("compact_event");
1645                drop(store);
1646                thread::sleep(std::time::Duration::from_micros(10));
1647            }
1648        });
1649
1650        // Compactor thread - compact multiple times
1651        let compactor = thread::spawn(move || {
1652            for _ in 0..10 {
1653                thread::sleep(std::time::Duration::from_micros(50));
1654                let mut store = store_compactor.lock().unwrap();
1655                let _ = store.compact();
1656            }
1657        });
1658
1659        writer.join().unwrap();
1660        compactor.join().unwrap();
1661
1662        // Verify all 100 events were recorded (no data loss)
1663        let store = store.lock().unwrap();
1664        let sum = store.query("compact_event").last_days(7).sum();
1665        assert_eq!(sum, Some(100));
1666    }
1667
1668    #[cfg(feature = "serde-bincode")]
1669    #[test]
1670    fn test_compact_without_storage_returns_error() {
1671        let mut store = EventStore::new();
1672        store.record("event1");
1673
1674        let result = store.compact();
1675        assert!(result.is_err());
1676        assert!(result
1677            .unwrap_err()
1678            .to_string()
1679            .contains("No storage configured"));
1680    }
1681
1682    #[cfg(feature = "serde-bincode")]
1683    #[test]
1684    fn test_memory_usage_calculation() {
1685        let store = EventStore::new();
1686        store.record("event1");
1687        store.record("event2");
1688
1689        let usage = store.memory_usage();
1690        // Each event has 6 intervals (default config)
1691        // Each interval has at least 1 bucket (8 bytes)
1692        // So minimum is 2 events * 6 intervals * 1 bucket * 8 bytes = 96 bytes
1693        assert!(usage >= 96);
1694    }
1695
1696    #[test]
1697    fn test_export_all_returns_all_counters() {
1698        let store = EventStore::new();
1699        store.record("event1");
1700        store.record("event2");
1701        store.record("event3");
1702
1703        let exported = store.export_all().unwrap();
1704        assert_eq!(exported.len(), 3);
1705        assert!(exported.contains_key("event1"));
1706        assert!(exported.contains_key("event2"));
1707        assert!(exported.contains_key("event3"));
1708    }
1709
1710    #[test]
1711    fn test_export_dirty_returns_only_dirty_counters() {
1712        let store = EventStore::new();
1713        store.record("event1");
1714        store.record("event2");
1715        store.reset_dirty();
1716
1717        // Now record event3 (should be dirty)
1718        store.record("event3");
1719
1720        let exported = store.export_dirty().unwrap();
1721        assert_eq!(exported.len(), 1);
1722        assert!(exported.contains_key("event3"));
1723        assert!(!exported.contains_key("event1"));
1724        assert!(!exported.contains_key("event2"));
1725    }
1726
1727    #[test]
1728    fn test_export_all_returns_empty_map_for_empty_store() {
1729        let store = EventStore::new();
1730        let exported = store.export_all().unwrap();
1731        assert_eq!(exported.len(), 0);
1732    }
1733
1734    #[test]
1735    fn test_export_dirty_returns_empty_map_when_nothing_dirty() {
1736        let store = EventStore::new();
1737        store.record("event1");
1738        store.reset_dirty();
1739
1740        let exported = store.export_dirty().unwrap();
1741        assert_eq!(exported.len(), 0);
1742    }
1743
    /// `export_all()` must union in-memory counters with counters that only
    /// exist in storage (persisted earlier, never loaded into this store).
    ///
    /// NOTE(review): this test reaches into `store.inner.storage` directly to
    /// clone the backend's contents — it depends on the internal layout of
    /// `EventStoreInner`.
    #[cfg(feature = "serde-bincode")]
    #[test]
    fn test_export_all_includes_disk_only_counters() {
        use crate::storage::MemoryStorage;
        use crate::EventStoreBuilder;

        let storage = MemoryStorage::new();
        let store = EventStoreBuilder::new()
            .with_storage(storage)
            .build()
            .unwrap();

        // Record and persist event1
        store.record_count("event1", 10);
        store.persist().unwrap();

        // Record event2 (stays in memory)
        // event2 is never persisted before the copy below, so it exists only
        // in the FIRST store's memory and must NOT appear in store2's export.
        store.record_count("event2", 20);

        // Clear memory by dropping store and creating new one with same storage
        let storage2 = {
            let storage_arc = store.inner.storage.as_ref().unwrap().clone();
            let storage_guard = storage_arc.lock().unwrap();
            // Create a new MemoryStorage and copy data from old one
            // (only event1 is on disk at this point).
            let mut new_storage = MemoryStorage::new();
            for key in storage_guard.list_keys().unwrap() {
                let data = storage_guard.load(&key).unwrap().unwrap();
                new_storage.save(&key, data).unwrap();
            }
            new_storage
        };

        let store2 = EventStoreBuilder::new()
            .with_storage(storage2)
            .build()
            .unwrap();

        // Record event3 (only in memory)
        store2.record_count("event3", 30);

        // export_all should include:
        // - event1 (from storage, not in memory)
        // - event3 (from memory, not in storage)
        let exported = store2.export_all().unwrap();
        assert_eq!(exported.len(), 2);
        assert!(exported.contains_key("event1"));
        assert!(exported.contains_key("event3"));

        // Verify the counts are correct
        assert_eq!(store2.query("event1").last_days(1).sum(), Some(10));
        assert_eq!(store2.query("event3").last_days(1).sum(), Some(30));
    }
1796
1797    #[test]
1798    fn test_import_event_creates_new_counter() {
1799        let store1 = EventStore::new();
1800        store1.record_count("event1", 42);
1801
1802        let exported = store1.export_all().unwrap();
1803        let counter = exported.get("event1").unwrap().clone();
1804
1805        let store2 = EventStore::new();
1806        store2.import_event("event1", counter).unwrap();
1807
1808        let sum = store2.query("event1").last_days(1).sum();
1809        assert_eq!(sum, Some(42));
1810    }
1811
1812    #[test]
1813    fn test_import_event_overwrites_existing() {
1814        let store = EventStore::new();
1815        store.record_count("event1", 10);
1816
1817        let store2 = EventStore::new();
1818        store2.record_count("event1", 42);
1819
1820        let exported = store2.export_all().unwrap();
1821        let counter = exported.get("event1").unwrap().clone();
1822
1823        store.import_event("event1", counter).unwrap();
1824
1825        let sum = store.query("event1").last_days(1).sum();
1826        // Should have overwritten 10 with 42
1827        assert_eq!(sum, Some(42));
1828    }
1829
1830    #[test]
1831    fn test_import_all_batch_imports() {
1832        let store1 = EventStore::new();
1833        store1.record_count("event1", 10);
1834        store1.record_count("event2", 20);
1835        store1.record_count("event3", 30);
1836
1837        let exported = store1.export_all().unwrap();
1838
1839        let store2 = EventStore::new();
1840        store2.import_all(exported).unwrap();
1841
1842        assert_eq!(store2.query("event1").last_days(1).sum(), Some(10));
1843        assert_eq!(store2.query("event2").last_days(1).sum(), Some(20));
1844        assert_eq!(store2.query("event3").last_days(1).sum(), Some(30));
1845    }
1846
1847    #[test]
1848    fn test_merge_event_combines_counts() {
1849        let store1 = EventStore::new();
1850        store1.record_count("event1", 10);
1851
1852        let store2 = EventStore::new();
1853        store2.record_count("event1", 20);
1854
1855        let exported = store2.export_all().unwrap();
1856        let counter = exported.get("event1").unwrap().clone();
1857
1858        store1.merge_event("event1", counter).unwrap();
1859
1860        let sum = store1.query("event1").last_days(1).sum();
1861        // Should have 10 + 20 = 30
1862        assert_eq!(sum, Some(30));
1863    }
1864
1865    #[test]
1866    fn test_merge_event_creates_counter_if_not_exists() {
1867        let store1 = EventStore::new();
1868
1869        let store2 = EventStore::new();
1870        store2.record_count("event1", 42);
1871
1872        let exported = store2.export_all().unwrap();
1873        let counter = exported.get("event1").unwrap().clone();
1874
1875        store1.merge_event("event1", counter).unwrap();
1876
1877        let sum = store1.query("event1").last_days(1).sum();
1878        assert_eq!(sum, Some(42));
1879    }
1880
1881    #[test]
1882    fn test_merge_all_combines_multiple_events() {
1883        let store1 = EventStore::new();
1884        store1.record_count("event1", 10);
1885        store1.record_count("event2", 20);
1886
1887        let store2 = EventStore::new();
1888        store2.record_count("event1", 5);
1889        store2.record_count("event3", 30);
1890
1891        let exported = store2.export_all().unwrap();
1892        store1.merge_all(exported).unwrap();
1893
1894        assert_eq!(store1.query("event1").last_days(1).sum(), Some(15)); // 10 + 5
1895        assert_eq!(store1.query("event2").last_days(1).sum(), Some(20)); // unchanged
1896        assert_eq!(store1.query("event3").last_days(1).sum(), Some(30));
1897        // new
1898    }
1899
1900    #[test]
1901    fn test_merge_is_commutative_at_store_level() {
1902        let store_a1 = EventStore::new();
1903        store_a1.record_count("event1", 10);
1904
1905        let store_b1 = EventStore::new();
1906        store_b1.record_count("event1", 20);
1907
1908        let store_a2 = EventStore::new();
1909        store_a2.record_count("event1", 10);
1910
1911        let store_b2 = EventStore::new();
1912        store_b2.record_count("event1", 20);
1913
1914        // a + b
1915        let b1_export = store_b1.export_all().unwrap();
1916        store_a1.merge_all(b1_export).unwrap();
1917
1918        // b + a
1919        let a2_export = store_a2.export_all().unwrap();
1920        store_b2.merge_all(a2_export).unwrap();
1921
1922        // Both should equal 30
1923        assert_eq!(store_a1.query("event1").last_days(1).sum(), Some(30));
1924        assert_eq!(store_b2.query("event1").last_days(1).sum(), Some(30));
1925    }
1926
1927    #[test]
1928    fn test_merge_is_associative_at_store_level() {
1929        let store_a1 = EventStore::new();
1930        store_a1.record_count("event1", 10);
1931
1932        let store_b1 = EventStore::new();
1933        store_b1.record_count("event1", 20);
1934
1935        let store_c1 = EventStore::new();
1936        store_c1.record_count("event1", 30);
1937
1938        let store_a2 = EventStore::new();
1939        store_a2.record_count("event1", 10);
1940
1941        let store_b2 = EventStore::new();
1942        store_b2.record_count("event1", 20);
1943
1944        let store_c2 = EventStore::new();
1945        store_c2.record_count("event1", 30);
1946
1947        // (a + b) + c
1948        let b1_export = store_b1.export_all().unwrap();
1949        store_a1.merge_all(b1_export).unwrap();
1950        let c1_export = store_c1.export_all().unwrap();
1951        store_a1.merge_all(c1_export).unwrap();
1952
1953        // a + (b + c)
1954        let c2_export = store_c2.export_all().unwrap();
1955        store_b2.merge_all(c2_export).unwrap();
1956        let b2_export = store_b2.export_all().unwrap();
1957        store_a2.merge_all(b2_export).unwrap();
1958
1959        // Both should equal 60
1960        assert_eq!(store_a1.query("event1").last_days(1).sum(), Some(60));
1961        assert_eq!(store_a2.query("event1").last_days(1).sum(), Some(60));
1962    }
1963
    // Test for ev-dcq: BaseEventStore with auto_persist_handle field
    //
    // Verifies the `auto_persist_handle` field exists, starts as None, and can
    // hold a tokio JoinHandle. The cfg-gated positional args below select the
    // formatter matching the enabled serde feature.
    #[cfg(all(feature = "tokio", feature = "serde"))]
    #[tokio::test]
    async fn test_base_event_store_has_auto_persist_handle_field() {
        use crate::storage::MemoryStorage;
        use std::time::Duration;

        // Create store with storage
        let mut store = EventStore::from_parts(
            SystemClock::new(),
            Some(Box::new(MemoryStorage::new())),
            #[cfg(feature = "serde-bincode")]
            Some(Arc::new(crate::formatter::BincodeFormat)),
            #[cfg(all(feature = "serde-json", not(feature = "serde-bincode")))]
            Some(Arc::new(crate::formatter::JsonFormat)),
            EventCounterConfig::default(),
        );

        // Verify field exists and is None initially
        assert!(store.auto_persist_handle.is_none());

        // Simulate setting the handle (this will fail until we add the field)
        let handle = tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        });
        store.auto_persist_handle = Some(handle);
        assert!(store.auto_persist_handle.is_some());
    }
1992
    // Test for ev-dcq: spawn_auto_persist creates background task
    //
    // The background task should persist dirty counters on its interval and
    // clear the dirty flag without any explicit persist() call.
    // NOTE(review): timing-based — relies on 100ms being comfortably longer
    // than the 50ms persist interval.
    #[cfg(all(feature = "tokio", feature = "serde"))]
    #[tokio::test]
    async fn test_spawn_auto_persist_creates_background_task() {
        use crate::storage::MemoryStorage;
        use chrono::Duration;

        // Create store with storage
        let store = EventStore::from_parts(
            SystemClock::new(),
            Some(Box::new(MemoryStorage::new())),
            #[cfg(feature = "serde-bincode")]
            Some(Arc::new(crate::formatter::BincodeFormat)),
            #[cfg(all(feature = "serde-json", not(feature = "serde-bincode")))]
            Some(Arc::new(crate::formatter::JsonFormat)),
            EventCounterConfig::default(),
        );

        // Record an event to make it dirty
        store.record("test");
        assert!(store.is_dirty());

        // Spawn auto-persist task (takes the shared inner so the task does not
        // keep the EventStore itself alive).
        let handle =
            EventStore::spawn_auto_persist(store.inner.clone(), Duration::milliseconds(50));

        // Wait for auto-persist to run
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Store should no longer be dirty
        assert!(!store.is_dirty());

        // Clean up
        handle.abort();
    }
2028
    // Test for ev-dcq: Drop implementation aborts handle and does final persist
    //
    // Installs an infinite background task as the auto-persist handle, then
    // drops the store. Drop must abort the task and run its final persist
    // without panicking; there is no direct way to observe the abort here.
    #[cfg(all(feature = "tokio", feature = "serde"))]
    #[tokio::test]
    async fn test_drop_aborts_handle_and_persists() {
        use crate::storage::MemoryStorage;
        use std::time::Duration;

        // Create store with storage
        let mut store = EventStore::from_parts(
            SystemClock::new(),
            Some(Box::new(MemoryStorage::new())),
            #[cfg(feature = "serde-bincode")]
            Some(Arc::new(crate::formatter::BincodeFormat)),
            #[cfg(all(feature = "serde-json", not(feature = "serde-bincode")))]
            Some(Arc::new(crate::formatter::JsonFormat)),
            EventCounterConfig::default(),
        );

        // Spawn a long-running task (stands in for the real auto-persist loop)
        let handle = tokio::spawn(async {
            loop {
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });
        store.auto_persist_handle = Some(handle);

        // Record an event
        store.record("test");
        assert!(store.is_dirty());

        // Drop the store
        drop(store);

        // Task should be aborted (we can't directly test this, but Drop runs)
        // The test passes if Drop doesn't panic
    }
2065
2066    // Test for ev-rh6: Drop persists dirty data even when auto-persist is enabled
2067    #[cfg(all(
2068        feature = "tokio",
2069        feature = "serde",
2070        feature = "serde",
2071        feature = "storage-fs"
2072    ))]
2073    #[tokio::test]
2074    async fn test_drop_persists_dirty_data_with_auto_persist() {
2075        use crate::storage::FilePerEvent;
2076        use chrono::Duration;
2077        use tempfile::tempdir;
2078
2079        let temp_dir = tempdir().unwrap();
2080        let storage_path = temp_dir.path().join("events");
2081
2082        // Create store with auto-persist enabled (short interval)
2083        let store = EventStore::builder()
2084            .with_storage(FilePerEvent::new(&storage_path, ".dat").unwrap())
2085            .auto_persist(Duration::milliseconds(100))
2086            .build()
2087            .unwrap();
2088
2089        // Record first event and wait for auto-persist to run
2090        store.record("event1");
2091        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
2092
2093        // Verify first event was persisted
2094        {
2095            let storage = FilePerEvent::new(&storage_path, ".dat").unwrap();
2096            let data = storage.load("event1").unwrap();
2097            assert!(
2098                data.is_some(),
2099                "First event should be persisted by auto-persist"
2100            );
2101        }
2102
2103        // Record second event (makes store dirty again)
2104        store.record("event2");
2105
2106        // Drop the store BEFORE next auto-persist cycle
2107        drop(store);
2108
2109        // Load from storage and verify BOTH events are present
2110        let storage = FilePerEvent::new(&storage_path, ".dat").unwrap();
2111        let event1_data = storage.load("event1").unwrap();
2112        let event2_data = storage.load("event2").unwrap();
2113
2114        assert!(
2115            event1_data.is_some(),
2116            "First event should still be in storage"
2117        );
2118        assert!(
2119            event2_data.is_some(),
2120            "Second event MUST be persisted on drop, not lost"
2121        );
2122    }
2123}
2124
#[cfg(test)]
mod concurrency_tests {
    //! Thread-safety tests: drive a shared `Arc<EventStore>` from multiple OS
    //! threads and verify that concurrent record/query/persist calls neither
    //! lose updates nor panic. Statement order and the sleeps matter — they
    //! shape the interleavings these tests are meant to exercise.

    use std::thread;

    use super::*;

    /// 10 writer threads x 100 records each; no increments may be lost.
    #[test]
    fn test_concurrent_record_from_multiple_threads() {
        let store = Arc::new(EventStore::new());
        let mut handles = vec![];

        // Spawn 10 threads, each recording 100 events
        for _ in 0..10 {
            let store_clone = Arc::clone(&store);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    store_clone.record("concurrent_event");
                }
            });
            handles.push(handle);
        }

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }

        // Should have 1000 total events
        let sum = store.query("concurrent_event").last_days(1).sum();
        assert_eq!(sum, Some(1000));
    }

    /// Queries running concurrently with writes must not disturb the final total.
    #[test]
    fn test_query_while_recording() {
        let store = Arc::new(EventStore::new());
        let store_writer = Arc::clone(&store);
        let store_reader = Arc::clone(&store);

        // Writer thread
        let writer = thread::spawn(move || {
            for i in 0..100 {
                store_writer.record_count("test_event", i);
                thread::sleep(std::time::Duration::from_micros(10));
            }
        });

        // Reader thread
        let reader = thread::spawn(move || {
            for _ in 0..50 {
                let _sum = store_reader.query("test_event").last_days(1).sum();
                thread::sleep(std::time::Duration::from_micros(20));
            }
        });

        writer.join().unwrap();
        reader.join().unwrap();

        // Should have sum of 0..100 = 4950
        let final_sum = store.query("test_event").last_days(1).sum();
        assert_eq!(final_sum, Some(4950));
    }

    /// persist() racing with record() must not lose or duplicate counts.
    #[cfg(feature = "serde-bincode")]
    #[test]
    fn test_persist_while_recording() {
        use crate::storage::MemoryStorage;

        // Create base store directly to avoid enum issues with Arc+Clone
        let base_store = EventStore::from_parts(
            SystemClock::new(),
            Some(Box::new(MemoryStorage::new())),
            #[cfg(feature = "serde")]
            {
                #[cfg(feature = "serde-bincode")]
                {
                    Some(Arc::new(crate::formatter::BincodeFormat))
                }
                #[cfg(not(feature = "serde-bincode"))]
                {
                    None
                }
            },
            EventCounterConfig::default(),
        );
        let store = Arc::new(base_store);

        let store_writer = Arc::clone(&store);
        let store_persister = Arc::clone(&store);

        // Writer thread
        let writer = thread::spawn(move || {
            for _ in 0..100 {
                store_writer.record("persist_event");
                thread::sleep(std::time::Duration::from_micros(10));
            }
        });

        // Persister thread (errors deliberately ignored — only the final
        // persist below is asserted on)
        let persister = thread::spawn(move || {
            for _ in 0..20 {
                let _ = store_persister.persist();
                thread::sleep(std::time::Duration::from_micros(50));
            }
        });

        writer.join().unwrap();
        persister.join().unwrap();

        // Final persist
        store.persist().unwrap();

        // Should have 100 events
        let sum = store.query("persist_event").last_days(1).sum();
        assert_eq!(sum, Some(100));
        assert!(!store.is_dirty());
    }

    /// Concurrent multi-event queries over a static store are stable.
    #[test]
    fn test_concurrent_query_many_from_multiple_threads() {
        let store = Arc::new(EventStore::new());
        let mut handles = vec![];

        // Pre-populate events
        store.record_count("event1", 100);
        store.record_count("event2", 200);
        store.record_count("event3", 300);

        // Spawn 10 threads, each querying multiple events 50 times
        for _ in 0..10 {
            let store_clone = Arc::clone(&store);
            let handle = thread::spawn(move || {
                for _ in 0..50 {
                    let event_ids = &["event1", "event2", "event3"];
                    let sum = store_clone.query_many(event_ids).last_days(1).sum();
                    // Should always get the same sum
                    assert_eq!(sum, Some(600));
                }
            });
            handles.push(handle);
        }

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Concurrent ratio queries over a static store are stable.
    #[test]
    fn test_concurrent_query_ratio_from_multiple_threads() {
        let store = Arc::new(EventStore::new());
        let mut handles = vec![];

        // Pre-populate events
        store.record_count("numerator", 100);
        store.record_count("denominator", 50);

        // Spawn 10 threads, each querying ratio 50 times
        for _ in 0..10 {
            let store_clone = Arc::clone(&store);
            let handle = thread::spawn(move || {
                for _ in 0..50 {
                    let ratio = store_clone
                        .query_ratio("numerator", "denominator")
                        .last_days(1);
                    // Should always get the same ratio
                    assert_eq!(ratio, Some(2.0));
                }
            });
            handles.push(handle);
        }

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Concurrent delta queries over a static store are stable.
    #[test]
    fn test_concurrent_query_delta_from_multiple_threads() {
        let store = Arc::new(EventStore::new());
        let mut handles = vec![];

        // Pre-populate events
        store.record_count("positive", 150);
        store.record_count("negative", 50);

        // Spawn 10 threads, each querying delta 50 times
        for _ in 0..10 {
            let store_clone = Arc::clone(&store);
            let handle = thread::spawn(move || {
                for _ in 0..50 {
                    let delta = store_clone
                        .query_delta("positive", "negative")
                        .last_days(1)
                        .sum();
                    // Should always get the same delta
                    assert_eq!(delta, 100);
                }
            });
            handles.push(handle);
        }

        // Wait for all threads to complete
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// query_many racing with writes must still see consistent final sums.
    #[test]
    fn test_concurrent_query_many_while_recording() {
        let store = Arc::new(EventStore::new());
        let store_writer = Arc::clone(&store);
        let store_reader = Arc::clone(&store);

        // Writer thread
        let writer = thread::spawn(move || {
            for i in 0..100 {
                store_writer.record_count("event1", i);
                store_writer.record_count("event2", i * 2);
                thread::sleep(std::time::Duration::from_micros(10));
            }
        });

        // Reader thread querying multiple events
        let reader = thread::spawn(move || {
            for _ in 0..50 {
                let event_ids = &["event1", "event2"];
                let _sum = store_reader.query_many(event_ids).last_days(1).sum();
                thread::sleep(std::time::Duration::from_micros(20));
            }
        });

        writer.join().unwrap();
        reader.join().unwrap();

        // Verify final sums
        let event1_sum = store.query("event1").last_days(1).sum().unwrap();
        let event2_sum = store.query("event2").last_days(1).sum().unwrap();
        let multi_sum = store
            .query_many(&["event1", "event2"])
            .last_days(1)
            .sum()
            .unwrap();

        // event1: sum of 0..100 = 4950
        // event2: sum of (0..100)*2 = 9900
        assert_eq!(event1_sum, 4950);
        assert_eq!(event2_sum, 9900);
        assert_eq!(multi_sum, 14850);
    }

    /// query_ratio racing with writes must still see the consistent final ratio.
    #[test]
    fn test_concurrent_query_ratio_while_recording() {
        let store = Arc::new(EventStore::new());
        let store_writer = Arc::clone(&store);
        let store_reader = Arc::clone(&store);

        // Writer thread
        let writer = thread::spawn(move || {
            for i in 1..=100 {
                store_writer.record_count("numerator", i * 2);
                store_writer.record_count("denominator", i);
                thread::sleep(std::time::Duration::from_micros(10));
            }
        });

        // Reader thread querying ratio
        let reader = thread::spawn(move || {
            for _ in 0..50 {
                let _ratio = store_reader
                    .query_ratio("numerator", "denominator")
                    .last_days(1);
                thread::sleep(std::time::Duration::from_micros(20));
            }
        });

        writer.join().unwrap();
        reader.join().unwrap();

        // Verify final ratio
        let ratio = store.query_ratio("numerator", "denominator").last_days(1);
        // numerator: sum of 2,4,6,...,200 = 10100
        // denominator: sum of 1,2,3,...,100 = 5050
        // ratio = 10100/5050 = 2.0
        assert_eq!(ratio, Some(2.0));
    }

    /// query_delta racing with writes must still see the consistent final delta.
    #[test]
    fn test_concurrent_query_delta_while_recording() {
        let store = Arc::new(EventStore::new());
        let store_writer = Arc::clone(&store);
        let store_reader = Arc::clone(&store);

        // Writer thread
        let writer = thread::spawn(move || {
            for i in 0..100 {
                store_writer.record_count("positive", i * 2);
                store_writer.record_count("negative", i);
                thread::sleep(std::time::Duration::from_micros(10));
            }
        });

        // Reader thread querying delta
        let reader = thread::spawn(move || {
            for _ in 0..50 {
                let _delta = store_reader
                    .query_delta("positive", "negative")
                    .last_days(1)
                    .sum();
                thread::sleep(std::time::Duration::from_micros(20));
            }
        });

        writer.join().unwrap();
        reader.join().unwrap();

        // Verify final delta
        let delta = store.query_delta("positive", "negative").last_days(1).sum();
        // positive: sum of 0,2,4,...,198 = 9900
        // negative: sum of 0,1,2,...,99 = 4950
        // delta = 9900 - 4950 = 4950
        assert_eq!(delta, 4950);
    }
}