1use crate::{domain::entities::Event, error::Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct Snapshot {
12 pub id: Uuid,
14
15 pub entity_id: String,
17
18 pub state: serde_json::Value,
20
21 pub created_at: DateTime<Utc>,
23
24 pub as_of: DateTime<Utc>,
26
27 pub event_count: usize,
29
30 pub metadata: SnapshotMetadata,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct SnapshotMetadata {
36 pub snapshot_type: SnapshotType,
38
39 pub size_bytes: usize,
41
42 pub version: u32,
44}
45
46#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
47#[serde(rename_all = "lowercase")]
48pub enum SnapshotType {
49 Manual,
50 Automatic,
51 OnDemand,
52}
53
54impl Snapshot {
55 pub fn new(
57 entity_id: String,
58 state: serde_json::Value,
59 as_of: DateTime<Utc>,
60 event_count: usize,
61 snapshot_type: SnapshotType,
62 ) -> Self {
63 let state_json = serde_json::to_string(&state).unwrap_or_default();
64 let size_bytes = state_json.len();
65
66 Self {
67 id: Uuid::new_v4(),
68 entity_id,
69 state,
70 created_at: Utc::now(),
71 as_of,
72 event_count,
73 metadata: SnapshotMetadata {
74 snapshot_type,
75 size_bytes,
76 version: 1,
77 },
78 }
79 }
80
81 pub fn merge_with_events(&self, events: &[Event]) -> serde_json::Value {
83 let mut merged = self.state.clone();
84
85 for event in events {
86 if event.timestamp > self.as_of
88 && let serde_json::Value::Object(ref mut state_map) = merged
89 && let serde_json::Value::Object(ref payload_map) = event.payload
90 {
91 for (key, value) in payload_map {
92 state_map.insert(key.clone(), value.clone());
93 }
94 }
95 }
96
97 merged
98 }
99}
100
/// Tunables controlling when snapshots are taken and how many are kept.
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Number of events since the latest snapshot (or in total, when none
    /// exists) at which an automatic snapshot is due.
    pub event_threshold: usize,

    /// Seconds between the latest snapshot's `as_of` and the last event time
    /// at which an automatic snapshot is due.
    pub time_threshold_seconds: i64,

    /// Retention cap: only this many of the newest snapshots (by `as_of`) are
    /// kept per entity.
    pub max_snapshots_per_entity: usize,

    /// Master switch; when `false`, `should_create_snapshot` always says no.
    pub auto_snapshot: bool,
}
116
117impl Default for SnapshotConfig {
118 fn default() -> Self {
119 Self {
120 event_threshold: 100,
121 time_threshold_seconds: 3600, max_snapshots_per_entity: 10,
123 auto_snapshot: true,
124 }
125 }
126}
127
128pub struct SnapshotManager {
130 snapshots: Arc<DashMap<String, Vec<Snapshot>>>,
132
133 config: SnapshotConfig,
135
136 stats: Arc<RwLock<SnapshotStats>>,
138}
139
140#[derive(Debug, Clone, Default, Serialize)]
141pub struct SnapshotStats {
142 pub total_snapshots: usize,
143 pub total_entities: usize,
144 pub total_size_bytes: usize,
145 pub snapshots_created: u64,
146 pub snapshots_pruned: u64,
147}
148
149impl SnapshotManager {
150 pub fn new(config: SnapshotConfig) -> Self {
152 Self {
153 snapshots: Arc::new(DashMap::new()),
154 config,
155 stats: Arc::new(RwLock::new(SnapshotStats::default())),
156 }
157 }
158
159 #[cfg_attr(feature = "hotpath", hotpath::measure)]
161 pub fn create_snapshot(
162 &self,
163 entity_id: &str,
164 state: serde_json::Value,
165 as_of: DateTime<Utc>,
166 event_count: usize,
167 snapshot_type: SnapshotType,
168 ) -> Result<Snapshot> {
169 let snapshot = Snapshot::new(
170 entity_id.to_string(),
171 state,
172 as_of,
173 event_count,
174 snapshot_type,
175 );
176
177 let mut entity_snapshots = self.snapshots.entry(entity_id.to_string()).or_default();
178
179 entity_snapshots.push(snapshot.clone());
181
182 entity_snapshots.sort_by_key(|x| std::cmp::Reverse(x.as_of));
184
185 let mut pruned = 0;
187 if entity_snapshots.len() > self.config.max_snapshots_per_entity {
188 let to_remove = entity_snapshots.len() - self.config.max_snapshots_per_entity;
189 entity_snapshots.truncate(self.config.max_snapshots_per_entity);
190 pruned = to_remove;
191 }
192
193 drop(entity_snapshots);
195
196 let mut stats = self.stats.write();
198 stats.snapshots_created += 1;
199 stats.snapshots_pruned += pruned as u64;
200 stats.total_snapshots = self.snapshots.iter().map(|entry| entry.value().len()).sum();
201 stats.total_entities = self.snapshots.len();
202 stats.total_size_bytes = self
203 .snapshots
204 .iter()
205 .map(|entry| {
206 entry
207 .value()
208 .iter()
209 .map(|s| s.metadata.size_bytes)
210 .sum::<usize>()
211 })
212 .sum();
213
214 tracing::info!(
215 "๐ธ Created {} snapshot for entity: {} (events: {}, size: {} bytes)",
216 match snapshot_type {
217 SnapshotType::Manual => "manual",
218 SnapshotType::Automatic => "automatic",
219 SnapshotType::OnDemand => "on-demand",
220 },
221 entity_id,
222 event_count,
223 snapshot.metadata.size_bytes
224 );
225
226 Ok(snapshot)
227 }
228
229 #[cfg_attr(feature = "hotpath", hotpath::measure)]
231 pub fn get_latest_snapshot(&self, entity_id: &str) -> Option<Snapshot> {
232 self.snapshots
233 .get(entity_id)
234 .and_then(|entry| entry.value().first().cloned())
235 }
236
237 pub fn get_snapshot_as_of(&self, entity_id: &str, as_of: DateTime<Utc>) -> Option<Snapshot> {
239 self.snapshots.get(entity_id).and_then(|entry| {
240 entry
241 .value()
242 .iter()
243 .filter(|s| s.as_of <= as_of)
244 .max_by_key(|s| s.as_of)
245 .cloned()
246 })
247 }
248
249 pub fn get_all_snapshots(&self, entity_id: &str) -> Vec<Snapshot> {
251 self.snapshots
252 .get(entity_id)
253 .map(|entry| entry.value().clone())
254 .unwrap_or_default()
255 }
256
257 #[cfg_attr(feature = "hotpath", hotpath::measure)]
259 pub fn should_create_snapshot(
260 &self,
261 entity_id: &str,
262 current_event_count: usize,
263 last_event_time: DateTime<Utc>,
264 ) -> bool {
265 if !self.config.auto_snapshot {
266 return false;
267 }
268
269 match self.snapshots.get(entity_id) {
270 None => {
271 current_event_count >= self.config.event_threshold
273 }
274 Some(entry) => {
275 let snaps = entry.value();
276 if let Some(latest) = snaps.first() {
277 let events_since_snapshot = current_event_count - latest.event_count;
279 if events_since_snapshot >= self.config.event_threshold {
280 return true;
281 }
282
283 let time_since_snapshot = (last_event_time - latest.as_of).num_seconds();
285 if time_since_snapshot >= self.config.time_threshold_seconds {
286 return true;
287 }
288 }
289 false
290 }
291 }
292 }
293
294 pub fn delete_snapshots(&self, entity_id: &str) -> Result<usize> {
296 let removed = self.snapshots.remove(entity_id).map_or(0, |(_, v)| v.len());
297
298 let mut stats = self.stats.write();
300 stats.total_snapshots = stats.total_snapshots.saturating_sub(removed);
301 stats.total_entities = self.snapshots.len();
302
303 tracing::info!("๐๏ธ Deleted {} snapshots for entity: {}", removed, entity_id);
304
305 Ok(removed)
306 }
307
308 pub fn delete_snapshot(&self, entity_id: &str, snapshot_id: Uuid) -> Result<bool> {
310 if let Some(mut entity_snapshots) = self.snapshots.get_mut(entity_id) {
311 let initial_len = entity_snapshots.len();
312 entity_snapshots.retain(|s| s.id != snapshot_id);
313 let removed = initial_len != entity_snapshots.len();
314
315 if removed {
316 let mut stats = self.stats.write();
318 stats.total_snapshots = stats.total_snapshots.saturating_sub(1);
319 tracing::debug!("Deleted snapshot {} for entity {}", snapshot_id, entity_id);
320 }
321
322 return Ok(removed);
323 }
324
325 Ok(false)
326 }
327
328 pub fn stats(&self) -> SnapshotStats {
330 (*self.stats.read()).clone()
331 }
332
333 pub fn clear_all(&self) {
335 self.snapshots.clear();
336
337 let mut stats = self.stats.write();
338 *stats = SnapshotStats::default();
339
340 tracing::info!("๐งน Cleared all snapshots");
341 }
342
343 pub fn config(&self) -> &SnapshotConfig {
345 &self.config
346 }
347
348 pub fn list_entities(&self) -> Vec<String> {
350 self.snapshots
351 .iter()
352 .map(|entry| entry.key().clone())
353 .collect()
354 }
355}
356
357#[derive(Debug, Deserialize)]
359pub struct CreateSnapshotRequest {
360 pub entity_id: String,
361}
362
363#[derive(Debug, Serialize)]
365pub struct CreateSnapshotResponse {
366 pub snapshot_id: Uuid,
367 pub entity_id: String,
368 pub created_at: DateTime<Utc>,
369 pub event_count: usize,
370 pub size_bytes: usize,
371}
372
373#[derive(Debug, Deserialize)]
375pub struct ListSnapshotsRequest {
376 pub entity_id: Option<String>,
377}
378
379#[derive(Debug, Serialize)]
381pub struct ListSnapshotsResponse {
382 pub snapshots: Vec<SnapshotInfo>,
383 pub total: usize,
384}
385
386#[derive(Debug, Serialize)]
387pub struct SnapshotInfo {
388 pub id: Uuid,
389 pub entity_id: String,
390 pub created_at: DateTime<Utc>,
391 pub as_of: DateTime<Utc>,
392 pub event_count: usize,
393 pub size_bytes: usize,
394 pub snapshot_type: SnapshotType,
395}
396
397impl From<Snapshot> for SnapshotInfo {
398 fn from(snapshot: Snapshot) -> Self {
399 Self {
400 id: snapshot.id,
401 entity_id: snapshot.entity_id,
402 created_at: snapshot.created_at,
403 as_of: snapshot.as_of,
404 event_count: snapshot.event_count,
405 size_bytes: snapshot.metadata.size_bytes,
406 snapshot_type: snapshot.metadata.snapshot_type,
407 }
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;
    use serde_json::json;

    /// Builds a standalone automatic snapshot whose state records `event_count`.
    fn create_test_snapshot(entity_id: &str, event_count: usize) -> Snapshot {
        Snapshot::new(
            entity_id.to_string(),
            json!({"count": event_count}),
            Utc::now(),
            event_count,
            SnapshotType::Automatic,
        )
    }

    /// Stores a snapshot through `manager`, panicking if creation fails.
    fn store(
        manager: &SnapshotManager,
        entity_id: &str,
        state: serde_json::Value,
        as_of: DateTime<Utc>,
        event_count: usize,
        snapshot_type: SnapshotType,
    ) -> Snapshot {
        manager
            .create_snapshot(entity_id, state, as_of, event_count, snapshot_type)
            .unwrap()
    }

    #[test]
    fn test_snapshot_creation() {
        let snapshot = create_test_snapshot("entity-1", 100);
        assert_eq!(snapshot.entity_id, "entity-1");
        assert_eq!(snapshot.event_count, 100);
        assert_eq!(snapshot.metadata.snapshot_type, SnapshotType::Automatic);
    }

    #[test]
    fn test_snapshot_manager() {
        let manager = SnapshotManager::new(SnapshotConfig::default());

        let result = manager.create_snapshot(
            "entity-1",
            json!({"value": 42}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );

        assert!(result.is_ok());

        let latest = manager.get_latest_snapshot("entity-1");
        assert!(latest.is_some());
        assert_eq!(latest.unwrap().event_count, 100);
    }

    #[test]
    fn test_snapshot_pruning() {
        let config = SnapshotConfig {
            max_snapshots_per_entity: 3,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        // Five snapshots against a cap of three.
        for i in 0..5 {
            store(
                &manager,
                "entity-1",
                json!({"count": i}),
                Utc::now(),
                i,
                SnapshotType::Automatic,
            );
        }

        let snapshots = manager.get_all_snapshots("entity-1");
        assert_eq!(snapshots.len(), 3);
    }

    #[test]
    fn test_should_create_snapshot() {
        let config = SnapshotConfig {
            event_threshold: 100,
            time_threshold_seconds: 3600,
            auto_snapshot: true,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        // No snapshot yet: below, then at, the event threshold.
        assert!(!manager.should_create_snapshot("entity-1", 50, Utc::now()));
        assert!(manager.should_create_snapshot("entity-1", 100, Utc::now()));

        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Automatic,
        );

        // 50 events since the snapshot is not enough; 100 is.
        assert!(!manager.should_create_snapshot("entity-1", 150, Utc::now()));
        assert!(manager.should_create_snapshot("entity-1", 200, Utc::now()));
    }

    #[test]
    fn test_merge_with_events() {
        let snapshot = Snapshot::new(
            "entity-1".to_string(),
            json!({"name": "Alice", "score": 10}),
            Utc::now(),
            5,
            SnapshotType::Automatic,
        );

        let event = Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "score.updated".to_string(),
            "entity-1".to_string(),
            "default".to_string(),
            json!({"score": 20}),
            Utc::now(),
            None,
            1,
        );

        let merged = snapshot.merge_with_events(&[event]);
        assert_eq!(merged["name"], "Alice");
        assert_eq!(merged["score"], 20);
    }

    #[test]
    fn test_default_config() {
        let config = SnapshotConfig::default();
        assert_eq!(config.event_threshold, 100);
        assert_eq!(config.time_threshold_seconds, 3600);
        assert_eq!(config.max_snapshots_per_entity, 10);
        assert!(config.auto_snapshot);
    }

    #[test]
    fn test_snapshot_type_serde() {
        let types = [
            SnapshotType::Manual,
            SnapshotType::Automatic,
            SnapshotType::OnDemand,
        ];

        for snapshot_type in types {
            let json = serde_json::to_string(&snapshot_type).unwrap();
            let parsed: SnapshotType = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, snapshot_type);
        }
    }

    #[test]
    fn test_snapshot_serde() {
        let snapshot = create_test_snapshot("entity-1", 50);
        let json = serde_json::to_string(&snapshot).unwrap();
        let parsed: Snapshot = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.entity_id, snapshot.entity_id);
        assert_eq!(parsed.event_count, snapshot.event_count);
    }

    #[test]
    fn test_get_latest_snapshot_none() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        let latest = manager.get_latest_snapshot("non-existent");
        assert!(latest.is_none());
    }

    #[test]
    fn test_get_all_snapshots_empty() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        let snapshots = manager.get_all_snapshots("non-existent");
        assert!(snapshots.is_empty());
    }

    #[test]
    fn test_delete_snapshots() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        store(&manager, "entity-1", json!({"value": 1}), Utc::now(), 100, SnapshotType::Manual);

        let deleted = manager.delete_snapshots("entity-1").unwrap();
        assert_eq!(deleted, 1);

        let latest = manager.get_latest_snapshot("entity-1");
        assert!(latest.is_none());
    }

    #[test]
    fn test_delete_single_snapshot() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        let snapshot = store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );

        let deleted = manager.delete_snapshot("entity-1", snapshot.id).unwrap();
        assert!(deleted);

        let latest = manager.get_latest_snapshot("entity-1");
        assert!(latest.is_none());
    }

    #[test]
    fn test_delete_nonexistent_snapshot() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        let deleted = manager.delete_snapshot("entity-1", Uuid::new_v4()).unwrap();
        assert!(!deleted);
    }

    #[test]
    fn test_clear_all() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        store(&manager, "entity-1", json!({"value": 1}), Utc::now(), 100, SnapshotType::Manual);
        store(&manager, "entity-2", json!({"value": 2}), Utc::now(), 200, SnapshotType::Manual);

        manager.clear_all();

        let stats = manager.stats();
        assert_eq!(stats.total_snapshots, 0);
        assert_eq!(stats.total_entities, 0);
    }

    #[test]
    fn test_list_entities() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        store(&manager, "entity-1", json!({"value": 1}), Utc::now(), 100, SnapshotType::Manual);
        store(&manager, "entity-2", json!({"value": 2}), Utc::now(), 200, SnapshotType::Manual);

        let entities = manager.list_entities();
        assert_eq!(entities.len(), 2);
        assert!(entities.contains(&"entity-1".to_string()));
        assert!(entities.contains(&"entity-2".to_string()));
    }

    #[test]
    fn test_get_config() {
        let config = SnapshotConfig {
            event_threshold: 50,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);
        assert_eq!(manager.config().event_threshold, 50);
    }

    #[test]
    fn test_stats() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        store(&manager, "entity-1", json!({"value": 1}), Utc::now(), 100, SnapshotType::Manual);

        let stats = manager.stats();
        assert_eq!(stats.total_snapshots, 1);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.snapshots_created, 1);
    }

    #[test]
    fn test_snapshot_as_of() {
        let manager = SnapshotManager::new(SnapshotConfig::default());
        let now = Utc::now();
        let past = now - Duration::hours(2);

        store(&manager, "entity-1", json!({"value": 1}), past, 100, SnapshotType::Manual);

        // A query at `now` sees the snapshot taken two hours ago.
        let snapshot = manager.get_snapshot_as_of("entity-1", now);
        assert!(snapshot.is_some());

        // A query before the snapshot's `as_of` finds nothing.
        let very_past = past - Duration::hours(1);
        let snapshot = manager.get_snapshot_as_of("entity-1", very_past);
        assert!(snapshot.is_none());
    }

    #[test]
    fn test_should_create_snapshot_time_threshold() {
        // Event threshold far out of reach so only the time rule can fire.
        let config = SnapshotConfig {
            event_threshold: 1000,
            time_threshold_seconds: 1,
            auto_snapshot: true,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        let past = Utc::now() - Duration::seconds(2);
        store(&manager, "entity-1", json!({"value": 1}), past, 100, SnapshotType::Manual);

        assert!(manager.should_create_snapshot("entity-1", 101, Utc::now()));
    }

    #[test]
    fn test_should_create_snapshot_disabled() {
        let config = SnapshotConfig {
            auto_snapshot: false,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        assert!(!manager.should_create_snapshot("entity-1", 1000, Utc::now()));
    }

    #[test]
    fn test_snapshot_info_from() {
        let snapshot = create_test_snapshot("entity-1", 50);
        let info: SnapshotInfo = snapshot.clone().into();

        assert_eq!(info.id, snapshot.id);
        assert_eq!(info.entity_id, snapshot.entity_id);
        assert_eq!(info.event_count, snapshot.event_count);
        assert_eq!(info.snapshot_type, snapshot.metadata.snapshot_type);
    }

    #[test]
    fn test_snapshot_metadata() {
        let snapshot = create_test_snapshot("entity-1", 100);
        assert_eq!(snapshot.metadata.version, 1);
        assert!(snapshot.metadata.size_bytes > 0);
    }

    #[test]
    fn test_multiple_entities() {
        let manager = SnapshotManager::new(SnapshotConfig::default());

        for i in 0..5 {
            store(
                &manager,
                &format!("entity-{i}"),
                json!({"value": i}),
                Utc::now(),
                100 + i,
                SnapshotType::Automatic,
            );
        }

        let stats = manager.stats();
        assert_eq!(stats.total_entities, 5);
        assert_eq!(stats.total_snapshots, 5);
    }

    #[test]
    fn test_snapshot_stats_default() {
        let stats = SnapshotStats::default();
        assert_eq!(stats.total_snapshots, 0);
        assert_eq!(stats.total_entities, 0);
        assert_eq!(stats.total_size_bytes, 0);
        assert_eq!(stats.snapshots_created, 0);
        assert_eq!(stats.snapshots_pruned, 0);
    }
}