1use crate::domain::entities::Event;
2use crate::error::Result;
3use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Snapshot {
13 pub id: Uuid,
15
16 pub entity_id: String,
18
19 pub state: serde_json::Value,
21
22 pub created_at: DateTime<Utc>,
24
25 pub as_of: DateTime<Utc>,
27
28 pub event_count: usize,
30
31 pub metadata: SnapshotMetadata,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct SnapshotMetadata {
37 pub snapshot_type: SnapshotType,
39
40 pub size_bytes: usize,
42
43 pub version: u32,
45}
46
47#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
48#[serde(rename_all = "lowercase")]
49pub enum SnapshotType {
50 Manual,
51 Automatic,
52 OnDemand,
53}
54
55impl Snapshot {
56 pub fn new(
58 entity_id: String,
59 state: serde_json::Value,
60 as_of: DateTime<Utc>,
61 event_count: usize,
62 snapshot_type: SnapshotType,
63 ) -> Self {
64 let state_json = serde_json::to_string(&state).unwrap_or_default();
65 let size_bytes = state_json.len();
66
67 Self {
68 id: Uuid::new_v4(),
69 entity_id,
70 state,
71 created_at: Utc::now(),
72 as_of,
73 event_count,
74 metadata: SnapshotMetadata {
75 snapshot_type,
76 size_bytes,
77 version: 1,
78 },
79 }
80 }
81
82 pub fn merge_with_events(&self, events: &[Event]) -> serde_json::Value {
84 let mut merged = self.state.clone();
85
86 for event in events {
87 if event.timestamp > self.as_of {
89 if let serde_json::Value::Object(ref mut state_map) = merged {
90 if let serde_json::Value::Object(ref payload_map) = event.payload {
91 for (key, value) in payload_map {
92 state_map.insert(key.clone(), value.clone());
93 }
94 }
95 }
96 }
97 }
98
99 merged
100 }
101}
102
/// Tunables that control when snapshots are taken and how many are kept.
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Number of events since the latest snapshot (or in total when none
    /// exists) at which an automatic snapshot is recommended.
    pub event_threshold: usize,

    /// Seconds between the latest snapshot's `as_of` and the last event time
    /// at which an automatic snapshot is recommended.
    pub time_threshold_seconds: i64,

    /// Upper bound on retained snapshots per entity; the oldest (by `as_of`)
    /// are pruned when a new one is stored.
    pub max_snapshots_per_entity: usize,

    /// Master switch: when `false`, `should_create_snapshot` always says no.
    pub auto_snapshot: bool,
}
118
119impl Default for SnapshotConfig {
120 fn default() -> Self {
121 Self {
122 event_threshold: 100,
123 time_threshold_seconds: 3600, max_snapshots_per_entity: 10,
125 auto_snapshot: true,
126 }
127 }
128}
129
130pub struct SnapshotManager {
132 snapshots: Arc<RwLock<HashMap<String, Vec<Snapshot>>>>,
134
135 config: SnapshotConfig,
137
138 stats: Arc<RwLock<SnapshotStats>>,
140}
141
142#[derive(Debug, Clone, Default, Serialize)]
143pub struct SnapshotStats {
144 pub total_snapshots: usize,
145 pub total_entities: usize,
146 pub total_size_bytes: usize,
147 pub snapshots_created: u64,
148 pub snapshots_pruned: u64,
149}
150
151impl SnapshotManager {
152 pub fn new(config: SnapshotConfig) -> Self {
154 Self {
155 snapshots: Arc::new(RwLock::new(HashMap::new())),
156 config,
157 stats: Arc::new(RwLock::new(SnapshotStats::default())),
158 }
159 }
160
161 pub fn create_snapshot(
163 &self,
164 entity_id: String,
165 state: serde_json::Value,
166 as_of: DateTime<Utc>,
167 event_count: usize,
168 snapshot_type: SnapshotType,
169 ) -> Result<Snapshot> {
170 let snapshot = Snapshot::new(entity_id.clone(), state, as_of, event_count, snapshot_type);
171
172 let mut snapshots = self.snapshots.write();
173 let entity_snapshots = snapshots.entry(entity_id.clone()).or_default();
174
175 entity_snapshots.push(snapshot.clone());
177
178 entity_snapshots.sort_by_key(|x| std::cmp::Reverse(x.as_of));
180
181 let mut pruned = 0;
183 if entity_snapshots.len() > self.config.max_snapshots_per_entity {
184 let to_remove = entity_snapshots.len() - self.config.max_snapshots_per_entity;
185 entity_snapshots.truncate(self.config.max_snapshots_per_entity);
186 pruned = to_remove;
187 }
188
189 let mut stats = self.stats.write();
191 stats.snapshots_created += 1;
192 stats.snapshots_pruned += pruned as u64;
193 stats.total_snapshots = snapshots.values().map(|v| v.len()).sum();
194 stats.total_entities = snapshots.len();
195 stats.total_size_bytes = snapshots
196 .values()
197 .flatten()
198 .map(|s| s.metadata.size_bytes)
199 .sum();
200
201 tracing::info!(
202 "๐ธ Created {} snapshot for entity: {} (events: {}, size: {} bytes)",
203 match snapshot_type {
204 SnapshotType::Manual => "manual",
205 SnapshotType::Automatic => "automatic",
206 SnapshotType::OnDemand => "on-demand",
207 },
208 entity_id,
209 event_count,
210 snapshot.metadata.size_bytes
211 );
212
213 Ok(snapshot)
214 }
215
216 pub fn get_latest_snapshot(&self, entity_id: &str) -> Option<Snapshot> {
218 let snapshots = self.snapshots.read();
219 snapshots
220 .get(entity_id)
221 .and_then(|entity_snapshots| entity_snapshots.first().cloned())
222 }
223
224 pub fn get_snapshot_as_of(&self, entity_id: &str, as_of: DateTime<Utc>) -> Option<Snapshot> {
226 let snapshots = self.snapshots.read();
227 snapshots.get(entity_id).and_then(|entity_snapshots| {
228 entity_snapshots
229 .iter()
230 .filter(|s| s.as_of <= as_of)
231 .max_by_key(|s| s.as_of)
232 .cloned()
233 })
234 }
235
236 pub fn get_all_snapshots(&self, entity_id: &str) -> Vec<Snapshot> {
238 let snapshots = self.snapshots.read();
239 snapshots.get(entity_id).cloned().unwrap_or_default()
240 }
241
242 pub fn should_create_snapshot(
244 &self,
245 entity_id: &str,
246 current_event_count: usize,
247 last_event_time: DateTime<Utc>,
248 ) -> bool {
249 if !self.config.auto_snapshot {
250 return false;
251 }
252
253 let snapshots = self.snapshots.read();
254 let entity_snapshots = snapshots.get(entity_id);
255
256 match entity_snapshots {
257 None => {
258 current_event_count >= self.config.event_threshold
260 }
261 Some(snaps) => {
262 if let Some(latest) = snaps.first() {
263 let events_since_snapshot = current_event_count - latest.event_count;
265 if events_since_snapshot >= self.config.event_threshold {
266 return true;
267 }
268
269 let time_since_snapshot = (last_event_time - latest.as_of).num_seconds();
271 if time_since_snapshot >= self.config.time_threshold_seconds {
272 return true;
273 }
274 }
275 false
276 }
277 }
278 }
279
280 pub fn delete_snapshots(&self, entity_id: &str) -> Result<usize> {
282 let mut snapshots = self.snapshots.write();
283 let removed = snapshots.remove(entity_id).map(|v| v.len()).unwrap_or(0);
284
285 let mut stats = self.stats.write();
287 stats.total_snapshots = stats.total_snapshots.saturating_sub(removed);
288 stats.total_entities = snapshots.len();
289
290 tracing::info!("๐๏ธ Deleted {} snapshots for entity: {}", removed, entity_id);
291
292 Ok(removed)
293 }
294
295 pub fn delete_snapshot(&self, entity_id: &str, snapshot_id: Uuid) -> Result<bool> {
297 let mut snapshots = self.snapshots.write();
298
299 if let Some(entity_snapshots) = snapshots.get_mut(entity_id) {
300 let initial_len = entity_snapshots.len();
301 entity_snapshots.retain(|s| s.id != snapshot_id);
302 let removed = initial_len != entity_snapshots.len();
303
304 if removed {
305 let mut stats = self.stats.write();
307 stats.total_snapshots = stats.total_snapshots.saturating_sub(1);
308 tracing::debug!("Deleted snapshot {} for entity {}", snapshot_id, entity_id);
309 }
310
311 return Ok(removed);
312 }
313
314 Ok(false)
315 }
316
317 pub fn stats(&self) -> SnapshotStats {
319 (*self.stats.read()).clone()
320 }
321
322 pub fn clear_all(&self) {
324 let mut snapshots = self.snapshots.write();
325 snapshots.clear();
326
327 let mut stats = self.stats.write();
328 *stats = SnapshotStats::default();
329
330 tracing::info!("๐งน Cleared all snapshots");
331 }
332
333 pub fn config(&self) -> &SnapshotConfig {
335 &self.config
336 }
337
338 pub fn list_entities(&self) -> Vec<String> {
340 let snapshots = self.snapshots.read();
341 snapshots.keys().cloned().collect()
342 }
343}
344
345#[derive(Debug, Deserialize)]
347pub struct CreateSnapshotRequest {
348 pub entity_id: String,
349}
350
351#[derive(Debug, Serialize)]
353pub struct CreateSnapshotResponse {
354 pub snapshot_id: Uuid,
355 pub entity_id: String,
356 pub created_at: DateTime<Utc>,
357 pub event_count: usize,
358 pub size_bytes: usize,
359}
360
361#[derive(Debug, Deserialize)]
363pub struct ListSnapshotsRequest {
364 pub entity_id: Option<String>,
365}
366
367#[derive(Debug, Serialize)]
369pub struct ListSnapshotsResponse {
370 pub snapshots: Vec<SnapshotInfo>,
371 pub total: usize,
372}
373
374#[derive(Debug, Serialize)]
375pub struct SnapshotInfo {
376 pub id: Uuid,
377 pub entity_id: String,
378 pub created_at: DateTime<Utc>,
379 pub as_of: DateTime<Utc>,
380 pub event_count: usize,
381 pub size_bytes: usize,
382 pub snapshot_type: SnapshotType,
383}
384
385impl From<Snapshot> for SnapshotInfo {
386 fn from(snapshot: Snapshot) -> Self {
387 Self {
388 id: snapshot.id,
389 entity_id: snapshot.entity_id,
390 created_at: snapshot.created_at,
391 as_of: snapshot.as_of,
392 event_count: snapshot.event_count,
393 size_bytes: snapshot.metadata.size_bytes,
394 snapshot_type: snapshot.metadata.snapshot_type,
395 }
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;
    use serde_json::json;

    /// Builds a standalone automatic snapshot whose state records `event_count`.
    fn create_test_snapshot(entity_id: &str, event_count: usize) -> Snapshot {
        Snapshot::new(
            entity_id.to_string(),
            json!({"count": event_count}),
            Utc::now(),
            event_count,
            SnapshotType::Automatic,
        )
    }

    /// Manager with the default configuration.
    fn default_manager() -> SnapshotManager {
        SnapshotManager::new(SnapshotConfig::default())
    }

    /// Stores a snapshot through the manager, failing the test on error.
    fn store(
        manager: &SnapshotManager,
        entity_id: &str,
        state: serde_json::Value,
        as_of: DateTime<Utc>,
        event_count: usize,
        snapshot_type: SnapshotType,
    ) -> Snapshot {
        manager
            .create_snapshot(entity_id.to_string(), state, as_of, event_count, snapshot_type)
            .expect("create_snapshot should succeed")
    }

    #[test]
    fn test_snapshot_creation() {
        let snapshot = create_test_snapshot("entity-1", 100);
        assert_eq!(snapshot.entity_id, "entity-1");
        assert_eq!(snapshot.event_count, 100);
        assert_eq!(snapshot.metadata.snapshot_type, SnapshotType::Automatic);
    }

    #[test]
    fn test_snapshot_manager() {
        let manager = default_manager();

        let result = manager.create_snapshot(
            "entity-1".to_string(),
            json!({"value": 42}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );
        assert!(result.is_ok());

        let latest = manager.get_latest_snapshot("entity-1");
        assert!(latest.is_some());
        assert_eq!(latest.unwrap().event_count, 100);
    }

    #[test]
    fn test_snapshot_pruning() {
        let manager = SnapshotManager::new(SnapshotConfig {
            max_snapshots_per_entity: 3,
            ..Default::default()
        });

        // Five snapshots go in, only the configured three may remain.
        for i in 0..5 {
            store(
                &manager,
                "entity-1",
                json!({"count": i}),
                Utc::now(),
                i,
                SnapshotType::Automatic,
            );
        }

        assert_eq!(manager.get_all_snapshots("entity-1").len(), 3);
    }

    #[test]
    fn test_should_create_snapshot() {
        let manager = SnapshotManager::new(SnapshotConfig {
            event_threshold: 100,
            time_threshold_seconds: 3600,
            auto_snapshot: true,
            ..Default::default()
        });

        // No snapshot yet: the raw event count is compared to the threshold.
        assert!(!manager.should_create_snapshot("entity-1", 50, Utc::now()));
        assert!(manager.should_create_snapshot("entity-1", 100, Utc::now()));

        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Automatic,
        );

        // With a snapshot at 100 events: 50 new events are not enough, 100 are.
        assert!(!manager.should_create_snapshot("entity-1", 150, Utc::now()));
        assert!(manager.should_create_snapshot("entity-1", 200, Utc::now()));
    }

    #[test]
    fn test_merge_with_events() {
        let as_of = Utc::now();
        let snapshot = Snapshot::new(
            "entity-1".to_string(),
            json!({"name": "Alice", "score": 10}),
            as_of,
            5,
            SnapshotType::Automatic,
        );

        // The event must be strictly later than `as_of` to be merged; use an
        // explicit offset instead of relying on the clock advancing between
        // two `Utc::now()` calls.
        let event = Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "score.updated".to_string(),
            "entity-1".to_string(),
            "default".to_string(),
            json!({"score": 20}),
            as_of + Duration::seconds(1),
            None,
            1,
        );

        let merged = snapshot.merge_with_events(&[event]);
        assert_eq!(merged["name"], "Alice");
        assert_eq!(merged["score"], 20);
    }

    #[test]
    fn test_default_config() {
        let config = SnapshotConfig::default();
        assert_eq!(config.event_threshold, 100);
        assert_eq!(config.time_threshold_seconds, 3600);
        assert_eq!(config.max_snapshots_per_entity, 10);
        assert!(config.auto_snapshot);
    }

    #[test]
    fn test_snapshot_type_serde() {
        for snapshot_type in [
            SnapshotType::Manual,
            SnapshotType::Automatic,
            SnapshotType::OnDemand,
        ] {
            let json = serde_json::to_string(&snapshot_type).unwrap();
            let parsed: SnapshotType = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, snapshot_type);
        }
    }

    #[test]
    fn test_snapshot_serde() {
        let snapshot = create_test_snapshot("entity-1", 50);
        let json = serde_json::to_string(&snapshot).unwrap();
        let parsed: Snapshot = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.entity_id, snapshot.entity_id);
        assert_eq!(parsed.event_count, snapshot.event_count);
    }

    #[test]
    fn test_get_latest_snapshot_none() {
        let manager = default_manager();
        assert!(manager.get_latest_snapshot("non-existent").is_none());
    }

    #[test]
    fn test_get_all_snapshots_empty() {
        let manager = default_manager();
        assert!(manager.get_all_snapshots("non-existent").is_empty());
    }

    #[test]
    fn test_delete_snapshots() {
        let manager = default_manager();
        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );

        let deleted = manager.delete_snapshots("entity-1").unwrap();
        assert_eq!(deleted, 1);
        assert!(manager.get_latest_snapshot("entity-1").is_none());
    }

    #[test]
    fn test_delete_single_snapshot() {
        let manager = default_manager();
        let snapshot = store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );

        let deleted = manager.delete_snapshot("entity-1", snapshot.id).unwrap();
        assert!(deleted);
        assert!(manager.get_latest_snapshot("entity-1").is_none());
    }

    #[test]
    fn test_delete_nonexistent_snapshot() {
        let manager = default_manager();
        let deleted = manager.delete_snapshot("entity-1", Uuid::new_v4()).unwrap();
        assert!(!deleted);
    }

    #[test]
    fn test_clear_all() {
        let manager = default_manager();
        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );
        store(
            &manager,
            "entity-2",
            json!({"value": 2}),
            Utc::now(),
            200,
            SnapshotType::Manual,
        );

        manager.clear_all();

        let stats = manager.stats();
        assert_eq!(stats.total_snapshots, 0);
        assert_eq!(stats.total_entities, 0);
    }

    #[test]
    fn test_list_entities() {
        let manager = default_manager();
        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );
        store(
            &manager,
            "entity-2",
            json!({"value": 2}),
            Utc::now(),
            200,
            SnapshotType::Manual,
        );

        let entities = manager.list_entities();
        assert_eq!(entities.len(), 2);
        assert!(entities.contains(&"entity-1".to_string()));
        assert!(entities.contains(&"entity-2".to_string()));
    }

    #[test]
    fn test_get_config() {
        let manager = SnapshotManager::new(SnapshotConfig {
            event_threshold: 50,
            ..Default::default()
        });
        assert_eq!(manager.config().event_threshold, 50);
    }

    #[test]
    fn test_stats() {
        let manager = default_manager();
        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );

        let stats = manager.stats();
        assert_eq!(stats.total_snapshots, 1);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.snapshots_created, 1);
    }

    #[test]
    fn test_snapshot_as_of() {
        let manager = default_manager();
        let now = Utc::now();
        let past = now - Duration::hours(2);

        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            past,
            100,
            SnapshotType::Manual,
        );

        // A query at `now` sees the snapshot taken two hours earlier.
        assert!(manager.get_snapshot_as_of("entity-1", now).is_some());

        // A query before the snapshot's `as_of` finds nothing.
        let very_past = past - Duration::hours(1);
        assert!(manager.get_snapshot_as_of("entity-1", very_past).is_none());
    }

    #[test]
    fn test_should_create_snapshot_time_threshold() {
        // High event threshold so only the 1-second time threshold can fire.
        let manager = SnapshotManager::new(SnapshotConfig {
            event_threshold: 1000,
            time_threshold_seconds: 1,
            auto_snapshot: true,
            ..Default::default()
        });

        let past = Utc::now() - Duration::seconds(2);
        store(
            &manager,
            "entity-1",
            json!({"value": 1}),
            past,
            100,
            SnapshotType::Manual,
        );

        assert!(manager.should_create_snapshot("entity-1", 101, Utc::now()));
    }

    #[test]
    fn test_should_create_snapshot_disabled() {
        let manager = SnapshotManager::new(SnapshotConfig {
            auto_snapshot: false,
            ..Default::default()
        });

        assert!(!manager.should_create_snapshot("entity-1", 1000, Utc::now()));
    }

    #[test]
    fn test_snapshot_info_from() {
        let snapshot = create_test_snapshot("entity-1", 50);
        let info: SnapshotInfo = snapshot.clone().into();

        assert_eq!(info.id, snapshot.id);
        assert_eq!(info.entity_id, snapshot.entity_id);
        assert_eq!(info.event_count, snapshot.event_count);
        assert_eq!(info.snapshot_type, snapshot.metadata.snapshot_type);
    }

    #[test]
    fn test_snapshot_metadata() {
        let snapshot = create_test_snapshot("entity-1", 100);
        assert_eq!(snapshot.metadata.version, 1);
        assert!(snapshot.metadata.size_bytes > 0);
    }

    #[test]
    fn test_multiple_entities() {
        let manager = default_manager();

        for i in 0..5 {
            store(
                &manager,
                &format!("entity-{}", i),
                json!({"value": i}),
                Utc::now(),
                100 + i,
                SnapshotType::Automatic,
            );
        }

        let stats = manager.stats();
        assert_eq!(stats.total_entities, 5);
        assert_eq!(stats.total_snapshots, 5);
    }

    #[test]
    fn test_snapshot_stats_default() {
        let stats = SnapshotStats::default();
        assert_eq!(stats.total_snapshots, 0);
        assert_eq!(stats.total_entities, 0);
        assert_eq!(stats.total_size_bytes, 0);
        assert_eq!(stats.snapshots_created, 0);
        assert_eq!(stats.snapshots_pruned, 0);
    }
}