// allsource_core/snapshot.rs

1use crate::error::Result;
2use crate::domain::entities::Event;
3use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use uuid::Uuid;
9
10/// A point-in-time snapshot of entity state
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Snapshot {
13    /// Unique snapshot identifier
14    pub id: Uuid,
15
16    /// Entity this snapshot represents
17    pub entity_id: String,
18
19    /// The state data at this point in time
20    pub state: serde_json::Value,
21
22    /// Timestamp when this snapshot was created
23    pub created_at: DateTime<Utc>,
24
25    /// Last event timestamp included in this snapshot
26    pub as_of: DateTime<Utc>,
27
28    /// Number of events processed to create this snapshot
29    pub event_count: usize,
30
31    /// Metadata about the snapshot
32    pub metadata: SnapshotMetadata,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct SnapshotMetadata {
37    /// Type of snapshot (manual, automatic, etc.)
38    pub snapshot_type: SnapshotType,
39
40    /// Size of the snapshot in bytes (approximate)
41    pub size_bytes: usize,
42
43    /// Version of the snapshot format
44    pub version: u32,
45}
46
47#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
48#[serde(rename_all = "lowercase")]
49pub enum SnapshotType {
50    Manual,
51    Automatic,
52    OnDemand,
53}
54
55impl Snapshot {
56    /// Create a new snapshot from entity state
57    pub fn new(
58        entity_id: String,
59        state: serde_json::Value,
60        as_of: DateTime<Utc>,
61        event_count: usize,
62        snapshot_type: SnapshotType,
63    ) -> Self {
64        let state_json = serde_json::to_string(&state).unwrap_or_default();
65        let size_bytes = state_json.len();
66
67        Self {
68            id: Uuid::new_v4(),
69            entity_id,
70            state,
71            created_at: Utc::now(),
72            as_of,
73            event_count,
74            metadata: SnapshotMetadata {
75                snapshot_type,
76                size_bytes,
77                version: 1,
78            },
79        }
80    }
81
82    /// Merge this snapshot with subsequent events to get current state
83    pub fn merge_with_events(&self, events: &[Event]) -> serde_json::Value {
84        let mut merged = self.state.clone();
85
86        for event in events {
87            // Only process events after the snapshot
88            if event.timestamp > self.as_of {
89                if let serde_json::Value::Object(ref mut state_map) = merged {
90                    if let serde_json::Value::Object(ref payload_map) = event.payload {
91                        for (key, value) in payload_map {
92                            state_map.insert(key.clone(), value.clone());
93                        }
94                    }
95                }
96            }
97        }
98
99        merged
100    }
101}
102
/// Tunables that control when snapshots are taken and how many are kept.
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Number of events for an entity after which a snapshot is due.
    pub event_threshold: usize,

    /// Age, in seconds, after which a new snapshot is due.
    pub time_threshold_seconds: i64,

    /// Upper bound on retained snapshots for a single entity.
    pub max_snapshots_per_entity: usize,

    /// Whether snapshots are created automatically.
    pub auto_snapshot: bool,
}

