// eventuali_core/snapshot/mod.rs

1mod sqlite_store;
2
3pub use sqlite_store::SqliteSnapshotStore;
4
5use crate::{AggregateId, AggregateVersion, Result, EventualiError};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use uuid::Uuid;
11
12/// Represents a snapshot of an aggregate at a specific version
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct AggregateSnapshot {
15    /// Unique identifier for the snapshot
16    pub snapshot_id: Uuid,
17    /// ID of the aggregate this snapshot represents
18    pub aggregate_id: AggregateId,
19    /// Type of the aggregate
20    pub aggregate_type: String,
21    /// Version of the aggregate when this snapshot was taken
22    pub aggregate_version: AggregateVersion,
23    /// Serialized aggregate state data
24    pub state_data: Vec<u8>,
25    /// Compression algorithm used (if any)
26    pub compression: SnapshotCompression,
27    /// Metadata about the snapshot
28    pub metadata: SnapshotMetadata,
29    /// When this snapshot was created
30    pub created_at: DateTime<Utc>,
31}
32
33/// Compression algorithms supported for snapshots
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
35pub enum SnapshotCompression {
36    None,
37    Gzip,
38    Lz4,
39}
40
41/// Metadata for snapshots
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct SnapshotMetadata {
44    /// Size of the original data before compression
45    pub original_size: usize,
46    /// Size of the compressed data
47    pub compressed_size: usize,
48    /// Number of events that were used to build this snapshot
49    pub event_count: usize,
50    /// Checksum of the snapshot data for integrity verification
51    pub checksum: String,
52    /// Additional custom metadata
53    pub custom: HashMap<String, String>,
54}
55
56/// Configuration for snapshot behavior
57#[derive(Debug, Clone)]
58pub struct SnapshotConfig {
59    /// How often to take snapshots (every N events)
60    pub snapshot_frequency: AggregateVersion,
61    /// Maximum age of snapshots before they should be replaced
62    pub max_snapshot_age_hours: u64,
63    /// Compression algorithm to use
64    pub compression: SnapshotCompression,
65    /// Whether to automatically clean up old snapshots
66    pub auto_cleanup: bool,
67}
68
69impl Default for SnapshotConfig {
70    fn default() -> Self {
71        Self {
72            snapshot_frequency: 100, // Snapshot every 100 events
73            max_snapshot_age_hours: 24 * 7, // Keep snapshots for a week
74            compression: SnapshotCompression::Gzip,
75            auto_cleanup: true,
76        }
77    }
78}
79
80/// Trait for snapshot storage backends
81#[async_trait]
82pub trait SnapshotStore {
83    /// Store a new snapshot
84    async fn save_snapshot(&self, snapshot: AggregateSnapshot) -> Result<()>;
85    
86    /// Load the latest snapshot for an aggregate
87    async fn load_latest_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>>;
88    
89    /// Load a specific snapshot by ID
90    async fn load_snapshot(&self, snapshot_id: Uuid) -> Result<Option<AggregateSnapshot>>;
91    
92    /// Get all snapshots for an aggregate, ordered by version descending
93    async fn list_snapshots(&self, aggregate_id: &AggregateId) -> Result<Vec<AggregateSnapshot>>;
94    
95    /// Delete a snapshot
96    async fn delete_snapshot(&self, snapshot_id: Uuid) -> Result<()>;
97    
98    /// Clean up old snapshots based on configuration
99    async fn cleanup_old_snapshots(&self, config: &SnapshotConfig) -> Result<u64>;
100    
101    /// Check if a snapshot should be taken for an aggregate at the given version
102    async fn should_take_snapshot(
103        &self, 
104        aggregate_id: &AggregateId, 
105        current_version: AggregateVersion,
106        config: &SnapshotConfig
107    ) -> Result<bool>;
108}
109
110/// Service for managing aggregate snapshots
111pub struct SnapshotService<S: SnapshotStore> {
112    store: S,
113    config: SnapshotConfig,
114}
115
116impl<S: SnapshotStore> SnapshotService<S> {
117    pub fn new(store: S, config: SnapshotConfig) -> Self {
118        Self { store, config }
119    }
120
121    /// Create a snapshot from aggregate state data
122    pub async fn create_snapshot(
123        &self,
124        aggregate_id: AggregateId,
125        aggregate_type: String,
126        aggregate_version: AggregateVersion,
127        state_data: Vec<u8>,
128        event_count: usize,
129    ) -> Result<AggregateSnapshot> {
130        let compressed_data = self.compress_data(&state_data)?;
131        let checksum = self.calculate_checksum(&compressed_data);
132
133        let metadata = SnapshotMetadata {
134            original_size: state_data.len(),
135            compressed_size: compressed_data.len(),
136            event_count,
137            checksum,
138            custom: HashMap::new(),
139        };
140
141        let snapshot = AggregateSnapshot {
142            snapshot_id: Uuid::new_v4(),
143            aggregate_id,
144            aggregate_type,
145            aggregate_version,
146            state_data: compressed_data,
147            compression: self.config.compression.clone(),
148            metadata,
149            created_at: Utc::now(),
150        };
151
152        self.store.save_snapshot(snapshot.clone()).await?;
153        Ok(snapshot)
154    }
155
156    /// Load the most recent snapshot for an aggregate
157    pub async fn load_latest_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>> {
158        self.store.load_latest_snapshot(aggregate_id).await
159    }
160
161    /// Decompress snapshot data
162    pub fn decompress_snapshot_data(&self, snapshot: &AggregateSnapshot) -> Result<Vec<u8>> {
163        self.decompress_data(&snapshot.state_data, &snapshot.compression)
164    }
165
166    /// Check if a snapshot should be taken
167    pub async fn should_take_snapshot(
168        &self,
169        aggregate_id: &AggregateId,
170        current_version: AggregateVersion,
171    ) -> Result<bool> {
172        self.store.should_take_snapshot(aggregate_id, current_version, &self.config).await
173    }
174
175    /// Compress data using the configured compression algorithm
176    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
177        match self.config.compression {
178            SnapshotCompression::None => Ok(data.to_vec()),
179            SnapshotCompression::Gzip => {
180                use flate2::write::GzEncoder;
181                use flate2::Compression;
182                use std::io::Write;
183
184                let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
185                encoder.write_all(data).map_err(EventualiError::Io)?;
186                encoder.finish().map_err(EventualiError::Io)
187            }
188            SnapshotCompression::Lz4 => {
189                // For now, fallback to no compression if lz4 is not available
190                // In a real implementation, you'd use lz4_flex or similar crate
191                Ok(data.to_vec())
192            }
193        }
194    }
195
196    /// Decompress data using the specified compression algorithm
197    fn decompress_data(&self, data: &[u8], compression: &SnapshotCompression) -> Result<Vec<u8>> {
198        match compression {
199            SnapshotCompression::None => Ok(data.to_vec()),
200            SnapshotCompression::Gzip => {
201                use flate2::read::GzDecoder;
202                use std::io::Read;
203
204                let mut decoder = GzDecoder::new(data);
205                let mut decompressed = Vec::new();
206                decoder.read_to_end(&mut decompressed).map_err(EventualiError::Io)?;
207                Ok(decompressed)
208            }
209            SnapshotCompression::Lz4 => {
210                // For now, fallback to no compression
211                Ok(data.to_vec())
212            }
213        }
214    }
215
216    /// Calculate checksum for data integrity
217    fn calculate_checksum(&self, data: &[u8]) -> String {
218        use sha2::{Sha256, Digest};
219        let mut hasher = Sha256::new();
220        hasher.update(data);
221        format!("{:x}", hasher.finalize())
222    }
223
224    /// Perform cleanup of old snapshots
225    pub async fn cleanup_old_snapshots(&self) -> Result<u64> {
226        self.store.cleanup_old_snapshots(&self.config).await
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// Store stub that accepts every call and holds nothing; it exists only so
    /// a `SnapshotService` can be constructed for the codec tests.
    struct MockStore;

    #[async_trait]
    impl SnapshotStore for MockStore {
        async fn save_snapshot(&self, _: AggregateSnapshot) -> Result<()> { Ok(()) }
        async fn load_latest_snapshot(&self, _: &AggregateId) -> Result<Option<AggregateSnapshot>> { Ok(None) }
        async fn load_snapshot(&self, _: Uuid) -> Result<Option<AggregateSnapshot>> { Ok(None) }
        async fn list_snapshots(&self, _: &AggregateId) -> Result<Vec<AggregateSnapshot>> { Ok(vec![]) }
        async fn delete_snapshot(&self, _: Uuid) -> Result<()> { Ok(()) }
        async fn cleanup_old_snapshots(&self, _: &SnapshotConfig) -> Result<u64> { Ok(0) }
        async fn should_take_snapshot(&self, _: &AggregateId, _: AggregateVersion, _: &SnapshotConfig) -> Result<bool> { Ok(false) }
    }

    /// Builds a service over `MockStore` with the given codec and otherwise
    /// default configuration.
    fn service_with(compression: SnapshotCompression) -> SnapshotService<MockStore> {
        let config = SnapshotConfig {
            compression,
            ..Default::default()
        };
        SnapshotService::new(MockStore, config)
    }

    #[test]
    fn test_snapshot_compression_none() {
        let service = service_with(SnapshotCompression::None);
        let data = b"test data".to_vec();
        let compressed = service.compress_data(&data).unwrap();

        // With no codec the payload must come back unchanged.
        assert_eq!(compressed, data);
    }

    #[test]
    fn test_snapshot_compression_gzip_round_trip() {
        let service = service_with(SnapshotCompression::Gzip);
        let data = b"test data ".repeat(64);

        let compressed = service.compress_data(&data).unwrap();
        // A gzip stream is never byte-identical to its input.
        assert_ne!(compressed, data);

        let restored = service
            .decompress_data(&compressed, &SnapshotCompression::Gzip)
            .unwrap();
        assert_eq!(restored, data);
    }

    #[test]
    fn test_snapshot_config_default() {
        let config = SnapshotConfig::default();
        assert_eq!(config.snapshot_frequency, 100);
        assert_eq!(config.max_snapshot_age_hours, 24 * 7);
        assert_eq!(config.compression, SnapshotCompression::Gzip);
        assert!(config.auto_cleanup);
    }
}