1use crate::domain::entities::Event;
2use crate::error::Result;
3use chrono::{DateTime, Utc};
4use dashmap::DashMap;
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Snapshot {
13 pub id: Uuid,
15
16 pub entity_id: String,
18
19 pub state: serde_json::Value,
21
22 pub created_at: DateTime<Utc>,
24
25 pub as_of: DateTime<Utc>,
27
28 pub event_count: usize,
30
31 pub metadata: SnapshotMetadata,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct SnapshotMetadata {
37 pub snapshot_type: SnapshotType,
39
40 pub size_bytes: usize,
42
43 pub version: u32,
45}
46
47#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
48#[serde(rename_all = "lowercase")]
49pub enum SnapshotType {
50 Manual,
51 Automatic,
52 OnDemand,
53}
54
55impl Snapshot {
56 pub fn new(
58 entity_id: String,
59 state: serde_json::Value,
60 as_of: DateTime<Utc>,
61 event_count: usize,
62 snapshot_type: SnapshotType,
63 ) -> Self {
64 let state_json = serde_json::to_string(&state).unwrap_or_default();
65 let size_bytes = state_json.len();
66
67 Self {
68 id: Uuid::new_v4(),
69 entity_id,
70 state,
71 created_at: Utc::now(),
72 as_of,
73 event_count,
74 metadata: SnapshotMetadata {
75 snapshot_type,
76 size_bytes,
77 version: 1,
78 },
79 }
80 }
81
82 pub fn merge_with_events(&self, events: &[Event]) -> serde_json::Value {
84 let mut merged = self.state.clone();
85
86 for event in events {
87 if event.timestamp > self.as_of {
89 if let serde_json::Value::Object(ref mut state_map) = merged {
90 if let serde_json::Value::Object(ref payload_map) = event.payload {
91 for (key, value) in payload_map {
92 state_map.insert(key.clone(), value.clone());
93 }
94 }
95 }
96 }
97 }
98
99 merged
100 }
101}
102
103#[derive(Debug, Clone)]
105pub struct SnapshotConfig {
106 pub event_threshold: usize,
108
109 pub time_threshold_seconds: i64,
111
112 pub max_snapshots_per_entity: usize,
114
115 pub auto_snapshot: bool,
117}
118
119impl Default for SnapshotConfig {
120 fn default() -> Self {
121 Self {
122 event_threshold: 100,
123 time_threshold_seconds: 3600, max_snapshots_per_entity: 10,
125 auto_snapshot: true,
126 }
127 }
128}
129
130pub struct SnapshotManager {
132 snapshots: Arc<DashMap<String, Vec<Snapshot>>>,
134
135 config: SnapshotConfig,
137
138 stats: Arc<RwLock<SnapshotStats>>,
140}
141
142#[derive(Debug, Clone, Default, Serialize)]
143pub struct SnapshotStats {
144 pub total_snapshots: usize,
145 pub total_entities: usize,
146 pub total_size_bytes: usize,
147 pub snapshots_created: u64,
148 pub snapshots_pruned: u64,
149}
150
151impl SnapshotManager {
152 pub fn new(config: SnapshotConfig) -> Self {
154 Self {
155 snapshots: Arc::new(DashMap::new()),
156 config,
157 stats: Arc::new(RwLock::new(SnapshotStats::default())),
158 }
159 }
160
161 pub fn create_snapshot(
163 &self,
164 entity_id: String,
165 state: serde_json::Value,
166 as_of: DateTime<Utc>,
167 event_count: usize,
168 snapshot_type: SnapshotType,
169 ) -> Result<Snapshot> {
170 let snapshot = Snapshot::new(entity_id.clone(), state, as_of, event_count, snapshot_type);
171
172 let mut entity_snapshots = self.snapshots.entry(entity_id.clone()).or_default();
173
174 entity_snapshots.push(snapshot.clone());
176
177 entity_snapshots.sort_by_key(|x| std::cmp::Reverse(x.as_of));
179
180 let mut pruned = 0;
182 if entity_snapshots.len() > self.config.max_snapshots_per_entity {
183 let to_remove = entity_snapshots.len() - self.config.max_snapshots_per_entity;
184 entity_snapshots.truncate(self.config.max_snapshots_per_entity);
185 pruned = to_remove;
186 }
187
188 drop(entity_snapshots);
190
191 let mut stats = self.stats.write();
193 stats.snapshots_created += 1;
194 stats.snapshots_pruned += pruned as u64;
195 stats.total_snapshots = self.snapshots.iter().map(|entry| entry.value().len()).sum();
196 stats.total_entities = self.snapshots.len();
197 stats.total_size_bytes = self.snapshots
198 .iter()
199 .map(|entry| entry.value().iter().map(|s| s.metadata.size_bytes).sum::<usize>())
200 .sum();
201
202 tracing::info!(
203 "๐ธ Created {} snapshot for entity: {} (events: {}, size: {} bytes)",
204 match snapshot_type {
205 SnapshotType::Manual => "manual",
206 SnapshotType::Automatic => "automatic",
207 SnapshotType::OnDemand => "on-demand",
208 },
209 entity_id,
210 event_count,
211 snapshot.metadata.size_bytes
212 );
213
214 Ok(snapshot)
215 }
216
217 pub fn get_latest_snapshot(&self, entity_id: &str) -> Option<Snapshot> {
219 self.snapshots
220 .get(entity_id)
221 .and_then(|entry| entry.value().first().cloned())
222 }
223
224 pub fn get_snapshot_as_of(&self, entity_id: &str, as_of: DateTime<Utc>) -> Option<Snapshot> {
226 self.snapshots.get(entity_id).and_then(|entry| {
227 entry.value()
228 .iter()
229 .filter(|s| s.as_of <= as_of)
230 .max_by_key(|s| s.as_of)
231 .cloned()
232 })
233 }
234
235 pub fn get_all_snapshots(&self, entity_id: &str) -> Vec<Snapshot> {
237 self.snapshots.get(entity_id).map(|entry| entry.value().clone()).unwrap_or_default()
238 }
239
240 pub fn should_create_snapshot(
242 &self,
243 entity_id: &str,
244 current_event_count: usize,
245 last_event_time: DateTime<Utc>,
246 ) -> bool {
247 if !self.config.auto_snapshot {
248 return false;
249 }
250
251 match self.snapshots.get(entity_id) {
252 None => {
253 current_event_count >= self.config.event_threshold
255 }
256 Some(entry) => {
257 let snaps = entry.value();
258 if let Some(latest) = snaps.first() {
259 let events_since_snapshot = current_event_count - latest.event_count;
261 if events_since_snapshot >= self.config.event_threshold {
262 return true;
263 }
264
265 let time_since_snapshot = (last_event_time - latest.as_of).num_seconds();
267 if time_since_snapshot >= self.config.time_threshold_seconds {
268 return true;
269 }
270 }
271 false
272 }
273 }
274 }
275
276 pub fn delete_snapshots(&self, entity_id: &str) -> Result<usize> {
278 let removed = self.snapshots.remove(entity_id).map(|(_, v)| v.len()).unwrap_or(0);
279
280 let mut stats = self.stats.write();
282 stats.total_snapshots = stats.total_snapshots.saturating_sub(removed);
283 stats.total_entities = self.snapshots.len();
284
285 tracing::info!("๐๏ธ Deleted {} snapshots for entity: {}", removed, entity_id);
286
287 Ok(removed)
288 }
289
290 pub fn delete_snapshot(&self, entity_id: &str, snapshot_id: Uuid) -> Result<bool> {
292 if let Some(mut entity_snapshots) = self.snapshots.get_mut(entity_id) {
293 let initial_len = entity_snapshots.len();
294 entity_snapshots.retain(|s| s.id != snapshot_id);
295 let removed = initial_len != entity_snapshots.len();
296
297 if removed {
298 let mut stats = self.stats.write();
300 stats.total_snapshots = stats.total_snapshots.saturating_sub(1);
301 tracing::debug!("Deleted snapshot {} for entity {}", snapshot_id, entity_id);
302 }
303
304 return Ok(removed);
305 }
306
307 Ok(false)
308 }
309
310 pub fn stats(&self) -> SnapshotStats {
312 (*self.stats.read()).clone()
313 }
314
315 pub fn clear_all(&self) {
317 self.snapshots.clear();
318
319 let mut stats = self.stats.write();
320 *stats = SnapshotStats::default();
321
322 tracing::info!("๐งน Cleared all snapshots");
323 }
324
325 pub fn config(&self) -> &SnapshotConfig {
327 &self.config
328 }
329
330 pub fn list_entities(&self) -> Vec<String> {
332 self.snapshots.iter().map(|entry| entry.key().clone()).collect()
333 }
334}
335
336#[derive(Debug, Deserialize)]
338pub struct CreateSnapshotRequest {
339 pub entity_id: String,
340}
341
342#[derive(Debug, Serialize)]
344pub struct CreateSnapshotResponse {
345 pub snapshot_id: Uuid,
346 pub entity_id: String,
347 pub created_at: DateTime<Utc>,
348 pub event_count: usize,
349 pub size_bytes: usize,
350}
351
352#[derive(Debug, Deserialize)]
354pub struct ListSnapshotsRequest {
355 pub entity_id: Option<String>,
356}
357
358#[derive(Debug, Serialize)]
360pub struct ListSnapshotsResponse {
361 pub snapshots: Vec<SnapshotInfo>,
362 pub total: usize,
363}
364
365#[derive(Debug, Serialize)]
366pub struct SnapshotInfo {
367 pub id: Uuid,
368 pub entity_id: String,
369 pub created_at: DateTime<Utc>,
370 pub as_of: DateTime<Utc>,
371 pub event_count: usize,
372 pub size_bytes: usize,
373 pub snapshot_type: SnapshotType,
374}
375
376impl From<Snapshot> for SnapshotInfo {
377 fn from(snapshot: Snapshot) -> Self {
378 Self {
379 id: snapshot.id,
380 entity_id: snapshot.entity_id,
381 created_at: snapshot.created_at,
382 as_of: snapshot.as_of,
383 event_count: snapshot.event_count,
384 size_bytes: snapshot.metadata.size_bytes,
385 snapshot_type: snapshot.metadata.snapshot_type,
386 }
387 }
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393 use chrono::Duration;
394 use serde_json::json;
395
396 fn create_test_snapshot(entity_id: &str, event_count: usize) -> Snapshot {
397 Snapshot::new(
398 entity_id.to_string(),
399 json!({"count": event_count}),
400 Utc::now(),
401 event_count,
402 SnapshotType::Automatic,
403 )
404 }
405
406 #[test]
407 fn test_snapshot_creation() {
408 let snapshot = create_test_snapshot("entity-1", 100);
409 assert_eq!(snapshot.entity_id, "entity-1");
410 assert_eq!(snapshot.event_count, 100);
411 assert_eq!(snapshot.metadata.snapshot_type, SnapshotType::Automatic);
412 }
413
414 #[test]
415 fn test_snapshot_manager() {
416 let manager = SnapshotManager::new(SnapshotConfig::default());
417
418 let result = manager.create_snapshot(
419 "entity-1".to_string(),
420 json!({"value": 42}),
421 Utc::now(),
422 100,
423 SnapshotType::Manual,
424 );
425
426 assert!(result.is_ok());
427
428 let latest = manager.get_latest_snapshot("entity-1");
429 assert!(latest.is_some());
430 assert_eq!(latest.unwrap().event_count, 100);
431 }
432
433 #[test]
434 fn test_snapshot_pruning() {
435 let config = SnapshotConfig {
436 max_snapshots_per_entity: 3,
437 ..Default::default()
438 };
439 let manager = SnapshotManager::new(config);
440
441 for i in 0..5 {
443 manager
444 .create_snapshot(
445 "entity-1".to_string(),
446 json!({"count": i}),
447 Utc::now(),
448 i,
449 SnapshotType::Automatic,
450 )
451 .unwrap();
452 }
453
454 let snapshots = manager.get_all_snapshots("entity-1");
456 assert_eq!(snapshots.len(), 3);
457 }
458
459 #[test]
460 fn test_should_create_snapshot() {
461 let config = SnapshotConfig {
462 event_threshold: 100,
463 time_threshold_seconds: 3600,
464 auto_snapshot: true,
465 ..Default::default()
466 };
467 let manager = SnapshotManager::new(config);
468
469 assert!(!manager.should_create_snapshot("entity-1", 50, Utc::now()));
471
472 assert!(manager.should_create_snapshot("entity-1", 100, Utc::now()));
474
475 manager
477 .create_snapshot(
478 "entity-1".to_string(),
479 json!({"value": 1}),
480 Utc::now(),
481 100,
482 SnapshotType::Automatic,
483 )
484 .unwrap();
485
486 assert!(!manager.should_create_snapshot("entity-1", 150, Utc::now()));
488
489 assert!(manager.should_create_snapshot("entity-1", 200, Utc::now()));
491 }
492
493 #[test]
494 fn test_merge_with_events() {
495 let snapshot = Snapshot::new(
496 "entity-1".to_string(),
497 json!({"name": "Alice", "score": 10}),
498 Utc::now(),
499 5,
500 SnapshotType::Automatic,
501 );
502
503 let event = Event::reconstruct_from_strings(
504 Uuid::new_v4(),
505 "score.updated".to_string(),
506 "entity-1".to_string(),
507 "default".to_string(),
508 json!({"score": 20}),
509 Utc::now(),
510 None,
511 1,
512 );
513
514 let merged = snapshot.merge_with_events(&[event]);
515 assert_eq!(merged["name"], "Alice");
516 assert_eq!(merged["score"], 20);
517 }
518
519 #[test]
520 fn test_default_config() {
521 let config = SnapshotConfig::default();
522 assert_eq!(config.event_threshold, 100);
523 assert_eq!(config.time_threshold_seconds, 3600);
524 assert_eq!(config.max_snapshots_per_entity, 10);
525 assert!(config.auto_snapshot);
526 }
527
528 #[test]
529 fn test_snapshot_type_serde() {
530 let types = vec![
531 SnapshotType::Manual,
532 SnapshotType::Automatic,
533 SnapshotType::OnDemand,
534 ];
535
536 for snapshot_type in types {
537 let json = serde_json::to_string(&snapshot_type).unwrap();
538 let parsed: SnapshotType = serde_json::from_str(&json).unwrap();
539 assert_eq!(parsed, snapshot_type);
540 }
541 }
542
543 #[test]
544 fn test_snapshot_serde() {
545 let snapshot = create_test_snapshot("entity-1", 50);
546 let json = serde_json::to_string(&snapshot).unwrap();
547 let parsed: Snapshot = serde_json::from_str(&json).unwrap();
548 assert_eq!(parsed.entity_id, snapshot.entity_id);
549 assert_eq!(parsed.event_count, snapshot.event_count);
550 }
551
552 #[test]
553 fn test_get_latest_snapshot_none() {
554 let manager = SnapshotManager::new(SnapshotConfig::default());
555 let latest = manager.get_latest_snapshot("non-existent");
556 assert!(latest.is_none());
557 }
558
559 #[test]
560 fn test_get_all_snapshots_empty() {
561 let manager = SnapshotManager::new(SnapshotConfig::default());
562 let snapshots = manager.get_all_snapshots("non-existent");
563 assert!(snapshots.is_empty());
564 }
565
566 #[test]
567 fn test_delete_snapshots() {
568 let manager = SnapshotManager::new(SnapshotConfig::default());
569
570 manager
571 .create_snapshot(
572 "entity-1".to_string(),
573 json!({"value": 1}),
574 Utc::now(),
575 100,
576 SnapshotType::Manual,
577 )
578 .unwrap();
579
580 let deleted = manager.delete_snapshots("entity-1").unwrap();
581 assert_eq!(deleted, 1);
582
583 let latest = manager.get_latest_snapshot("entity-1");
584 assert!(latest.is_none());
585 }
586
587 #[test]
588 fn test_delete_single_snapshot() {
589 let manager = SnapshotManager::new(SnapshotConfig::default());
590
591 let snapshot = manager
592 .create_snapshot(
593 "entity-1".to_string(),
594 json!({"value": 1}),
595 Utc::now(),
596 100,
597 SnapshotType::Manual,
598 )
599 .unwrap();
600
601 let deleted = manager.delete_snapshot("entity-1", snapshot.id).unwrap();
602 assert!(deleted);
603
604 let latest = manager.get_latest_snapshot("entity-1");
605 assert!(latest.is_none());
606 }
607
608 #[test]
609 fn test_delete_nonexistent_snapshot() {
610 let manager = SnapshotManager::new(SnapshotConfig::default());
611 let deleted = manager.delete_snapshot("entity-1", Uuid::new_v4()).unwrap();
612 assert!(!deleted);
613 }
614
615 #[test]
616 fn test_clear_all() {
617 let manager = SnapshotManager::new(SnapshotConfig::default());
618
619 manager
620 .create_snapshot(
621 "entity-1".to_string(),
622 json!({"value": 1}),
623 Utc::now(),
624 100,
625 SnapshotType::Manual,
626 )
627 .unwrap();
628 manager
629 .create_snapshot(
630 "entity-2".to_string(),
631 json!({"value": 2}),
632 Utc::now(),
633 200,
634 SnapshotType::Manual,
635 )
636 .unwrap();
637
638 manager.clear_all();
639
640 let stats = manager.stats();
641 assert_eq!(stats.total_snapshots, 0);
642 assert_eq!(stats.total_entities, 0);
643 }
644
645 #[test]
646 fn test_list_entities() {
647 let manager = SnapshotManager::new(SnapshotConfig::default());
648
649 manager
650 .create_snapshot(
651 "entity-1".to_string(),
652 json!({"value": 1}),
653 Utc::now(),
654 100,
655 SnapshotType::Manual,
656 )
657 .unwrap();
658 manager
659 .create_snapshot(
660 "entity-2".to_string(),
661 json!({"value": 2}),
662 Utc::now(),
663 200,
664 SnapshotType::Manual,
665 )
666 .unwrap();
667
668 let entities = manager.list_entities();
669 assert_eq!(entities.len(), 2);
670 assert!(entities.contains(&"entity-1".to_string()));
671 assert!(entities.contains(&"entity-2".to_string()));
672 }
673
674 #[test]
675 fn test_get_config() {
676 let config = SnapshotConfig {
677 event_threshold: 50,
678 ..Default::default()
679 };
680 let manager = SnapshotManager::new(config);
681 assert_eq!(manager.config().event_threshold, 50);
682 }
683
684 #[test]
685 fn test_stats() {
686 let manager = SnapshotManager::new(SnapshotConfig::default());
687
688 manager
689 .create_snapshot(
690 "entity-1".to_string(),
691 json!({"value": 1}),
692 Utc::now(),
693 100,
694 SnapshotType::Manual,
695 )
696 .unwrap();
697
698 let stats = manager.stats();
699 assert_eq!(stats.total_snapshots, 1);
700 assert_eq!(stats.total_entities, 1);
701 assert_eq!(stats.snapshots_created, 1);
702 }
703
704 #[test]
705 fn test_snapshot_as_of() {
706 let manager = SnapshotManager::new(SnapshotConfig::default());
707 let now = Utc::now();
708 let past = now - Duration::hours(2);
709 let future = now + Duration::hours(2);
710
711 manager
713 .create_snapshot(
714 "entity-1".to_string(),
715 json!({"value": 1}),
716 past,
717 100,
718 SnapshotType::Manual,
719 )
720 .unwrap();
721
722 let snapshot = manager.get_snapshot_as_of("entity-1", now);
724 assert!(snapshot.is_some());
725
726 let very_past = past - Duration::hours(1);
728 let snapshot = manager.get_snapshot_as_of("entity-1", very_past);
729 assert!(snapshot.is_none());
730 }
731
732 #[test]
733 fn test_should_create_snapshot_time_threshold() {
734 let config = SnapshotConfig {
735 event_threshold: 1000, time_threshold_seconds: 1, auto_snapshot: true,
738 ..Default::default()
739 };
740 let manager = SnapshotManager::new(config);
741
742 let past = Utc::now() - Duration::seconds(2);
744 manager
745 .create_snapshot(
746 "entity-1".to_string(),
747 json!({"value": 1}),
748 past,
749 100,
750 SnapshotType::Manual,
751 )
752 .unwrap();
753
754 assert!(manager.should_create_snapshot("entity-1", 101, Utc::now()));
756 }
757
758 #[test]
759 fn test_should_create_snapshot_disabled() {
760 let config = SnapshotConfig {
761 auto_snapshot: false,
762 ..Default::default()
763 };
764 let manager = SnapshotManager::new(config);
765
766 assert!(!manager.should_create_snapshot("entity-1", 1000, Utc::now()));
768 }
769
770 #[test]
771 fn test_snapshot_info_from() {
772 let snapshot = create_test_snapshot("entity-1", 50);
773 let info: SnapshotInfo = snapshot.clone().into();
774
775 assert_eq!(info.id, snapshot.id);
776 assert_eq!(info.entity_id, snapshot.entity_id);
777 assert_eq!(info.event_count, snapshot.event_count);
778 assert_eq!(info.snapshot_type, snapshot.metadata.snapshot_type);
779 }
780
781 #[test]
782 fn test_snapshot_metadata() {
783 let snapshot = create_test_snapshot("entity-1", 100);
784 assert_eq!(snapshot.metadata.version, 1);
785 assert!(snapshot.metadata.size_bytes > 0);
786 }
787
788 #[test]
789 fn test_multiple_entities() {
790 let manager = SnapshotManager::new(SnapshotConfig::default());
791
792 for i in 0..5 {
793 manager
794 .create_snapshot(
795 format!("entity-{}", i),
796 json!({"value": i}),
797 Utc::now(),
798 100 + i,
799 SnapshotType::Automatic,
800 )
801 .unwrap();
802 }
803
804 let stats = manager.stats();
805 assert_eq!(stats.total_entities, 5);
806 assert_eq!(stats.total_snapshots, 5);
807 }
808
809 #[test]
810 fn test_snapshot_stats_default() {
811 let stats = SnapshotStats::default();
812 assert_eq!(stats.total_snapshots, 0);
813 assert_eq!(stats.total_entities, 0);
814 assert_eq!(stats.total_size_bytes, 0);
815 assert_eq!(stats.snapshots_created, 0);
816 assert_eq!(stats.snapshots_pruned, 0);
817 }
818}