impl Default for SnapshotConfig {
    /// Defaults: 100 events, one hour, 10 snapshots per entity, automatic
    /// snapshots enabled.
    fn default() -> Self {
        const ONE_HOUR_SECONDS: i64 = 60 * 60;

        Self {
            auto_snapshot: true,
            event_threshold: 100,
            max_snapshots_per_entity: 10,
            time_threshold_seconds: ONE_HOUR_SECONDS,
        }
    }
}
129
130/// Manages entity snapshots for fast state recovery
131pub struct SnapshotManager {
132    /// Snapshots organized by entity_id
133    snapshots: Arc<RwLock<HashMap<String, Vec<Snapshot>>>>,
134
135    /// Configuration
136    config: SnapshotConfig,
137
138    /// Statistics
139    stats: Arc<RwLock<SnapshotStats>>,
140}
141
142#[derive(Debug, Clone, Default, Serialize)]
143pub struct SnapshotStats {
144    pub total_snapshots: usize,
145    pub total_entities: usize,
146    pub total_size_bytes: usize,
147    pub snapshots_created: u64,
148    pub snapshots_pruned: u64,
149}
150
151impl SnapshotManager {
152    /// Create a new snapshot manager
153    pub fn new(config: SnapshotConfig) -> Self {
154        Self {
155            snapshots: Arc::new(RwLock::new(HashMap::new())),
156            config,
157            stats: Arc::new(RwLock::new(SnapshotStats::default())),
158        }
159    }
160
161    /// Create a new snapshot for an entity
162    pub fn create_snapshot(
163        &self,
164        entity_id: String,
165        state: serde_json::Value,
166        as_of: DateTime<Utc>,
167        event_count: usize,
168        snapshot_type: SnapshotType,
169    ) -> Result<Snapshot> {
170        let snapshot = Snapshot::new(entity_id.clone(), state, as_of, event_count, snapshot_type);
171
172        let mut snapshots = self.snapshots.write();
173        let entity_snapshots = snapshots.entry(entity_id.clone()).or_insert_with(Vec::new);
174
175        // Add new snapshot
176        entity_snapshots.push(snapshot.clone());
177
178        // Sort by timestamp (newest first)
179        entity_snapshots.sort_by(|a, b| b.as_of.cmp(&a.as_of));
180
181        // Prune old snapshots if over limit
182        let mut pruned = 0;
183        if entity_snapshots.len() > self.config.max_snapshots_per_entity {
184            let to_remove = entity_snapshots.len() - self.config.max_snapshots_per_entity;
185            entity_snapshots.truncate(self.config.max_snapshots_per_entity);
186            pruned = to_remove;
187        }
188
189        // Update statistics
190        let mut stats = self.stats.write();
191        stats.snapshots_created += 1;
192        stats.snapshots_pruned += pruned as u64;
193        stats.total_snapshots = snapshots.values().map(|v| v.len()).sum();
194        stats.total_entities = snapshots.len();
195        stats.total_size_bytes = snapshots
196            .values()
197            .flatten()
198            .map(|s| s.metadata.size_bytes)
199            .sum();
200
201        tracing::info!(
202            "๐Ÿ“ธ Created {} snapshot for entity: {} (events: {}, size: {} bytes)",
203            match snapshot_type {
204                SnapshotType::Manual => "manual",
205                SnapshotType::Automatic => "automatic",
206                SnapshotType::OnDemand => "on-demand",
207            },
208            entity_id,
209            event_count,
210            snapshot.metadata.size_bytes
211        );
212
213        Ok(snapshot)
214    }
215
216    /// Get the most recent snapshot for an entity
217    pub fn get_latest_snapshot(&self, entity_id: &str) -> Option<Snapshot> {
218        let snapshots = self.snapshots.read();
219        snapshots
220            .get(entity_id)
221            .and_then(|entity_snapshots| entity_snapshots.first().cloned())
222    }
223
224    /// Get the best snapshot to use for reconstruction as of a specific time
225    pub fn get_snapshot_as_of(
226        &self,
227        entity_id: &str,
228        as_of: DateTime<Utc>,
229    ) -> Option<Snapshot> {
230        let snapshots = self.snapshots.read();
231        snapshots.get(entity_id).and_then(|entity_snapshots| {
232            entity_snapshots
233                .iter()
234                .filter(|s| s.as_of <= as_of)
235                .max_by_key(|s| s.as_of)
236                .cloned()
237        })
238    }
239
240    /// Get all snapshots for an entity
241    pub fn get_all_snapshots(&self, entity_id: &str) -> Vec<Snapshot> {
242        let snapshots = self.snapshots.read();
243        snapshots
244            .get(entity_id)
245            .map(|v| v.clone())
246            .unwrap_or_default()
247    }
248
249    /// Check if a new snapshot should be created for an entity
250    pub fn should_create_snapshot(
251        &self,
252        entity_id: &str,
253        current_event_count: usize,
254        last_event_time: DateTime<Utc>,
255    ) -> bool {
256        if !self.config.auto_snapshot {
257            return false;
258        }
259
260        let snapshots = self.snapshots.read();
261        let entity_snapshots = snapshots.get(entity_id);
262
263        match entity_snapshots {
264            None => {
265                // No snapshots exist, create one if we have enough events
266                current_event_count >= self.config.event_threshold
267            }
268            Some(snaps) => {
269                if let Some(latest) = snaps.first() {
270                    // Check event count threshold
271                    let events_since_snapshot = current_event_count - latest.event_count;
272                    if events_since_snapshot >= self.config.event_threshold {
273                        return true;
274                    }
275
276                    // Check time threshold
277                    let time_since_snapshot = (last_event_time - latest.as_of).num_seconds();
278                    if time_since_snapshot >= self.config.time_threshold_seconds {
279                        return true;
280                    }
281                }
282                false
283            }
284        }
285    }
286
287    /// Delete all snapshots for an entity
288    pub fn delete_snapshots(&self, entity_id: &str) -> Result<usize> {
289        let mut snapshots = self.snapshots.write();
290        let removed = snapshots.remove(entity_id).map(|v| v.len()).unwrap_or(0);
291
292        // Update stats
293        let mut stats = self.stats.write();
294        stats.total_snapshots = stats.total_snapshots.saturating_sub(removed);
295        stats.total_entities = snapshots.len();
296
297        tracing::info!("๐Ÿ—‘๏ธ Deleted {} snapshots for entity: {}", removed, entity_id);
298
299        Ok(removed)
300    }
301
302    /// Delete a specific snapshot by ID
303    pub fn delete_snapshot(&self, entity_id: &str, snapshot_id: Uuid) -> Result<bool> {
304        let mut snapshots = self.snapshots.write();
305
306        if let Some(entity_snapshots) = snapshots.get_mut(entity_id) {
307            let initial_len = entity_snapshots.len();
308            entity_snapshots.retain(|s| s.id != snapshot_id);
309            let removed = initial_len != entity_snapshots.len();
310
311            if removed {
312                // Update stats
313                let mut stats = self.stats.write();
314                stats.total_snapshots = stats.total_snapshots.saturating_sub(1);
315                tracing::debug!("Deleted snapshot {} for entity {}", snapshot_id, entity_id);
316            }
317
318            return Ok(removed);
319        }
320
321        Ok(false)
322    }
323
324    /// Get snapshot statistics
325    pub fn stats(&self) -> SnapshotStats {
326        (*self.stats.read()).clone()
327    }
328
329    /// Clear all snapshots
330    pub fn clear_all(&self) {
331        let mut snapshots = self.snapshots.write();
332        snapshots.clear();
333
334        let mut stats = self.stats.write();
335        *stats = SnapshotStats::default();
336
337        tracing::info!("๐Ÿงน Cleared all snapshots");
338    }
339
340    /// Get configuration
341    pub fn config(&self) -> &SnapshotConfig {
342        &self.config
343    }
344
345    /// List all entities with snapshots
346    pub fn list_entities(&self) -> Vec<String> {
347        let snapshots = self.snapshots.read();
348        snapshots.keys().cloned().collect()
349    }
350}
351
352/// Request to create a manual snapshot
353#[derive(Debug, Deserialize)]
354pub struct CreateSnapshotRequest {
355    pub entity_id: String,
356}
357
358/// Response after creating a snapshot
359#[derive(Debug, Serialize)]
360pub struct CreateSnapshotResponse {
361    pub snapshot_id: Uuid,
362    pub entity_id: String,
363    pub created_at: DateTime<Utc>,
364    pub event_count: usize,
365    pub size_bytes: usize,
366}
367
368/// Request to list snapshots
369#[derive(Debug, Deserialize)]
370pub struct ListSnapshotsRequest {
371    pub entity_id: Option<String>,
372}
373
374/// Response containing snapshot list
375#[derive(Debug, Serialize)]
376pub struct ListSnapshotsResponse {
377    pub snapshots: Vec<SnapshotInfo>,
378    pub total: usize,
379}
380
381#[derive(Debug, Serialize)]
382pub struct SnapshotInfo {
383    pub id: Uuid,
384    pub entity_id: String,
385    pub created_at: DateTime<Utc>,
386    pub as_of: DateTime<Utc>,
387    pub event_count: usize,
388    pub size_bytes: usize,
389    pub snapshot_type: SnapshotType,
390}
391
392impl From<Snapshot> for SnapshotInfo {
393    fn from(snapshot: Snapshot) -> Self {
394        Self {
395            id: snapshot.id,
396            entity_id: snapshot.entity_id,
397            created_at: snapshot.created_at,
398            as_of: snapshot.as_of,
399            event_count: snapshot.event_count,
400            size_bytes: snapshot.metadata.size_bytes,
401            snapshot_type: snapshot.metadata.snapshot_type,
402        }
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build an automatic snapshot of `{"count": event_count}` taken "now".
    fn create_test_snapshot(entity_id: &str, event_count: usize) -> Snapshot {
        Snapshot::new(
            entity_id.to_string(),
            json!({"count": event_count}),
            Utc::now(),
            event_count,
            SnapshotType::Automatic,
        )
    }

    /// Build a `score.updated` event for `entity-1` with payload
    /// `{"score": score}` stamped at `timestamp`.
    fn score_event(score: i64, timestamp: DateTime<Utc>) -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "score.updated".to_string(),
            "entity-1".to_string(),
            "default".to_string(),
            json!({"score": score}),
            timestamp,
            None,
            1,
        )
    }

    #[test]
    fn test_snapshot_creation() {
        let snapshot = create_test_snapshot("entity-1", 100);
        assert_eq!(snapshot.entity_id, "entity-1");
        assert_eq!(snapshot.event_count, 100);
        assert_eq!(snapshot.metadata.snapshot_type, SnapshotType::Automatic);
    }

    #[test]
    fn test_snapshot_manager() {
        let manager = SnapshotManager::new(SnapshotConfig::default());

        let result = manager.create_snapshot(
            "entity-1".to_string(),
            json!({"value": 42}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );

        assert!(result.is_ok());

        let latest = manager.get_latest_snapshot("entity-1");
        assert!(latest.is_some());
        assert_eq!(latest.unwrap().event_count, 100);
    }

    #[test]
    fn test_snapshot_pruning() {
        let config = SnapshotConfig {
            max_snapshots_per_entity: 3,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        // Create 5 snapshots
        for i in 0..5 {
            manager
                .create_snapshot(
                    "entity-1".to_string(),
                    json!({"count": i}),
                    Utc::now(),
                    i,
                    SnapshotType::Automatic,
                )
                .unwrap();
        }

        // Should only keep 3 most recent
        let snapshots = manager.get_all_snapshots("entity-1");
        assert_eq!(snapshots.len(), 3);
    }

    #[test]
    fn test_should_create_snapshot() {
        let config = SnapshotConfig {
            event_threshold: 100,
            time_threshold_seconds: 3600,
            auto_snapshot: true,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        // No snapshots, not enough events
        assert!(!manager.should_create_snapshot("entity-1", 50, Utc::now()));

        // No snapshots, enough events
        assert!(manager.should_create_snapshot("entity-1", 100, Utc::now()));

        // Create a snapshot
        manager
            .create_snapshot(
                "entity-1".to_string(),
                json!({"value": 1}),
                Utc::now(),
                100,
                SnapshotType::Automatic,
            )
            .unwrap();

        // Not enough new events
        assert!(!manager.should_create_snapshot("entity-1", 150, Utc::now()));

        // Enough new events
        assert!(manager.should_create_snapshot("entity-1", 200, Utc::now()));
    }

    #[test]
    fn test_merge_with_events() {
        let as_of = Utc::now();
        let snapshot = Snapshot::new(
            "entity-1".to_string(),
            json!({"name": "Alice", "score": 10}),
            as_of,
            5,
            SnapshotType::Automatic,
        );

        // Derive the event time from `as_of` rather than reading the clock a
        // second time: two consecutive `Utc::now()` calls may return the same
        // instant, and the merge only applies events strictly after `as_of`.
        let event = score_event(20, as_of + chrono::Duration::seconds(1));

        let merged = snapshot.merge_with_events(&[event]);
        assert_eq!(merged["name"], "Alice");
        assert_eq!(merged["score"], 20);
    }

    #[test]
    fn test_merge_ignores_events_not_after_snapshot() {
        let as_of = Utc::now();
        let snapshot = Snapshot::new(
            "entity-1".to_string(),
            json!({"name": "Alice", "score": 10}),
            as_of,
            5,
            SnapshotType::Automatic,
        );

        // Events at or before `as_of` are already reflected in the snapshot.
        let at_boundary = score_event(20, as_of);
        let earlier = score_event(30, as_of - chrono::Duration::seconds(1));

        let merged = snapshot.merge_with_events(&[at_boundary, earlier]);
        assert_eq!(merged["name"], "Alice");
        assert_eq!(merged["score"], 10);
    }
}