1pub mod builder;
2pub(crate) mod inner;
3pub mod limiter;
4pub mod query;
5
6use std::collections::{HashMap, HashSet};
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Duration, Utc};
10use dashmap::DashMap;
11
12use crate::config_converter::convert_if_needed;
13use crate::{
14 Clock, DeltaQuery, Error, EventCounterConfig, Formatter, MultiQuery, Query, RatioQuery, Result,
15 SingleEventCounter, Storage, SystemClock, TimeUnit,
16};
17
18use self::inner::EventStoreInner;
19use self::limiter::Limiter;
20
21pub struct EventStore {
141 pub(crate) inner: Arc<EventStoreInner>,
142 #[cfg(feature = "tokio")]
143 pub(crate) auto_persist_handle: Option<tokio::task::JoinHandle<()>>,
144}
145
146impl EventStore {
147 pub fn new() -> Self {
157 Self::from_parts(
158 SystemClock::new(),
159 None,
160 None,
161 EventCounterConfig::default(),
162 )
163 }
164
165 pub fn clock_now(&self) -> DateTime<Utc> {
167 self.inner.clock_now()
168 }
169
170 pub(crate) fn from_parts(
174 clock: Arc<dyn Clock>,
175 storage: Option<Box<dyn Storage>>,
176 formatter: Option<Arc<dyn crate::Formatter>>,
177 config: EventCounterConfig,
178 ) -> Self {
179 Self {
180 inner: Arc::new(EventStoreInner {
181 events: DashMap::new(),
182 clock,
183 storage: storage.map(|s| Arc::new(Mutex::new(s))),
184 formatter,
185 config,
186 }),
187 #[cfg(feature = "tokio")]
188 auto_persist_handle: None,
189 }
190 }
191
192 pub fn record(&self, event_id: impl EventId) {
204 self.record_count(event_id, 1);
205 }
206
207 pub fn record_count(&self, event_id: impl EventId, count: u32) {
219 let counter = self.inner.get_counter_for_record(event_id.as_ref());
220 let now = self.inner.clock.now();
221 let mut counter = counter.lock().unwrap();
222 counter.advance_if_needed(now);
223 counter.record(count);
224 counter.mark_dirty();
225 }
226
227 pub fn record_at(&self, event_id: impl EventId, time: DateTime<Utc>) -> Result<()> {
250 self.record_count_at(event_id, 1, time)
251 }
252
253 pub fn record_count_at(
276 &self,
277 event_id: impl EventId,
278 count: u32,
279 time: DateTime<Utc>,
280 ) -> Result<()> {
281 let counter = self.inner.get_counter_for_record(event_id.as_ref());
282 let now = self.inner.clock.now();
283 let mut counter = counter.lock().unwrap();
284 counter.advance_if_needed(now);
285 counter.record_at(count, time)?;
286 counter.mark_dirty();
287 Ok(())
288 }
289
290 pub fn record_ago(&self, event_id: impl EventId, duration: Duration) {
320 self.record_count_ago(event_id, 1, duration);
321 }
322
323 pub fn record_count_ago(&self, event_id: impl EventId, count: u32, duration: Duration) {
353 let now = self.inner.clock.now();
354 let time = now - duration;
355 let _ = self.record_count_at(event_id, count, time);
358 }
359
360 pub fn query(&self, event_id: impl EventId) -> Query {
374 Query::new(self.inner.clone(), event_id.as_ref().to_string())
375 }
376
377 pub fn query_many(&self, event_ids: &[impl EventId]) -> MultiQuery {
398 let event_ids_owned: Vec<String> =
399 event_ids.iter().map(|s| s.as_ref().to_string()).collect();
400 MultiQuery::new(self.inner.clone(), event_ids_owned)
401 }
402
403 pub fn query_ratio(&self, numerator: impl EventId, denominator: impl EventId) -> RatioQuery {
423 RatioQuery::new(
424 self.inner.clone(),
425 numerator.as_ref().to_string(),
426 denominator.as_ref().to_string(),
427 )
428 }
429
430 pub fn query_delta(&self, positive: impl EventId, negative: impl EventId) -> DeltaQuery {
451 DeltaQuery::new(
452 self.inner.clone(),
453 positive.as_ref().to_string(),
454 negative.as_ref().to_string(),
455 )
456 }
457
458 pub fn is_dirty(&self) -> bool {
460 for entry in self.inner.events.iter() {
461 let counter = entry.value().lock().unwrap();
462 if counter.is_dirty() {
463 return true;
464 }
465 }
466 false
467 }
468
469 pub fn limit(&self) -> Limiter {
489 Limiter::new(self.inner.clone())
490 }
491
492 pub fn balance_delta(&self, positive: impl EventId, negative: impl EventId) -> Result<()> {
517 let positive_str = positive.as_ref();
518 let negative_str = negative.as_ref();
519
520 let delta = self.query_delta(positive_str, negative_str).ever().sum();
522
523 if delta > 0 {
524 self.record_count(negative_str, delta.min(u32::MAX as i64) as u32);
526 } else if delta < 0 {
527 self.record_count(positive_str, (-delta).min(u32::MAX as i64) as u32);
529 }
530 Ok(())
533 }
534
535 pub fn persist(&self) -> Result<()> {
574 self.persist_if_dirty(false)
575 }
576
577 pub fn close(self) -> Result<()> {
608 self.persist()
609 }
611
612 fn persist_if_dirty(&self, force_dirty: bool) -> Result<()> {
613 let storage = self
614 .inner
615 .storage
616 .as_ref()
617 .ok_or_else(|| Error::Storage("No storage configured".to_string()))?;
618
619 let mut storage = storage.lock().unwrap();
620
621 let formatter = self
622 .inner
623 .formatter
624 .as_ref()
625 .ok_or_else(|| Error::Serialization("No formatter configured".to_string()))?;
626
627 storage.begin_transaction()?;
629
630 let longest_time_unit = self.inner.config.specified_time_unit(TimeUnit::Ever);
631 let persist_result = (|| {
632 for entry in self.inner.events.iter() {
633 let counter = entry.value();
634 let event_id = entry.key();
635 self.persist_counter(
636 &mut **storage,
637 &**formatter,
638 event_id,
639 counter,
640 longest_time_unit,
641 force_dirty,
642 )?;
643 }
644 Ok(())
645 })();
646
647 match persist_result {
649 Ok(()) => {
650 storage.commit_transaction()?;
651 Ok(())
652 }
653 Err(e) => {
654 let _ = storage.rollback_transaction();
656 Err(e)
657 }
658 }
659 }
660
661 pub fn persist_all(&self) -> Result<()> {
665 self.persist_if_dirty(true)
666 }
667
668 pub fn reset_dirty(&self) {
672 for entry in self.inner.events.iter() {
673 let mut counter = entry.value().lock().unwrap();
674 counter.reset_dirty();
675 }
676 }
677
678 pub fn compact(&mut self) -> Result<()> {
692 if self.inner.storage.is_none() {
694 return Err(Error::Storage("No storage configured".to_string()));
695 }
696
697 let keys = {
699 let storage = self.inner.storage.as_ref().unwrap();
700 let storage_guard = storage.lock().unwrap();
701 storage_guard.list_keys()?
702 };
703
704 let now = self.inner.clock.now();
705
706 for key in keys {
708 if let Some(counter) = self.inner.get_counter_for_query(&key) {
711 let mut counter_guard = counter.lock().unwrap();
712 counter_guard.advance_if_needed(now);
713 }
714 }
715
716 self.persist_all()?;
718
719 self.inner.events.clear();
721
722 Ok(())
723 }
724
725 pub fn memory_usage(&self) -> usize {
729 let mut total: usize = 0;
730 for entry in self.inner.events.iter() {
731 let counter = entry.value().lock().unwrap();
732 total = total.saturating_add(counter.memory_usage())
734 }
735 total
736 }
737
738 pub fn tracked_intervals(&self) -> Vec<(TimeUnit, usize)> {
742 self.inner
743 .config
744 .as_vec()
745 .iter()
746 .map(|config| (config.time_unit(), config.bucket_count()))
747 .collect()
748 }
749
750 pub fn export_all(&self) -> Result<HashMap<String, SingleEventCounter>> {
758 let mut result = HashMap::new();
759
760 let mut all_event_ids: HashSet<String> = HashSet::new();
762
763 for entry in self.inner.events.iter() {
765 all_event_ids.insert(entry.key().clone());
766 }
767
768 if let Some(storage) = &self.inner.storage {
770 let storage_guard = storage.lock().unwrap();
771 let storage_keys = storage_guard.list_keys()?;
772 drop(storage_guard); for key in storage_keys {
774 all_event_ids.insert(key);
775 }
776 }
777
778 for event_id in all_event_ids {
780 if let Some(counter_arc) = self.inner.get_counter_for_query(&event_id) {
781 let counter = counter_arc.lock().unwrap();
782 result.insert(event_id, counter.clone());
783 }
784 }
785
786 Ok(result)
787 }
788
789 pub fn export_dirty(&self) -> Result<HashMap<String, SingleEventCounter>> {
794 let mut result = HashMap::new();
795 for entry in self.inner.events.iter() {
796 let counter = entry.value().lock().unwrap();
797 if counter.is_dirty() {
798 result.insert(entry.key().clone(), counter.clone());
799 }
800 }
801 Ok(result)
802 }
803
804 pub fn import_event(&self, event_id: impl EventId, counter: SingleEventCounter) -> Result<()> {
809 let event_id_str = event_id.as_ref();
810 let mut counter = convert_if_needed(counter, &self.inner.config);
811 counter.mark_dirty();
812 self.inner
813 .events
814 .insert(event_id_str.to_string(), Arc::new(Mutex::new(counter)));
815 Ok(())
816 }
817
818 pub fn import_all(&self, events: HashMap<String, SingleEventCounter>) -> Result<()> {
822 for (event_id, counter) in events {
823 self.import_event(event_id, counter)?;
824 }
825 Ok(())
826 }
827
828 pub fn merge_event(&self, event_id: impl EventId, counter: SingleEventCounter) -> Result<()> {
834 let event_id_str = event_id.as_ref();
835 if let Some(existing_entry) = self.inner.get_counter_for_query(event_id_str) {
836 let mut existing = existing_entry.lock().unwrap();
837 existing.merge(counter)?;
838 existing.mark_dirty();
839 } else {
840 let mut new_counter = counter;
841 new_counter.mark_dirty();
842 self.inner
843 .events
844 .insert(event_id_str.to_string(), Arc::new(Mutex::new(new_counter)));
845 }
846
847 Ok(())
848 }
849
850 pub fn merge_all(&self, events: HashMap<String, SingleEventCounter>) -> Result<()> {
854 for (event_id, counter) in events {
855 self.merge_event(event_id, counter)?;
856 }
857 Ok(())
858 }
859
860 pub fn merge(&self, other: Self) -> Result<()> {
861 let events = other.export_all()?;
862 self.merge_all(events)
863 }
864
865 #[cfg(feature = "tokio")]
872 #[allow(dead_code)] pub(crate) fn spawn_auto_persist(
874 inner: Arc<EventStoreInner>,
875 interval: chrono::Duration,
876 ) -> tokio::task::JoinHandle<()> {
877 tokio::spawn(async move {
878 let std_interval = interval
880 .to_std()
881 .expect("auto_persist interval must be positive");
882
883 let store = EventStore {
885 inner: inner.clone(),
886 auto_persist_handle: None,
887 };
888
889 loop {
890 tokio::time::sleep(std_interval).await;
891
892 if store.is_dirty() {
893 if let Err(e) = store.persist() {
894 eprintln!("Auto-persist failed: {}", e);
895 }
896 }
897 }
898 })
899 }
900
901 fn persist_counter(
902 &self,
903 storage: &mut dyn Storage,
904 formatter: &dyn Formatter,
905 event_id: &str,
906 counter: &Mutex<SingleEventCounter>,
907 longest_time_unit: TimeUnit,
908 force_dirty: bool,
909 ) -> Result<()> {
910 if let Some(data) = {
911 let mut counter = counter.lock().unwrap();
913 if force_dirty || counter.is_dirty() {
914 if !counter.is_empty(longest_time_unit) {
916 let data = formatter.serialize(&counter)?;
917 counter.reset_dirty();
918 Some(data)
919 } else {
920 counter.reset_dirty();
921 None
922 }
923 } else {
924 return Ok(());
925 }
926 } {
928 storage.save(event_id, data)
933 } else {
934 storage.delete(event_id)
935 }
936 .map_err(|_e| {
937 let mut counter = counter.lock().unwrap();
942 counter.mark_dirty();
943 _e
944 })
945 }
946}
947
948impl Default for EventStore {
949 fn default() -> Self {
950 Self::new()
951 }
952}
953
954impl Drop for EventStore {
979 fn drop(&mut self) {
980 #[cfg(feature = "tokio")]
984 if let Some(handle) = &self.auto_persist_handle {
985 handle.abort();
986 }
987
988 if self.is_dirty() {
994 let _ = self.persist();
995 }
998 }
999}
1000
1001pub trait EventId: AsRef<str> {}
1061
1062impl EventId for str {}
1063impl EventId for String {}
1064impl EventId for &str {}
1065impl EventId for &String {}
1066
1067#[cfg(test)]
1068mod tests {
1069 use super::*;
1070
1071 use chrono::TimeZone;
1072
1073 #[test]
1074 fn test_new_creates_empty_store_with_default_intervals() {
1075 let store = EventStore::new();
1076 assert!(!store.is_dirty());
1077
1078 let intervals = store.tracked_intervals();
1080 assert_eq!(intervals.len(), 6);
1081 }
1082
1083 #[test]
1084 fn test_default_configs_have_256_buckets_total() {
1085 let config = EventCounterConfig::default();
1086 let total: usize = config.as_vec().iter().map(|c| c.bucket_count()).sum();
1087 assert_eq!(total, 256);
1088 }
1089
1090 #[test]
1091 fn test_record_creates_counter_on_demand() {
1092 let store = EventStore::new();
1093 store.record("test_event");
1094
1095 let sum = store.query("test_event").last_days(1).sum();
1097 assert_eq!(sum, Some(1));
1098 }
1099
1100 #[test]
1101 fn test_record_count_with_count_5() {
1102 let store = EventStore::new();
1103 store.record_count("test_event", 5);
1104
1105 let sum = store.query("test_event").last_days(1).sum();
1106 assert_eq!(sum, Some(5));
1107 }
1108
1109 #[test]
1110 fn test_record_at_with_past_time() {
1111 let store = EventStore::new();
1112 let now = Utc::now();
1113 let past = now - Duration::days(2);
1114
1115 store.record_at("test_event", past).unwrap();
1116
1117 assert!(store.is_dirty());
1119 let sum = store.query("test_event").last_days(7).sum();
1120 assert_eq!(sum, Some(1));
1121 }
1122
1123 #[test]
1124 fn test_record_ago_with_duration() {
1125 let store = EventStore::new();
1126 store.record_ago("test_event", Duration::hours(3));
1127
1128 assert!(store.is_dirty());
1130 let sum = store.query("test_event").last_hours(24).sum();
1131 assert_eq!(sum, Some(1));
1132 }
1133
1134 #[test]
1135 fn test_query_returns_query_builder() {
1136 let store = EventStore::new();
1137 store.record("test_event");
1138
1139 let query = store.query("test_event");
1140 let sum = query.last_days(1).sum();
1141 assert_eq!(sum, Some(1));
1142 }
1143
1144 #[test]
1145 fn test_query_nonexistent_event_returns_none() {
1146 let store = EventStore::new();
1147 let query = store.query("nonexistent");
1148 let sum = query.last_days(1).sum();
1149 assert_eq!(sum, None);
1150 }
1151
1152 #[test]
1153 fn test_query_many() {
1154 let store = EventStore::new();
1155 store.record_count("event1", 5);
1156 store.record_count("event2", 3);
1157
1158 let event_ids = &["event1", "event2"];
1159 let query = store.query_many(event_ids);
1160 let sum = query.last_days(1).sum();
1161 assert_eq!(sum, Some(8));
1162 }
1163
1164 #[test]
1165 fn test_query_ratio() {
1166 let store = EventStore::new();
1167 store.record_count("numerator", 10);
1168 store.record_count("denominator", 5);
1169
1170 let ratio = store.query_ratio("numerator", "denominator").last_days(1);
1171 assert_eq!(ratio, Some(2.0));
1172 }
1173
1174 #[test]
1175 fn test_query_delta() {
1176 let store = EventStore::new();
1177 store.record_count("positive", 10);
1178 store.record_count("negative", 3);
1179
1180 let delta = store.query_delta("positive", "negative").last_days(1).sum();
1181 assert_eq!(delta, 7);
1182 }
1183
1184 #[test]
1185 fn test_dirty_tracking_starts_clean() {
1186 let store = EventStore::new();
1187 assert!(!store.is_dirty());
1188 }
1189
1190 #[test]
1191 fn test_dirty_tracking_becomes_dirty_after_record() {
1192 let store = EventStore::new();
1193 store.record("test_event");
1194 assert!(store.is_dirty());
1195 }
1196
1197 #[test]
1198 fn test_query_returns_none_for_nonexistent_event() {
1199 let store = EventStore::new();
1200 let result = store.query("nonexistent").last_days(1).sum();
1201 assert_eq!(result, None);
1202 }
1203
1204 #[test]
1205 fn test_integration_record_multiple_events_query_each() {
1206 let store = EventStore::new();
1207
1208 store.record_count("login", 5);
1210 store.record_count("logout", 3);
1211 store.record_count("error", 1);
1212
1213 let login_sum = store.query("login").last_days(1).sum();
1215 let logout_sum = store.query("logout").last_days(1).sum();
1216 let error_sum = store.query("error").last_days(1).sum();
1217
1218 assert_eq!(login_sum, Some(5));
1219 assert_eq!(logout_sum, Some(3));
1220 assert_eq!(error_sum, Some(1));
1221 }
1222
1223 #[test]
1224 fn test_record_creates_counter_with_default_config() {
1225 let store = EventStore::new();
1226 store.record("test_event");
1227
1228 assert!(store.query("test_event").last_minutes(1).sum().is_some());
1230 assert!(store.query("test_event").last_hours(1).sum().is_some());
1231 assert!(store.query("test_event").last_days(1).sum().is_some());
1232 assert!(store.query("test_event").last_months(1).sum().is_some());
1233 }
1234
1235 #[test]
1236 fn test_default_trait() {
1237 let store = EventStore::default();
1238 assert!(!store.is_dirty());
1239
1240 let intervals = store.tracked_intervals();
1242 assert_eq!(intervals.len(), 6);
1243 }
1244
1245 #[test]
1246 fn test_default_config_includes_all_six_time_units() {
1247 let store = EventStore::new();
1248 store.record("test_event");
1249
1250 let intervals = store.tracked_intervals();
1252 let time_units: Vec<TimeUnit> = intervals.iter().map(|(unit, _)| *unit).collect();
1253
1254 assert!(time_units.contains(&TimeUnit::Minutes));
1255 assert!(time_units.contains(&TimeUnit::Hours));
1256 assert!(time_units.contains(&TimeUnit::Days));
1257 assert!(time_units.contains(&TimeUnit::Weeks));
1258 assert!(time_units.contains(&TimeUnit::Months));
1259 assert!(time_units.contains(&TimeUnit::Years));
1260
1261 assert!(store.query("test_event").last_minutes(1).sum().is_some());
1263 assert!(store.query("test_event").last_hours(1).sum().is_some());
1264 assert!(store.query("test_event").last_days(1).sum().is_some());
1265 assert!(store.query("test_event").last_weeks(1).sum().is_some());
1266 assert!(store.query("test_event").last_months(1).sum().is_some());
1267 assert!(store.query("test_event").last_years(1).sum().is_some());
1268 }
1269
1270 #[test]
1271 fn test_record_at_with_time_before_creation() {
1272 let store = EventStore::new();
1273 let now = Utc.with_ymd_and_hms(2025, 1, 10, 12, 0, 0).unwrap();
1274
1275 let past = now - Duration::days(2);
1278 let result = store.record_at("test_event", past);
1279
1280 assert!(result.is_ok());
1282 }
1283
1284 #[test]
1285 fn test_multiple_records_to_same_event() {
1286 let store = EventStore::new();
1287 store.record("test_event");
1288 store.record("test_event");
1289 store.record("test_event");
1290
1291 let sum = store.query("test_event").last_days(1).sum();
1292 assert_eq!(sum, Some(3));
1293 }
1294
1295 #[test]
1296 fn test_record_count_ago() {
1297 let store = EventStore::new();
1298 store.record_count_ago("test_event", 5, Duration::hours(2));
1299
1300 let sum = store.query("test_event").last_hours(24).sum();
1302 assert_eq!(sum, Some(5));
1303 }
1304
1305 #[test]
1306 fn test_record_ago_outside_tracking_window_silently_drops() {
1307 let store = EventStore::new();
1308
1309 store.record_ago("ancient_event", Duration::days(365 * 10));
1312
1313 let sum = store.query("ancient_event").ever().sum();
1315 assert_eq!(sum, Some(0));
1316 }
1317
1318 #[test]
1319 fn test_record_count_ago_outside_tracking_window_silently_drops() {
1320 let store = EventStore::new();
1321
1322 store.record_count_ago("ancient_event", 100, Duration::days(365 * 10));
1325
1326 let sum = store.query("ancient_event").ever().sum();
1328 assert_eq!(sum, Some(0));
1329 }
1330
1331 #[test]
1332 fn test_balance_delta_positive() {
1333 let store = EventStore::new();
1334 store.record_count("credits", 10);
1335 store.record_count("debits", 3);
1336
1337 store.balance_delta("credits", "debits").unwrap();
1339
1340 let credits_sum = store.query("credits").ever().sum();
1341 let debits_sum = store.query("debits").ever().sum();
1342
1343 assert_eq!(credits_sum, Some(10));
1344 assert_eq!(debits_sum, Some(10)); }
1346
1347 #[test]
1348 fn test_balance_delta_negative() {
1349 let store = EventStore::new();
1350 store.record_count("credits", 3);
1351 store.record_count("debits", 10);
1352
1353 store.balance_delta("credits", "debits").unwrap();
1355
1356 let credits_sum = store.query("credits").ever().sum();
1357 let debits_sum = store.query("debits").ever().sum();
1358
1359 assert_eq!(credits_sum, Some(10)); assert_eq!(debits_sum, Some(10));
1361 }
1362
1363 #[test]
1364 fn test_balance_delta_zero() {
1365 let store = EventStore::new();
1366 store.record_count("credits", 10);
1367 store.record_count("debits", 10);
1368
1369 store.balance_delta("credits", "debits").unwrap();
1371
1372 let credits_sum = store.query("credits").ever().sum();
1373 let debits_sum = store.query("debits").ever().sum();
1374
1375 assert_eq!(credits_sum, Some(10));
1376 assert_eq!(debits_sum, Some(10));
1377 }
1378
1379 #[test]
1380 fn test_reset_dirty() {
1381 let store = EventStore::new();
1382 store.record("test_event");
1383 assert!(store.is_dirty());
1384
1385 store.reset_dirty();
1386 assert!(!store.is_dirty());
1387 }
1388
1389 #[test]
1390 fn test_memory_usage_empty_store() {
1391 let store = EventStore::new();
1392 assert_eq!(store.memory_usage(), 0);
1393 }
1394
1395 #[test]
1396 fn test_memory_usage_with_events() {
1397 let store = EventStore::new();
1398 store.record("test_event");
1399
1400 let usage = store.memory_usage();
1402 assert!(usage > 0);
1403 }
1404
1405 #[test]
1406 fn test_tracked_intervals_returns_default_config() {
1407 let store = EventStore::new();
1408 let intervals = store.tracked_intervals();
1409
1410 assert_eq!(intervals.len(), 6);
1411 assert!(intervals.contains(&(TimeUnit::Minutes, 60)));
1412 assert!(intervals.contains(&(TimeUnit::Hours, 72)));
1413 assert!(intervals.contains(&(TimeUnit::Days, 56)));
1414 assert!(intervals.contains(&(TimeUnit::Weeks, 52)));
1415 assert!(intervals.contains(&(TimeUnit::Months, 12)));
1416 assert!(intervals.contains(&(TimeUnit::Years, 4)));
1417 }
1418
1419 #[cfg(feature = "serde-bincode")]
1420 #[test]
1421 fn test_persist_without_storage_returns_error() {
1422 let store = EventStore::new();
1423 store.record("test_event");
1424
1425 let result = store.persist();
1426 assert!(result.is_err());
1427 match result.unwrap_err() {
1428 Error::Storage(_) => (),
1429 _ => panic!("Expected Storage error"),
1430 }
1431 }
1432
1433 #[cfg(feature = "serde-bincode")]
1434 #[test]
1435 fn test_persist_all_without_storage_returns_error() {
1436 let store = EventStore::new();
1437 store.record("test_event");
1438
1439 let result = store.persist_all();
1440 assert!(result.is_err());
1441 match result.unwrap_err() {
1442 Error::Storage(_) => (),
1443 _ => panic!("Expected Storage error"),
1444 }
1445 }
1446
1447 #[cfg(feature = "serde-bincode")]
1448 #[test]
1449 fn test_persist_with_storage() {
1450 use crate::storage::MemoryStorage;
1451 use crate::EventStoreBuilder;
1452
1453 let store = EventStoreBuilder::new()
1454 .with_storage(MemoryStorage::new())
1455 .build()
1456 .unwrap();
1457
1458 store.record("event1");
1459 store.record("event2");
1460 assert!(store.is_dirty());
1461
1462 let result = store.persist();
1463 assert!(result.is_ok());
1464 assert!(!store.is_dirty());
1465 }
1466
1467 #[cfg(feature = "serde-bincode")]
1468 #[test]
1469 fn test_close_persists_and_consumes() {
1470 use crate::storage::MemoryStorage;
1471 use crate::EventStoreBuilder;
1472
1473 let storage = MemoryStorage::new();
1474 let store = EventStoreBuilder::new()
1475 .with_storage(storage)
1476 .build()
1477 .unwrap();
1478
1479 store.record("event1");
1480 store.record("event2");
1481 assert!(store.is_dirty());
1482
1483 let result = store.close();
1485 assert!(result.is_ok());
1486 }
1488
1489 #[cfg(feature = "serde-bincode")]
1490 #[test]
1491 fn test_close_returns_error_on_persist_failure() {
1492 use crate::EventStoreBuilder;
1493
1494 let store = EventStoreBuilder::new().build().unwrap();
1496
1497 store.record("event1");
1498 assert!(store.is_dirty());
1499
1500 let result = store.close();
1502 assert!(result.is_err());
1503 assert!(result
1504 .unwrap_err()
1505 .to_string()
1506 .contains("No storage configured"));
1507 }
1508
1509 #[cfg(feature = "serde-bincode")]
1510 #[test]
1511 fn test_persist_only_dirty_events() {
1512 use crate::storage::MemoryStorage;
1513 use crate::EventStoreBuilder;
1514
1515 let store = EventStoreBuilder::new()
1516 .with_storage(MemoryStorage::new())
1517 .build()
1518 .unwrap();
1519
1520 store.record("event1");
1522 store.persist().unwrap();
1523 assert!(!store.is_dirty());
1524
1525 store.record("event2");
1527 assert!(store.is_dirty());
1528
1529 store.persist().unwrap();
1531 assert!(!store.is_dirty());
1532 }
1533
1534 #[cfg(feature = "serde-bincode")]
1535 #[test]
1536 fn test_persist_all() {
1537 use crate::storage::MemoryStorage;
1538 use crate::EventStoreBuilder;
1539
1540 let store = EventStoreBuilder::new()
1541 .with_storage(MemoryStorage::new())
1542 .build()
1543 .unwrap();
1544
1545 store.record("event1");
1546 store.record("event2");
1547 store.record("event3");
1548
1549 let result = store.persist_all();
1550 assert!(result.is_ok());
1551 assert!(!store.is_dirty());
1552 }
1553
1554 #[cfg(feature = "serde-bincode")]
1555 #[test]
1556 fn test_serialization_roundtrip() {
1557 use crate::storage::MemoryStorage;
1558 use crate::EventStoreBuilder;
1559
1560 let store = EventStoreBuilder::new()
1561 .with_storage(MemoryStorage::new())
1562 .build()
1563 .unwrap();
1564
1565 store.record_count("test_event", 42);
1566 store.persist().unwrap();
1567
1568 let sum = store.query("test_event").last_days(1).sum();
1570 assert_eq!(sum, Some(42));
1571 }
1572
1573 #[cfg(feature = "serde-bincode")]
1574 #[test]
1575 fn test_persist_clears_dirty_events() {
1576 use crate::storage::MemoryStorage;
1577 use crate::EventStoreBuilder;
1578
1579 let store = EventStoreBuilder::new()
1580 .with_storage(MemoryStorage::new())
1581 .build()
1582 .unwrap();
1583
1584 store.record("event1");
1585 store.record("event2");
1586 assert!(store.is_dirty());
1587
1588 store.persist().unwrap();
1589 assert!(!store.is_dirty());
1590 }
1591
1592 #[cfg(feature = "serde-bincode")]
1593 #[test]
1594 fn test_compact_advances_and_saves() {
1595 use crate::storage::MemoryStorage;
1596 use crate::EventStoreBuilder;
1597
1598 let mut store = EventStoreBuilder::new()
1599 .track_days(7)
1600 .with_storage(MemoryStorage::new())
1601 .build()
1602 .unwrap();
1603
1604 store.record("event1");
1606 store.record("event2");
1607
1608 store.persist_all().unwrap();
1610 assert!(!store.is_dirty());
1611
1612 store.compact().unwrap();
1614
1615 assert_eq!(store.query("event1").last_days(7).sum(), Some(1));
1617 assert_eq!(store.query("event2").last_days(7).sum(), Some(1));
1618 }
1619
1620 #[cfg(feature = "serde-bincode")]
1621 #[test]
1622 fn test_compact_while_recording() {
1623 use crate::storage::MemoryStorage;
1624 use crate::{EventCounterConfig, SystemClock};
1625 use std::thread;
1626
1627 let base_store = EventStore::from_parts(
1629 SystemClock::new(),
1630 Some(Box::new(MemoryStorage::new())),
1631 Some(Arc::new(crate::formatter::BincodeFormat)),
1632 EventCounterConfig::default(),
1633 );
1634
1635 let store = Arc::new(Mutex::new(base_store));
1637 let store_writer = Arc::clone(&store);
1638 let store_compactor = Arc::clone(&store);
1639
1640 let writer = thread::spawn(move || {
1642 for _ in 0..100 {
1643 let store = store_writer.lock().unwrap();
1644 store.record("compact_event");
1645 drop(store);
1646 thread::sleep(std::time::Duration::from_micros(10));
1647 }
1648 });
1649
1650 let compactor = thread::spawn(move || {
1652 for _ in 0..10 {
1653 thread::sleep(std::time::Duration::from_micros(50));
1654 let mut store = store_compactor.lock().unwrap();
1655 let _ = store.compact();
1656 }
1657 });
1658
1659 writer.join().unwrap();
1660 compactor.join().unwrap();
1661
1662 let store = store.lock().unwrap();
1664 let sum = store.query("compact_event").last_days(7).sum();
1665 assert_eq!(sum, Some(100));
1666 }
1667
1668 #[cfg(feature = "serde-bincode")]
1669 #[test]
1670 fn test_compact_without_storage_returns_error() {
1671 let mut store = EventStore::new();
1672 store.record("event1");
1673
1674 let result = store.compact();
1675 assert!(result.is_err());
1676 assert!(result
1677 .unwrap_err()
1678 .to_string()
1679 .contains("No storage configured"));
1680 }
1681
1682 #[cfg(feature = "serde-bincode")]
1683 #[test]
1684 fn test_memory_usage_calculation() {
1685 let store = EventStore::new();
1686 store.record("event1");
1687 store.record("event2");
1688
1689 let usage = store.memory_usage();
1690 assert!(usage >= 96);
1694 }
1695
1696 #[test]
1697 fn test_export_all_returns_all_counters() {
1698 let store = EventStore::new();
1699 store.record("event1");
1700 store.record("event2");
1701 store.record("event3");
1702
1703 let exported = store.export_all().unwrap();
1704 assert_eq!(exported.len(), 3);
1705 assert!(exported.contains_key("event1"));
1706 assert!(exported.contains_key("event2"));
1707 assert!(exported.contains_key("event3"));
1708 }
1709
1710 #[test]
1711 fn test_export_dirty_returns_only_dirty_counters() {
1712 let store = EventStore::new();
1713 store.record("event1");
1714 store.record("event2");
1715 store.reset_dirty();
1716
1717 store.record("event3");
1719
1720 let exported = store.export_dirty().unwrap();
1721 assert_eq!(exported.len(), 1);
1722 assert!(exported.contains_key("event3"));
1723 assert!(!exported.contains_key("event1"));
1724 assert!(!exported.contains_key("event2"));
1725 }
1726
1727 #[test]
1728 fn test_export_all_returns_empty_map_for_empty_store() {
1729 let store = EventStore::new();
1730 let exported = store.export_all().unwrap();
1731 assert_eq!(exported.len(), 0);
1732 }
1733
1734 #[test]
1735 fn test_export_dirty_returns_empty_map_when_nothing_dirty() {
1736 let store = EventStore::new();
1737 store.record("event1");
1738 store.reset_dirty();
1739
1740 let exported = store.export_dirty().unwrap();
1741 assert_eq!(exported.len(), 0);
1742 }
1743
1744 #[cfg(feature = "serde-bincode")]
1745 #[test]
1746 fn test_export_all_includes_disk_only_counters() {
1747 use crate::storage::MemoryStorage;
1748 use crate::EventStoreBuilder;
1749
1750 let storage = MemoryStorage::new();
1751 let store = EventStoreBuilder::new()
1752 .with_storage(storage)
1753 .build()
1754 .unwrap();
1755
1756 store.record_count("event1", 10);
1758 store.persist().unwrap();
1759
1760 store.record_count("event2", 20);
1762
1763 let storage2 = {
1765 let storage_arc = store.inner.storage.as_ref().unwrap().clone();
1766 let storage_guard = storage_arc.lock().unwrap();
1767 let mut new_storage = MemoryStorage::new();
1769 for key in storage_guard.list_keys().unwrap() {
1770 let data = storage_guard.load(&key).unwrap().unwrap();
1771 new_storage.save(&key, data).unwrap();
1772 }
1773 new_storage
1774 };
1775
1776 let store2 = EventStoreBuilder::new()
1777 .with_storage(storage2)
1778 .build()
1779 .unwrap();
1780
1781 store2.record_count("event3", 30);
1783
1784 let exported = store2.export_all().unwrap();
1788 assert_eq!(exported.len(), 2);
1789 assert!(exported.contains_key("event1"));
1790 assert!(exported.contains_key("event3"));
1791
1792 assert_eq!(store2.query("event1").last_days(1).sum(), Some(10));
1794 assert_eq!(store2.query("event3").last_days(1).sum(), Some(30));
1795 }
1796
1797 #[test]
1798 fn test_import_event_creates_new_counter() {
1799 let store1 = EventStore::new();
1800 store1.record_count("event1", 42);
1801
1802 let exported = store1.export_all().unwrap();
1803 let counter = exported.get("event1").unwrap().clone();
1804
1805 let store2 = EventStore::new();
1806 store2.import_event("event1", counter).unwrap();
1807
1808 let sum = store2.query("event1").last_days(1).sum();
1809 assert_eq!(sum, Some(42));
1810 }
1811
1812 #[test]
1813 fn test_import_event_overwrites_existing() {
1814 let store = EventStore::new();
1815 store.record_count("event1", 10);
1816
1817 let store2 = EventStore::new();
1818 store2.record_count("event1", 42);
1819
1820 let exported = store2.export_all().unwrap();
1821 let counter = exported.get("event1").unwrap().clone();
1822
1823 store.import_event("event1", counter).unwrap();
1824
1825 let sum = store.query("event1").last_days(1).sum();
1826 assert_eq!(sum, Some(42));
1828 }
1829
1830 #[test]
1831 fn test_import_all_batch_imports() {
1832 let store1 = EventStore::new();
1833 store1.record_count("event1", 10);
1834 store1.record_count("event2", 20);
1835 store1.record_count("event3", 30);
1836
1837 let exported = store1.export_all().unwrap();
1838
1839 let store2 = EventStore::new();
1840 store2.import_all(exported).unwrap();
1841
1842 assert_eq!(store2.query("event1").last_days(1).sum(), Some(10));
1843 assert_eq!(store2.query("event2").last_days(1).sum(), Some(20));
1844 assert_eq!(store2.query("event3").last_days(1).sum(), Some(30));
1845 }
1846
1847 #[test]
1848 fn test_merge_event_combines_counts() {
1849 let store1 = EventStore::new();
1850 store1.record_count("event1", 10);
1851
1852 let store2 = EventStore::new();
1853 store2.record_count("event1", 20);
1854
1855 let exported = store2.export_all().unwrap();
1856 let counter = exported.get("event1").unwrap().clone();
1857
1858 store1.merge_event("event1", counter).unwrap();
1859
1860 let sum = store1.query("event1").last_days(1).sum();
1861 assert_eq!(sum, Some(30));
1863 }
1864
1865 #[test]
1866 fn test_merge_event_creates_counter_if_not_exists() {
1867 let store1 = EventStore::new();
1868
1869 let store2 = EventStore::new();
1870 store2.record_count("event1", 42);
1871
1872 let exported = store2.export_all().unwrap();
1873 let counter = exported.get("event1").unwrap().clone();
1874
1875 store1.merge_event("event1", counter).unwrap();
1876
1877 let sum = store1.query("event1").last_days(1).sum();
1878 assert_eq!(sum, Some(42));
1879 }
1880
1881 #[test]
1882 fn test_merge_all_combines_multiple_events() {
1883 let store1 = EventStore::new();
1884 store1.record_count("event1", 10);
1885 store1.record_count("event2", 20);
1886
1887 let store2 = EventStore::new();
1888 store2.record_count("event1", 5);
1889 store2.record_count("event3", 30);
1890
1891 let exported = store2.export_all().unwrap();
1892 store1.merge_all(exported).unwrap();
1893
1894 assert_eq!(store1.query("event1").last_days(1).sum(), Some(15)); assert_eq!(store1.query("event2").last_days(1).sum(), Some(20)); assert_eq!(store1.query("event3").last_days(1).sum(), Some(30));
1897 }
1899
1900 #[test]
1901 fn test_merge_is_commutative_at_store_level() {
1902 let store_a1 = EventStore::new();
1903 store_a1.record_count("event1", 10);
1904
1905 let store_b1 = EventStore::new();
1906 store_b1.record_count("event1", 20);
1907
1908 let store_a2 = EventStore::new();
1909 store_a2.record_count("event1", 10);
1910
1911 let store_b2 = EventStore::new();
1912 store_b2.record_count("event1", 20);
1913
1914 let b1_export = store_b1.export_all().unwrap();
1916 store_a1.merge_all(b1_export).unwrap();
1917
1918 let a2_export = store_a2.export_all().unwrap();
1920 store_b2.merge_all(a2_export).unwrap();
1921
1922 assert_eq!(store_a1.query("event1").last_days(1).sum(), Some(30));
1924 assert_eq!(store_b2.query("event1").last_days(1).sum(), Some(30));
1925 }
1926
1927 #[test]
1928 fn test_merge_is_associative_at_store_level() {
1929 let store_a1 = EventStore::new();
1930 store_a1.record_count("event1", 10);
1931
1932 let store_b1 = EventStore::new();
1933 store_b1.record_count("event1", 20);
1934
1935 let store_c1 = EventStore::new();
1936 store_c1.record_count("event1", 30);
1937
1938 let store_a2 = EventStore::new();
1939 store_a2.record_count("event1", 10);
1940
1941 let store_b2 = EventStore::new();
1942 store_b2.record_count("event1", 20);
1943
1944 let store_c2 = EventStore::new();
1945 store_c2.record_count("event1", 30);
1946
1947 let b1_export = store_b1.export_all().unwrap();
1949 store_a1.merge_all(b1_export).unwrap();
1950 let c1_export = store_c1.export_all().unwrap();
1951 store_a1.merge_all(c1_export).unwrap();
1952
1953 let c2_export = store_c2.export_all().unwrap();
1955 store_b2.merge_all(c2_export).unwrap();
1956 let b2_export = store_b2.export_all().unwrap();
1957 store_a2.merge_all(b2_export).unwrap();
1958
1959 assert_eq!(store_a1.query("event1").last_days(1).sum(), Some(60));
1961 assert_eq!(store_a2.query("event1").last_days(1).sum(), Some(60));
1962 }
1963
1964 #[cfg(all(feature = "tokio", feature = "serde"))]
1966 #[tokio::test]
1967 async fn test_base_event_store_has_auto_persist_handle_field() {
1968 use crate::storage::MemoryStorage;
1969 use std::time::Duration;
1970
1971 let mut store = EventStore::from_parts(
1973 SystemClock::new(),
1974 Some(Box::new(MemoryStorage::new())),
1975 #[cfg(feature = "serde-bincode")]
1976 Some(Arc::new(crate::formatter::BincodeFormat)),
1977 #[cfg(all(feature = "serde-json", not(feature = "serde-bincode")))]
1978 Some(Arc::new(crate::formatter::JsonFormat)),
1979 EventCounterConfig::default(),
1980 );
1981
1982 assert!(store.auto_persist_handle.is_none());
1984
1985 let handle = tokio::spawn(async {
1987 tokio::time::sleep(Duration::from_millis(10)).await;
1988 });
1989 store.auto_persist_handle = Some(handle);
1990 assert!(store.auto_persist_handle.is_some());
1991 }
1992
1993 #[cfg(all(feature = "tokio", feature = "serde"))]
1995 #[tokio::test]
1996 async fn test_spawn_auto_persist_creates_background_task() {
1997 use crate::storage::MemoryStorage;
1998 use chrono::Duration;
1999
2000 let store = EventStore::from_parts(
2002 SystemClock::new(),
2003 Some(Box::new(MemoryStorage::new())),
2004 #[cfg(feature = "serde-bincode")]
2005 Some(Arc::new(crate::formatter::BincodeFormat)),
2006 #[cfg(all(feature = "serde-json", not(feature = "serde-bincode")))]
2007 Some(Arc::new(crate::formatter::JsonFormat)),
2008 EventCounterConfig::default(),
2009 );
2010
2011 store.record("test");
2013 assert!(store.is_dirty());
2014
2015 let handle =
2017 EventStore::spawn_auto_persist(store.inner.clone(), Duration::milliseconds(50));
2018
2019 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2021
2022 assert!(!store.is_dirty());
2024
2025 handle.abort();
2027 }
2028
2029 #[cfg(all(feature = "tokio", feature = "serde"))]
2031 #[tokio::test]
2032 async fn test_drop_aborts_handle_and_persists() {
2033 use crate::storage::MemoryStorage;
2034 use std::time::Duration;
2035
2036 let mut store = EventStore::from_parts(
2038 SystemClock::new(),
2039 Some(Box::new(MemoryStorage::new())),
2040 #[cfg(feature = "serde-bincode")]
2041 Some(Arc::new(crate::formatter::BincodeFormat)),
2042 #[cfg(all(feature = "serde-json", not(feature = "serde-bincode")))]
2043 Some(Arc::new(crate::formatter::JsonFormat)),
2044 EventCounterConfig::default(),
2045 );
2046
2047 let handle = tokio::spawn(async {
2049 loop {
2050 tokio::time::sleep(Duration::from_secs(1)).await;
2051 }
2052 });
2053 store.auto_persist_handle = Some(handle);
2054
2055 store.record("test");
2057 assert!(store.is_dirty());
2058
2059 drop(store);
2061
2062 }
2065
2066 #[cfg(all(
2068 feature = "tokio",
2069 feature = "serde",
2070 feature = "serde",
2071 feature = "storage-fs"
2072 ))]
2073 #[tokio::test]
2074 async fn test_drop_persists_dirty_data_with_auto_persist() {
2075 use crate::storage::FilePerEvent;
2076 use chrono::Duration;
2077 use tempfile::tempdir;
2078
2079 let temp_dir = tempdir().unwrap();
2080 let storage_path = temp_dir.path().join("events");
2081
2082 let store = EventStore::builder()
2084 .with_storage(FilePerEvent::new(&storage_path, ".dat").unwrap())
2085 .auto_persist(Duration::milliseconds(100))
2086 .build()
2087 .unwrap();
2088
2089 store.record("event1");
2091 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
2092
2093 {
2095 let storage = FilePerEvent::new(&storage_path, ".dat").unwrap();
2096 let data = storage.load("event1").unwrap();
2097 assert!(
2098 data.is_some(),
2099 "First event should be persisted by auto-persist"
2100 );
2101 }
2102
2103 store.record("event2");
2105
2106 drop(store);
2108
2109 let storage = FilePerEvent::new(&storage_path, ".dat").unwrap();
2111 let event1_data = storage.load("event1").unwrap();
2112 let event2_data = storage.load("event2").unwrap();
2113
2114 assert!(
2115 event1_data.is_some(),
2116 "First event should still be in storage"
2117 );
2118 assert!(
2119 event2_data.is_some(),
2120 "Second event MUST be persisted on drop, not lost"
2121 );
2122 }
2123}
2124
2125#[cfg(test)]
2126mod concurrency_tests {
2127 use std::thread;
2128
2129 use super::*;
2130
2131 #[test]
2132 fn test_concurrent_record_from_multiple_threads() {
2133 let store = Arc::new(EventStore::new());
2134 let mut handles = vec![];
2135
2136 for _ in 0..10 {
2138 let store_clone = Arc::clone(&store);
2139 let handle = thread::spawn(move || {
2140 for _ in 0..100 {
2141 store_clone.record("concurrent_event");
2142 }
2143 });
2144 handles.push(handle);
2145 }
2146
2147 for handle in handles {
2149 handle.join().unwrap();
2150 }
2151
2152 let sum = store.query("concurrent_event").last_days(1).sum();
2154 assert_eq!(sum, Some(1000));
2155 }
2156
2157 #[test]
2158 fn test_query_while_recording() {
2159 let store = Arc::new(EventStore::new());
2160 let store_writer = Arc::clone(&store);
2161 let store_reader = Arc::clone(&store);
2162
2163 let writer = thread::spawn(move || {
2165 for i in 0..100 {
2166 store_writer.record_count("test_event", i);
2167 thread::sleep(std::time::Duration::from_micros(10));
2168 }
2169 });
2170
2171 let reader = thread::spawn(move || {
2173 for _ in 0..50 {
2174 let _sum = store_reader.query("test_event").last_days(1).sum();
2175 thread::sleep(std::time::Duration::from_micros(20));
2176 }
2177 });
2178
2179 writer.join().unwrap();
2180 reader.join().unwrap();
2181
2182 let final_sum = store.query("test_event").last_days(1).sum();
2184 assert_eq!(final_sum, Some(4950));
2185 }
2186
2187 #[cfg(feature = "serde-bincode")]
2188 #[test]
2189 fn test_persist_while_recording() {
2190 use crate::storage::MemoryStorage;
2191
2192 let base_store = EventStore::from_parts(
2194 SystemClock::new(),
2195 Some(Box::new(MemoryStorage::new())),
2196 #[cfg(feature = "serde")]
2197 {
2198 #[cfg(feature = "serde-bincode")]
2199 {
2200 Some(Arc::new(crate::formatter::BincodeFormat))
2201 }
2202 #[cfg(not(feature = "serde-bincode"))]
2203 {
2204 None
2205 }
2206 },
2207 EventCounterConfig::default(),
2208 );
2209 let store = Arc::new(base_store);
2210
2211 let store_writer = Arc::clone(&store);
2212 let store_persister = Arc::clone(&store);
2213
2214 let writer = thread::spawn(move || {
2216 for _ in 0..100 {
2217 store_writer.record("persist_event");
2218 thread::sleep(std::time::Duration::from_micros(10));
2219 }
2220 });
2221
2222 let persister = thread::spawn(move || {
2224 for _ in 0..20 {
2225 let _ = store_persister.persist();
2226 thread::sleep(std::time::Duration::from_micros(50));
2227 }
2228 });
2229
2230 writer.join().unwrap();
2231 persister.join().unwrap();
2232
2233 store.persist().unwrap();
2235
2236 let sum = store.query("persist_event").last_days(1).sum();
2238 assert_eq!(sum, Some(100));
2239 assert!(!store.is_dirty());
2240 }
2241
2242 #[test]
2243 fn test_concurrent_query_many_from_multiple_threads() {
2244 let store = Arc::new(EventStore::new());
2245 let mut handles = vec![];
2246
2247 store.record_count("event1", 100);
2249 store.record_count("event2", 200);
2250 store.record_count("event3", 300);
2251
2252 for _ in 0..10 {
2254 let store_clone = Arc::clone(&store);
2255 let handle = thread::spawn(move || {
2256 for _ in 0..50 {
2257 let event_ids = &["event1", "event2", "event3"];
2258 let sum = store_clone.query_many(event_ids).last_days(1).sum();
2259 assert_eq!(sum, Some(600));
2261 }
2262 });
2263 handles.push(handle);
2264 }
2265
2266 for handle in handles {
2268 handle.join().unwrap();
2269 }
2270 }
2271
2272 #[test]
2273 fn test_concurrent_query_ratio_from_multiple_threads() {
2274 let store = Arc::new(EventStore::new());
2275 let mut handles = vec![];
2276
2277 store.record_count("numerator", 100);
2279 store.record_count("denominator", 50);
2280
2281 for _ in 0..10 {
2283 let store_clone = Arc::clone(&store);
2284 let handle = thread::spawn(move || {
2285 for _ in 0..50 {
2286 let ratio = store_clone
2287 .query_ratio("numerator", "denominator")
2288 .last_days(1);
2289 assert_eq!(ratio, Some(2.0));
2291 }
2292 });
2293 handles.push(handle);
2294 }
2295
2296 for handle in handles {
2298 handle.join().unwrap();
2299 }
2300 }
2301
2302 #[test]
2303 fn test_concurrent_query_delta_from_multiple_threads() {
2304 let store = Arc::new(EventStore::new());
2305 let mut handles = vec![];
2306
2307 store.record_count("positive", 150);
2309 store.record_count("negative", 50);
2310
2311 for _ in 0..10 {
2313 let store_clone = Arc::clone(&store);
2314 let handle = thread::spawn(move || {
2315 for _ in 0..50 {
2316 let delta = store_clone
2317 .query_delta("positive", "negative")
2318 .last_days(1)
2319 .sum();
2320 assert_eq!(delta, 100);
2322 }
2323 });
2324 handles.push(handle);
2325 }
2326
2327 for handle in handles {
2329 handle.join().unwrap();
2330 }
2331 }
2332
2333 #[test]
2334 fn test_concurrent_query_many_while_recording() {
2335 let store = Arc::new(EventStore::new());
2336 let store_writer = Arc::clone(&store);
2337 let store_reader = Arc::clone(&store);
2338
2339 let writer = thread::spawn(move || {
2341 for i in 0..100 {
2342 store_writer.record_count("event1", i);
2343 store_writer.record_count("event2", i * 2);
2344 thread::sleep(std::time::Duration::from_micros(10));
2345 }
2346 });
2347
2348 let reader = thread::spawn(move || {
2350 for _ in 0..50 {
2351 let event_ids = &["event1", "event2"];
2352 let _sum = store_reader.query_many(event_ids).last_days(1).sum();
2353 thread::sleep(std::time::Duration::from_micros(20));
2354 }
2355 });
2356
2357 writer.join().unwrap();
2358 reader.join().unwrap();
2359
2360 let event1_sum = store.query("event1").last_days(1).sum().unwrap();
2362 let event2_sum = store.query("event2").last_days(1).sum().unwrap();
2363 let multi_sum = store
2364 .query_many(&["event1", "event2"])
2365 .last_days(1)
2366 .sum()
2367 .unwrap();
2368
2369 assert_eq!(event1_sum, 4950);
2372 assert_eq!(event2_sum, 9900);
2373 assert_eq!(multi_sum, 14850);
2374 }
2375
2376 #[test]
2377 fn test_concurrent_query_ratio_while_recording() {
2378 let store = Arc::new(EventStore::new());
2379 let store_writer = Arc::clone(&store);
2380 let store_reader = Arc::clone(&store);
2381
2382 let writer = thread::spawn(move || {
2384 for i in 1..=100 {
2385 store_writer.record_count("numerator", i * 2);
2386 store_writer.record_count("denominator", i);
2387 thread::sleep(std::time::Duration::from_micros(10));
2388 }
2389 });
2390
2391 let reader = thread::spawn(move || {
2393 for _ in 0..50 {
2394 let _ratio = store_reader
2395 .query_ratio("numerator", "denominator")
2396 .last_days(1);
2397 thread::sleep(std::time::Duration::from_micros(20));
2398 }
2399 });
2400
2401 writer.join().unwrap();
2402 reader.join().unwrap();
2403
2404 let ratio = store.query_ratio("numerator", "denominator").last_days(1);
2406 assert_eq!(ratio, Some(2.0));
2410 }
2411
2412 #[test]
2413 fn test_concurrent_query_delta_while_recording() {
2414 let store = Arc::new(EventStore::new());
2415 let store_writer = Arc::clone(&store);
2416 let store_reader = Arc::clone(&store);
2417
2418 let writer = thread::spawn(move || {
2420 for i in 0..100 {
2421 store_writer.record_count("positive", i * 2);
2422 store_writer.record_count("negative", i);
2423 thread::sleep(std::time::Duration::from_micros(10));
2424 }
2425 });
2426
2427 let reader = thread::spawn(move || {
2429 for _ in 0..50 {
2430 let _delta = store_reader
2431 .query_delta("positive", "negative")
2432 .last_days(1)
2433 .sum();
2434 thread::sleep(std::time::Duration::from_micros(20));
2435 }
2436 });
2437
2438 writer.join().unwrap();
2439 reader.join().unwrap();
2440
2441 let delta = store.query_delta("positive", "negative").last_days(1).sum();
2443 assert_eq!(delta, 4950);
2447 }
2448}