1mod sqlite_store;
2
3pub use sqlite_store::SqliteSnapshotStore;
4
5use crate::{AggregateId, AggregateVersion, Result, EventualiError};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use uuid::Uuid;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct AggregateSnapshot {
15 pub snapshot_id: Uuid,
17 pub aggregate_id: AggregateId,
19 pub aggregate_type: String,
21 pub aggregate_version: AggregateVersion,
23 pub state_data: Vec<u8>,
25 pub compression: SnapshotCompression,
27 pub metadata: SnapshotMetadata,
29 pub created_at: DateTime<Utc>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
35pub enum SnapshotCompression {
36 None,
37 Gzip,
38 Lz4,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct SnapshotMetadata {
44 pub original_size: usize,
46 pub compressed_size: usize,
48 pub event_count: usize,
50 pub checksum: String,
52 pub custom: HashMap<String, String>,
54}
55
56#[derive(Debug, Clone)]
58pub struct SnapshotConfig {
59 pub snapshot_frequency: AggregateVersion,
61 pub max_snapshot_age_hours: u64,
63 pub compression: SnapshotCompression,
65 pub auto_cleanup: bool,
67}
68
69impl Default for SnapshotConfig {
70 fn default() -> Self {
71 Self {
72 snapshot_frequency: 100, max_snapshot_age_hours: 24 * 7, compression: SnapshotCompression::Gzip,
75 auto_cleanup: true,
76 }
77 }
78}
79
80#[async_trait]
82pub trait SnapshotStore {
83 async fn save_snapshot(&self, snapshot: AggregateSnapshot) -> Result<()>;
85
86 async fn load_latest_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>>;
88
89 async fn load_snapshot(&self, snapshot_id: Uuid) -> Result<Option<AggregateSnapshot>>;
91
92 async fn list_snapshots(&self, aggregate_id: &AggregateId) -> Result<Vec<AggregateSnapshot>>;
94
95 async fn delete_snapshot(&self, snapshot_id: Uuid) -> Result<()>;
97
98 async fn cleanup_old_snapshots(&self, config: &SnapshotConfig) -> Result<u64>;
100
101 async fn should_take_snapshot(
103 &self,
104 aggregate_id: &AggregateId,
105 current_version: AggregateVersion,
106 config: &SnapshotConfig
107 ) -> Result<bool>;
108}
109
110pub struct SnapshotService<S: SnapshotStore> {
112 store: S,
113 config: SnapshotConfig,
114}
115
116impl<S: SnapshotStore> SnapshotService<S> {
117 pub fn new(store: S, config: SnapshotConfig) -> Self {
118 Self { store, config }
119 }
120
121 pub async fn create_snapshot(
123 &self,
124 aggregate_id: AggregateId,
125 aggregate_type: String,
126 aggregate_version: AggregateVersion,
127 state_data: Vec<u8>,
128 event_count: usize,
129 ) -> Result<AggregateSnapshot> {
130 let compressed_data = self.compress_data(&state_data)?;
131 let checksum = self.calculate_checksum(&compressed_data);
132
133 let metadata = SnapshotMetadata {
134 original_size: state_data.len(),
135 compressed_size: compressed_data.len(),
136 event_count,
137 checksum,
138 custom: HashMap::new(),
139 };
140
141 let snapshot = AggregateSnapshot {
142 snapshot_id: Uuid::new_v4(),
143 aggregate_id,
144 aggregate_type,
145 aggregate_version,
146 state_data: compressed_data,
147 compression: self.config.compression.clone(),
148 metadata,
149 created_at: Utc::now(),
150 };
151
152 self.store.save_snapshot(snapshot.clone()).await?;
153 Ok(snapshot)
154 }
155
156 pub async fn load_latest_snapshot(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateSnapshot>> {
158 self.store.load_latest_snapshot(aggregate_id).await
159 }
160
161 pub fn decompress_snapshot_data(&self, snapshot: &AggregateSnapshot) -> Result<Vec<u8>> {
163 self.decompress_data(&snapshot.state_data, &snapshot.compression)
164 }
165
166 pub async fn should_take_snapshot(
168 &self,
169 aggregate_id: &AggregateId,
170 current_version: AggregateVersion,
171 ) -> Result<bool> {
172 self.store.should_take_snapshot(aggregate_id, current_version, &self.config).await
173 }
174
175 fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
177 match self.config.compression {
178 SnapshotCompression::None => Ok(data.to_vec()),
179 SnapshotCompression::Gzip => {
180 use flate2::write::GzEncoder;
181 use flate2::Compression;
182 use std::io::Write;
183
184 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
185 encoder.write_all(data).map_err(EventualiError::Io)?;
186 encoder.finish().map_err(EventualiError::Io)
187 }
188 SnapshotCompression::Lz4 => {
189 Ok(data.to_vec())
192 }
193 }
194 }
195
196 fn decompress_data(&self, data: &[u8], compression: &SnapshotCompression) -> Result<Vec<u8>> {
198 match compression {
199 SnapshotCompression::None => Ok(data.to_vec()),
200 SnapshotCompression::Gzip => {
201 use flate2::read::GzDecoder;
202 use std::io::Read;
203
204 let mut decoder = GzDecoder::new(data);
205 let mut decompressed = Vec::new();
206 decoder.read_to_end(&mut decompressed).map_err(EventualiError::Io)?;
207 Ok(decompressed)
208 }
209 SnapshotCompression::Lz4 => {
210 Ok(data.to_vec())
212 }
213 }
214 }
215
216 fn calculate_checksum(&self, data: &[u8]) -> String {
218 use sha2::{Sha256, Digest};
219 let mut hasher = Sha256::new();
220 hasher.update(data);
221 format!("{:x}", hasher.finalize())
222 }
223
224 pub async fn cleanup_old_snapshots(&self) -> Result<u64> {
226 self.store.cleanup_old_snapshots(&self.config).await
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233
234 #[test]
235 fn test_snapshot_compression_none() {
236 let config = SnapshotConfig {
237 compression: SnapshotCompression::None,
238 ..Default::default()
239 };
240
241 struct MockStore;
243 #[async_trait]
244 impl SnapshotStore for MockStore {
245 async fn save_snapshot(&self, _: AggregateSnapshot) -> Result<()> { Ok(()) }
246 async fn load_latest_snapshot(&self, _: &AggregateId) -> Result<Option<AggregateSnapshot>> { Ok(None) }
247 async fn load_snapshot(&self, _: Uuid) -> Result<Option<AggregateSnapshot>> { Ok(None) }
248 async fn list_snapshots(&self, _: &AggregateId) -> Result<Vec<AggregateSnapshot>> { Ok(vec![]) }
249 async fn delete_snapshot(&self, _: Uuid) -> Result<()> { Ok(()) }
250 async fn cleanup_old_snapshots(&self, _: &SnapshotConfig) -> Result<u64> { Ok(0) }
251 async fn should_take_snapshot(&self, _: &AggregateId, _: AggregateVersion, _: &SnapshotConfig) -> Result<bool> { Ok(false) }
252 }
253
254 let service = SnapshotService::new(MockStore, config);
255 let data = b"test data".to_vec();
256 let compressed = service.compress_data(&data).unwrap();
257
258 assert_eq!(compressed, data);
259 }
260
261 #[test]
262 fn test_snapshot_config_default() {
263 let config = SnapshotConfig::default();
264 assert_eq!(config.snapshot_frequency, 100);
265 assert_eq!(config.compression, SnapshotCompression::Gzip);
266 assert!(config.auto_cleanup);
267 }
268}