1use crate::{domain::entities::Event, error::Result};
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct Snapshot {
12 pub id: Uuid,
14
15 pub entity_id: String,
17
18 pub state: serde_json::Value,
20
21 pub created_at: DateTime<Utc>,
23
24 pub as_of: DateTime<Utc>,
26
27 pub event_count: usize,
29
30 pub metadata: SnapshotMetadata,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct SnapshotMetadata {
36 pub snapshot_type: SnapshotType,
38
39 pub size_bytes: usize,
41
42 pub version: u32,
44}
45
46#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
47#[serde(rename_all = "lowercase")]
48pub enum SnapshotType {
49 Manual,
50 Automatic,
51 OnDemand,
52}
53
54impl Snapshot {
55 pub fn new(
57 entity_id: String,
58 state: serde_json::Value,
59 as_of: DateTime<Utc>,
60 event_count: usize,
61 snapshot_type: SnapshotType,
62 ) -> Self {
63 let state_json = serde_json::to_string(&state).unwrap_or_default();
64 let size_bytes = state_json.len();
65
66 Self {
67 id: Uuid::new_v4(),
68 entity_id,
69 state,
70 created_at: Utc::now(),
71 as_of,
72 event_count,
73 metadata: SnapshotMetadata {
74 snapshot_type,
75 size_bytes,
76 version: 1,
77 },
78 }
79 }
80
81 pub fn merge_with_events(&self, events: &[Event]) -> serde_json::Value {
83 let mut merged = self.state.clone();
84
85 for event in events {
86 if event.timestamp > self.as_of
88 && let serde_json::Value::Object(ref mut state_map) = merged
89 && let serde_json::Value::Object(ref payload_map) = event.payload
90 {
91 for (key, value) in payload_map {
92 state_map.insert(key.clone(), value.clone());
93 }
94 }
95 }
96
97 merged
98 }
99}
100
101#[derive(Debug, Clone)]
103pub struct SnapshotConfig {
104 pub event_threshold: usize,
106
107 pub time_threshold_seconds: i64,
109
110 pub max_snapshots_per_entity: usize,
112
113 pub auto_snapshot: bool,
115}
116
117impl Default for SnapshotConfig {
118 fn default() -> Self {
119 Self {
120 event_threshold: 100,
121 time_threshold_seconds: 3600, max_snapshots_per_entity: 10,
123 auto_snapshot: true,
124 }
125 }
126}
127
128pub struct SnapshotManager {
130 snapshots: Arc<DashMap<String, Vec<Snapshot>>>,
132
133 config: SnapshotConfig,
135
136 stats: Arc<RwLock<SnapshotStats>>,
138}
139
140#[derive(Debug, Clone, Default, Serialize)]
141pub struct SnapshotStats {
142 pub total_snapshots: usize,
143 pub total_entities: usize,
144 pub total_size_bytes: usize,
145 pub snapshots_created: u64,
146 pub snapshots_pruned: u64,
147}
148
149impl SnapshotManager {
150 pub fn new(config: SnapshotConfig) -> Self {
152 Self {
153 snapshots: Arc::new(DashMap::new()),
154 config,
155 stats: Arc::new(RwLock::new(SnapshotStats::default())),
156 }
157 }
158
159 pub fn create_snapshot(
161 &self,
162 entity_id: String,
163 state: serde_json::Value,
164 as_of: DateTime<Utc>,
165 event_count: usize,
166 snapshot_type: SnapshotType,
167 ) -> Result<Snapshot> {
168 let snapshot = Snapshot::new(entity_id.clone(), state, as_of, event_count, snapshot_type);
169
170 let mut entity_snapshots = self.snapshots.entry(entity_id.clone()).or_default();
171
172 entity_snapshots.push(snapshot.clone());
174
175 entity_snapshots.sort_by_key(|x| std::cmp::Reverse(x.as_of));
177
178 let mut pruned = 0;
180 if entity_snapshots.len() > self.config.max_snapshots_per_entity {
181 let to_remove = entity_snapshots.len() - self.config.max_snapshots_per_entity;
182 entity_snapshots.truncate(self.config.max_snapshots_per_entity);
183 pruned = to_remove;
184 }
185
186 drop(entity_snapshots);
188
189 let mut stats = self.stats.write();
191 stats.snapshots_created += 1;
192 stats.snapshots_pruned += pruned as u64;
193 stats.total_snapshots = self.snapshots.iter().map(|entry| entry.value().len()).sum();
194 stats.total_entities = self.snapshots.len();
195 stats.total_size_bytes = self
196 .snapshots
197 .iter()
198 .map(|entry| {
199 entry
200 .value()
201 .iter()
202 .map(|s| s.metadata.size_bytes)
203 .sum::<usize>()
204 })
205 .sum();
206
207 tracing::info!(
208 "๐ธ Created {} snapshot for entity: {} (events: {}, size: {} bytes)",
209 match snapshot_type {
210 SnapshotType::Manual => "manual",
211 SnapshotType::Automatic => "automatic",
212 SnapshotType::OnDemand => "on-demand",
213 },
214 entity_id,
215 event_count,
216 snapshot.metadata.size_bytes
217 );
218
219 Ok(snapshot)
220 }
221
222 pub fn get_latest_snapshot(&self, entity_id: &str) -> Option<Snapshot> {
224 self.snapshots
225 .get(entity_id)
226 .and_then(|entry| entry.value().first().cloned())
227 }
228
229 pub fn get_snapshot_as_of(&self, entity_id: &str, as_of: DateTime<Utc>) -> Option<Snapshot> {
231 self.snapshots.get(entity_id).and_then(|entry| {
232 entry
233 .value()
234 .iter()
235 .filter(|s| s.as_of <= as_of)
236 .max_by_key(|s| s.as_of)
237 .cloned()
238 })
239 }
240
241 pub fn get_all_snapshots(&self, entity_id: &str) -> Vec<Snapshot> {
243 self.snapshots
244 .get(entity_id)
245 .map(|entry| entry.value().clone())
246 .unwrap_or_default()
247 }
248
249 pub fn should_create_snapshot(
251 &self,
252 entity_id: &str,
253 current_event_count: usize,
254 last_event_time: DateTime<Utc>,
255 ) -> bool {
256 if !self.config.auto_snapshot {
257 return false;
258 }
259
260 match self.snapshots.get(entity_id) {
261 None => {
262 current_event_count >= self.config.event_threshold
264 }
265 Some(entry) => {
266 let snaps = entry.value();
267 if let Some(latest) = snaps.first() {
268 let events_since_snapshot = current_event_count - latest.event_count;
270 if events_since_snapshot >= self.config.event_threshold {
271 return true;
272 }
273
274 let time_since_snapshot = (last_event_time - latest.as_of).num_seconds();
276 if time_since_snapshot >= self.config.time_threshold_seconds {
277 return true;
278 }
279 }
280 false
281 }
282 }
283 }
284
285 pub fn delete_snapshots(&self, entity_id: &str) -> Result<usize> {
287 let removed = self
288 .snapshots
289 .remove(entity_id)
290 .map(|(_, v)| v.len())
291 .unwrap_or(0);
292
293 let mut stats = self.stats.write();
295 stats.total_snapshots = stats.total_snapshots.saturating_sub(removed);
296 stats.total_entities = self.snapshots.len();
297
298 tracing::info!("๐๏ธ Deleted {} snapshots for entity: {}", removed, entity_id);
299
300 Ok(removed)
301 }
302
303 pub fn delete_snapshot(&self, entity_id: &str, snapshot_id: Uuid) -> Result<bool> {
305 if let Some(mut entity_snapshots) = self.snapshots.get_mut(entity_id) {
306 let initial_len = entity_snapshots.len();
307 entity_snapshots.retain(|s| s.id != snapshot_id);
308 let removed = initial_len != entity_snapshots.len();
309
310 if removed {
311 let mut stats = self.stats.write();
313 stats.total_snapshots = stats.total_snapshots.saturating_sub(1);
314 tracing::debug!("Deleted snapshot {} for entity {}", snapshot_id, entity_id);
315 }
316
317 return Ok(removed);
318 }
319
320 Ok(false)
321 }
322
323 pub fn stats(&self) -> SnapshotStats {
325 (*self.stats.read()).clone()
326 }
327
328 pub fn clear_all(&self) {
330 self.snapshots.clear();
331
332 let mut stats = self.stats.write();
333 *stats = SnapshotStats::default();
334
335 tracing::info!("๐งน Cleared all snapshots");
336 }
337
338 pub fn config(&self) -> &SnapshotConfig {
340 &self.config
341 }
342
343 pub fn list_entities(&self) -> Vec<String> {
345 self.snapshots
346 .iter()
347 .map(|entry| entry.key().clone())
348 .collect()
349 }
350}
351
352#[derive(Debug, Deserialize)]
354pub struct CreateSnapshotRequest {
355 pub entity_id: String,
356}
357
358#[derive(Debug, Serialize)]
360pub struct CreateSnapshotResponse {
361 pub snapshot_id: Uuid,
362 pub entity_id: String,
363 pub created_at: DateTime<Utc>,
364 pub event_count: usize,
365 pub size_bytes: usize,
366}
367
368#[derive(Debug, Deserialize)]
370pub struct ListSnapshotsRequest {
371 pub entity_id: Option<String>,
372}
373
374#[derive(Debug, Serialize)]
376pub struct ListSnapshotsResponse {
377 pub snapshots: Vec<SnapshotInfo>,
378 pub total: usize,
379}
380
381#[derive(Debug, Serialize)]
382pub struct SnapshotInfo {
383 pub id: Uuid,
384 pub entity_id: String,
385 pub created_at: DateTime<Utc>,
386 pub as_of: DateTime<Utc>,
387 pub event_count: usize,
388 pub size_bytes: usize,
389 pub snapshot_type: SnapshotType,
390}
391
392impl From<Snapshot> for SnapshotInfo {
393 fn from(snapshot: Snapshot) -> Self {
394 Self {
395 id: snapshot.id,
396 entity_id: snapshot.entity_id,
397 created_at: snapshot.created_at,
398 as_of: snapshot.as_of,
399 event_count: snapshot.event_count,
400 size_bytes: snapshot.metadata.size_bytes,
401 snapshot_type: snapshot.metadata.snapshot_type,
402 }
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409 use chrono::Duration;
410 use serde_json::json;
411
412 fn create_test_snapshot(entity_id: &str, event_count: usize) -> Snapshot {
413 Snapshot::new(
414 entity_id.to_string(),
415 json!({"count": event_count}),
416 Utc::now(),
417 event_count,
418 SnapshotType::Automatic,
419 )
420 }
421
422 #[test]
423 fn test_snapshot_creation() {
424 let snapshot = create_test_snapshot("entity-1", 100);
425 assert_eq!(snapshot.entity_id, "entity-1");
426 assert_eq!(snapshot.event_count, 100);
427 assert_eq!(snapshot.metadata.snapshot_type, SnapshotType::Automatic);
428 }
429
430 #[test]
431 fn test_snapshot_manager() {
432 let manager = SnapshotManager::new(SnapshotConfig::default());
433
434 let result = manager.create_snapshot(
435 "entity-1".to_string(),
436 json!({"value": 42}),
437 Utc::now(),
438 100,
439 SnapshotType::Manual,
440 );
441
442 assert!(result.is_ok());
443
444 let latest = manager.get_latest_snapshot("entity-1");
445 assert!(latest.is_some());
446 assert_eq!(latest.unwrap().event_count, 100);
447 }
448
449 #[test]
450 fn test_snapshot_pruning() {
451 let config = SnapshotConfig {
452 max_snapshots_per_entity: 3,
453 ..Default::default()
454 };
455 let manager = SnapshotManager::new(config);
456
457 for i in 0..5 {
459 manager
460 .create_snapshot(
461 "entity-1".to_string(),
462 json!({"count": i}),
463 Utc::now(),
464 i,
465 SnapshotType::Automatic,
466 )
467 .unwrap();
468 }
469
470 let snapshots = manager.get_all_snapshots("entity-1");
472 assert_eq!(snapshots.len(), 3);
473 }
474
475 #[test]
476 fn test_should_create_snapshot() {
477 let config = SnapshotConfig {
478 event_threshold: 100,
479 time_threshold_seconds: 3600,
480 auto_snapshot: true,
481 ..Default::default()
482 };
483 let manager = SnapshotManager::new(config);
484
485 assert!(!manager.should_create_snapshot("entity-1", 50, Utc::now()));
487
488 assert!(manager.should_create_snapshot("entity-1", 100, Utc::now()));
490
491 manager
493 .create_snapshot(
494 "entity-1".to_string(),
495 json!({"value": 1}),
496 Utc::now(),
497 100,
498 SnapshotType::Automatic,
499 )
500 .unwrap();
501
502 assert!(!manager.should_create_snapshot("entity-1", 150, Utc::now()));
504
505 assert!(manager.should_create_snapshot("entity-1", 200, Utc::now()));
507 }
508
509 #[test]
510 fn test_merge_with_events() {
511 let snapshot = Snapshot::new(
512 "entity-1".to_string(),
513 json!({"name": "Alice", "score": 10}),
514 Utc::now(),
515 5,
516 SnapshotType::Automatic,
517 );
518
519 let event = Event::reconstruct_from_strings(
520 Uuid::new_v4(),
521 "score.updated".to_string(),
522 "entity-1".to_string(),
523 "default".to_string(),
524 json!({"score": 20}),
525 Utc::now(),
526 None,
527 1,
528 );
529
530 let merged = snapshot.merge_with_events(&[event]);
531 assert_eq!(merged["name"], "Alice");
532 assert_eq!(merged["score"], 20);
533 }
534
535 #[test]
536 fn test_default_config() {
537 let config = SnapshotConfig::default();
538 assert_eq!(config.event_threshold, 100);
539 assert_eq!(config.time_threshold_seconds, 3600);
540 assert_eq!(config.max_snapshots_per_entity, 10);
541 assert!(config.auto_snapshot);
542 }
543
544 #[test]
545 fn test_snapshot_type_serde() {
546 let types = vec![
547 SnapshotType::Manual,
548 SnapshotType::Automatic,
549 SnapshotType::OnDemand,
550 ];
551
552 for snapshot_type in types {
553 let json = serde_json::to_string(&snapshot_type).unwrap();
554 let parsed: SnapshotType = serde_json::from_str(&json).unwrap();
555 assert_eq!(parsed, snapshot_type);
556 }
557 }
558
559 #[test]
560 fn test_snapshot_serde() {
561 let snapshot = create_test_snapshot("entity-1", 50);
562 let json = serde_json::to_string(&snapshot).unwrap();
563 let parsed: Snapshot = serde_json::from_str(&json).unwrap();
564 assert_eq!(parsed.entity_id, snapshot.entity_id);
565 assert_eq!(parsed.event_count, snapshot.event_count);
566 }
567
568 #[test]
569 fn test_get_latest_snapshot_none() {
570 let manager = SnapshotManager::new(SnapshotConfig::default());
571 let latest = manager.get_latest_snapshot("non-existent");
572 assert!(latest.is_none());
573 }
574
575 #[test]
576 fn test_get_all_snapshots_empty() {
577 let manager = SnapshotManager::new(SnapshotConfig::default());
578 let snapshots = manager.get_all_snapshots("non-existent");
579 assert!(snapshots.is_empty());
580 }
581
582 #[test]
583 fn test_delete_snapshots() {
584 let manager = SnapshotManager::new(SnapshotConfig::default());
585
586 manager
587 .create_snapshot(
588 "entity-1".to_string(),
589 json!({"value": 1}),
590 Utc::now(),
591 100,
592 SnapshotType::Manual,
593 )
594 .unwrap();
595
596 let deleted = manager.delete_snapshots("entity-1").unwrap();
597 assert_eq!(deleted, 1);
598
599 let latest = manager.get_latest_snapshot("entity-1");
600 assert!(latest.is_none());
601 }
602
603 #[test]
604 fn test_delete_single_snapshot() {
605 let manager = SnapshotManager::new(SnapshotConfig::default());
606
607 let snapshot = manager
608 .create_snapshot(
609 "entity-1".to_string(),
610 json!({"value": 1}),
611 Utc::now(),
612 100,
613 SnapshotType::Manual,
614 )
615 .unwrap();
616
617 let deleted = manager.delete_snapshot("entity-1", snapshot.id).unwrap();
618 assert!(deleted);
619
620 let latest = manager.get_latest_snapshot("entity-1");
621 assert!(latest.is_none());
622 }
623
624 #[test]
625 fn test_delete_nonexistent_snapshot() {
626 let manager = SnapshotManager::new(SnapshotConfig::default());
627 let deleted = manager.delete_snapshot("entity-1", Uuid::new_v4()).unwrap();
628 assert!(!deleted);
629 }
630
631 #[test]
632 fn test_clear_all() {
633 let manager = SnapshotManager::new(SnapshotConfig::default());
634
635 manager
636 .create_snapshot(
637 "entity-1".to_string(),
638 json!({"value": 1}),
639 Utc::now(),
640 100,
641 SnapshotType::Manual,
642 )
643 .unwrap();
644 manager
645 .create_snapshot(
646 "entity-2".to_string(),
647 json!({"value": 2}),
648 Utc::now(),
649 200,
650 SnapshotType::Manual,
651 )
652 .unwrap();
653
654 manager.clear_all();
655
656 let stats = manager.stats();
657 assert_eq!(stats.total_snapshots, 0);
658 assert_eq!(stats.total_entities, 0);
659 }
660
661 #[test]
662 fn test_list_entities() {
663 let manager = SnapshotManager::new(SnapshotConfig::default());
664
665 manager
666 .create_snapshot(
667 "entity-1".to_string(),
668 json!({"value": 1}),
669 Utc::now(),
670 100,
671 SnapshotType::Manual,
672 )
673 .unwrap();
674 manager
675 .create_snapshot(
676 "entity-2".to_string(),
677 json!({"value": 2}),
678 Utc::now(),
679 200,
680 SnapshotType::Manual,
681 )
682 .unwrap();
683
684 let entities = manager.list_entities();
685 assert_eq!(entities.len(), 2);
686 assert!(entities.contains(&"entity-1".to_string()));
687 assert!(entities.contains(&"entity-2".to_string()));
688 }
689
690 #[test]
691 fn test_get_config() {
692 let config = SnapshotConfig {
693 event_threshold: 50,
694 ..Default::default()
695 };
696 let manager = SnapshotManager::new(config);
697 assert_eq!(manager.config().event_threshold, 50);
698 }
699
700 #[test]
701 fn test_stats() {
702 let manager = SnapshotManager::new(SnapshotConfig::default());
703
704 manager
705 .create_snapshot(
706 "entity-1".to_string(),
707 json!({"value": 1}),
708 Utc::now(),
709 100,
710 SnapshotType::Manual,
711 )
712 .unwrap();
713
714 let stats = manager.stats();
715 assert_eq!(stats.total_snapshots, 1);
716 assert_eq!(stats.total_entities, 1);
717 assert_eq!(stats.snapshots_created, 1);
718 }
719
720 #[test]
721 fn test_snapshot_as_of() {
722 let manager = SnapshotManager::new(SnapshotConfig::default());
723 let now = Utc::now();
724 let past = now - Duration::hours(2);
725 let future = now + Duration::hours(2);
726
727 manager
729 .create_snapshot(
730 "entity-1".to_string(),
731 json!({"value": 1}),
732 past,
733 100,
734 SnapshotType::Manual,
735 )
736 .unwrap();
737
738 let snapshot = manager.get_snapshot_as_of("entity-1", now);
740 assert!(snapshot.is_some());
741
742 let very_past = past - Duration::hours(1);
744 let snapshot = manager.get_snapshot_as_of("entity-1", very_past);
745 assert!(snapshot.is_none());
746 }
747
748 #[test]
749 fn test_should_create_snapshot_time_threshold() {
750 let config = SnapshotConfig {
751 event_threshold: 1000, time_threshold_seconds: 1, auto_snapshot: true,
754 ..Default::default()
755 };
756 let manager = SnapshotManager::new(config);
757
758 let past = Utc::now() - Duration::seconds(2);
760 manager
761 .create_snapshot(
762 "entity-1".to_string(),
763 json!({"value": 1}),
764 past,
765 100,
766 SnapshotType::Manual,
767 )
768 .unwrap();
769
770 assert!(manager.should_create_snapshot("entity-1", 101, Utc::now()));
772 }
773
774 #[test]
775 fn test_should_create_snapshot_disabled() {
776 let config = SnapshotConfig {
777 auto_snapshot: false,
778 ..Default::default()
779 };
780 let manager = SnapshotManager::new(config);
781
782 assert!(!manager.should_create_snapshot("entity-1", 1000, Utc::now()));
784 }
785
786 #[test]
787 fn test_snapshot_info_from() {
788 let snapshot = create_test_snapshot("entity-1", 50);
789 let info: SnapshotInfo = snapshot.clone().into();
790
791 assert_eq!(info.id, snapshot.id);
792 assert_eq!(info.entity_id, snapshot.entity_id);
793 assert_eq!(info.event_count, snapshot.event_count);
794 assert_eq!(info.snapshot_type, snapshot.metadata.snapshot_type);
795 }
796
797 #[test]
798 fn test_snapshot_metadata() {
799 let snapshot = create_test_snapshot("entity-1", 100);
800 assert_eq!(snapshot.metadata.version, 1);
801 assert!(snapshot.metadata.size_bytes > 0);
802 }
803
804 #[test]
805 fn test_multiple_entities() {
806 let manager = SnapshotManager::new(SnapshotConfig::default());
807
808 for i in 0..5 {
809 manager
810 .create_snapshot(
811 format!("entity-{}", i),
812 json!({"value": i}),
813 Utc::now(),
814 100 + i,
815 SnapshotType::Automatic,
816 )
817 .unwrap();
818 }
819
820 let stats = manager.stats();
821 assert_eq!(stats.total_entities, 5);
822 assert_eq!(stats.total_snapshots, 5);
823 }
824
825 #[test]
826 fn test_snapshot_stats_default() {
827 let stats = SnapshotStats::default();
828 assert_eq!(stats.total_snapshots, 0);
829 assert_eq!(stats.total_entities, 0);
830 assert_eq!(stats.total_size_bytes, 0);
831 assert_eq!(stats.snapshots_created, 0);
832 assert_eq!(stats.snapshots_pruned, 0);
833 }
834}