1use crate::error::Result;
2use crate::domain::entities::Event;
3use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Snapshot {
13 pub id: Uuid,
15
16 pub entity_id: String,
18
19 pub state: serde_json::Value,
21
22 pub created_at: DateTime<Utc>,
24
25 pub as_of: DateTime<Utc>,
27
28 pub event_count: usize,
30
31 pub metadata: SnapshotMetadata,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct SnapshotMetadata {
37 pub snapshot_type: SnapshotType,
39
40 pub size_bytes: usize,
42
43 pub version: u32,
45}
46
47#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
48#[serde(rename_all = "lowercase")]
49pub enum SnapshotType {
50 Manual,
51 Automatic,
52 OnDemand,
53}
54
55impl Snapshot {
56 pub fn new(
58 entity_id: String,
59 state: serde_json::Value,
60 as_of: DateTime<Utc>,
61 event_count: usize,
62 snapshot_type: SnapshotType,
63 ) -> Self {
64 let state_json = serde_json::to_string(&state).unwrap_or_default();
65 let size_bytes = state_json.len();
66
67 Self {
68 id: Uuid::new_v4(),
69 entity_id,
70 state,
71 created_at: Utc::now(),
72 as_of,
73 event_count,
74 metadata: SnapshotMetadata {
75 snapshot_type,
76 size_bytes,
77 version: 1,
78 },
79 }
80 }
81
82 pub fn merge_with_events(&self, events: &[Event]) -> serde_json::Value {
84 let mut merged = self.state.clone();
85
86 for event in events {
87 if event.timestamp > self.as_of {
89 if let serde_json::Value::Object(ref mut state_map) = merged {
90 if let serde_json::Value::Object(ref payload_map) = event.payload {
91 for (key, value) in payload_map {
92 state_map.insert(key.clone(), value.clone());
93 }
94 }
95 }
96 }
97 }
98
99 merged
100 }
101}
102
/// Tunables controlling when snapshots are taken and how many are kept.
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Number of events (since the latest snapshot, or in total when none
    /// exists) at which an automatic snapshot becomes due.
    pub event_threshold: usize,

    /// Seconds between the latest snapshot's `as_of` and the last event time
    /// at which an automatic snapshot becomes due.
    pub time_threshold_seconds: i64,

    /// Upper bound on the snapshots retained for a single entity.
    pub max_snapshots_per_entity: usize,

    /// Master switch for automatic snapshotting.
    pub auto_snapshot: bool,
}

impl Default for SnapshotConfig {
    /// Defaults: 100 events, one hour, 10 snapshots per entity, auto-snapshot on.
    fn default() -> Self {
        const ONE_HOUR_SECONDS: i64 = 60 * 60;

        Self {
            auto_snapshot: true,
            event_threshold: 100,
            max_snapshots_per_entity: 10,
            time_threshold_seconds: ONE_HOUR_SECONDS,
        }
    }
}
129
130pub struct SnapshotManager {
132 snapshots: Arc<RwLock<HashMap<String, Vec<Snapshot>>>>,
134
135 config: SnapshotConfig,
137
138 stats: Arc<RwLock<SnapshotStats>>,
140}
141
142#[derive(Debug, Clone, Default, Serialize)]
143pub struct SnapshotStats {
144 pub total_snapshots: usize,
145 pub total_entities: usize,
146 pub total_size_bytes: usize,
147 pub snapshots_created: u64,
148 pub snapshots_pruned: u64,
149}
150
151impl SnapshotManager {
152 pub fn new(config: SnapshotConfig) -> Self {
154 Self {
155 snapshots: Arc::new(RwLock::new(HashMap::new())),
156 config,
157 stats: Arc::new(RwLock::new(SnapshotStats::default())),
158 }
159 }
160
161 pub fn create_snapshot(
163 &self,
164 entity_id: String,
165 state: serde_json::Value,
166 as_of: DateTime<Utc>,
167 event_count: usize,
168 snapshot_type: SnapshotType,
169 ) -> Result<Snapshot> {
170 let snapshot = Snapshot::new(entity_id.clone(), state, as_of, event_count, snapshot_type);
171
172 let mut snapshots = self.snapshots.write();
173 let entity_snapshots = snapshots.entry(entity_id.clone()).or_insert_with(Vec::new);
174
175 entity_snapshots.push(snapshot.clone());
177
178 entity_snapshots.sort_by(|a, b| b.as_of.cmp(&a.as_of));
180
181 let mut pruned = 0;
183 if entity_snapshots.len() > self.config.max_snapshots_per_entity {
184 let to_remove = entity_snapshots.len() - self.config.max_snapshots_per_entity;
185 entity_snapshots.truncate(self.config.max_snapshots_per_entity);
186 pruned = to_remove;
187 }
188
189 let mut stats = self.stats.write();
191 stats.snapshots_created += 1;
192 stats.snapshots_pruned += pruned as u64;
193 stats.total_snapshots = snapshots.values().map(|v| v.len()).sum();
194 stats.total_entities = snapshots.len();
195 stats.total_size_bytes = snapshots
196 .values()
197 .flatten()
198 .map(|s| s.metadata.size_bytes)
199 .sum();
200
201 tracing::info!(
202 "๐ธ Created {} snapshot for entity: {} (events: {}, size: {} bytes)",
203 match snapshot_type {
204 SnapshotType::Manual => "manual",
205 SnapshotType::Automatic => "automatic",
206 SnapshotType::OnDemand => "on-demand",
207 },
208 entity_id,
209 event_count,
210 snapshot.metadata.size_bytes
211 );
212
213 Ok(snapshot)
214 }
215
216 pub fn get_latest_snapshot(&self, entity_id: &str) -> Option<Snapshot> {
218 let snapshots = self.snapshots.read();
219 snapshots
220 .get(entity_id)
221 .and_then(|entity_snapshots| entity_snapshots.first().cloned())
222 }
223
224 pub fn get_snapshot_as_of(
226 &self,
227 entity_id: &str,
228 as_of: DateTime<Utc>,
229 ) -> Option<Snapshot> {
230 let snapshots = self.snapshots.read();
231 snapshots.get(entity_id).and_then(|entity_snapshots| {
232 entity_snapshots
233 .iter()
234 .filter(|s| s.as_of <= as_of)
235 .max_by_key(|s| s.as_of)
236 .cloned()
237 })
238 }
239
240 pub fn get_all_snapshots(&self, entity_id: &str) -> Vec<Snapshot> {
242 let snapshots = self.snapshots.read();
243 snapshots
244 .get(entity_id)
245 .map(|v| v.clone())
246 .unwrap_or_default()
247 }
248
249 pub fn should_create_snapshot(
251 &self,
252 entity_id: &str,
253 current_event_count: usize,
254 last_event_time: DateTime<Utc>,
255 ) -> bool {
256 if !self.config.auto_snapshot {
257 return false;
258 }
259
260 let snapshots = self.snapshots.read();
261 let entity_snapshots = snapshots.get(entity_id);
262
263 match entity_snapshots {
264 None => {
265 current_event_count >= self.config.event_threshold
267 }
268 Some(snaps) => {
269 if let Some(latest) = snaps.first() {
270 let events_since_snapshot = current_event_count - latest.event_count;
272 if events_since_snapshot >= self.config.event_threshold {
273 return true;
274 }
275
276 let time_since_snapshot = (last_event_time - latest.as_of).num_seconds();
278 if time_since_snapshot >= self.config.time_threshold_seconds {
279 return true;
280 }
281 }
282 false
283 }
284 }
285 }
286
287 pub fn delete_snapshots(&self, entity_id: &str) -> Result<usize> {
289 let mut snapshots = self.snapshots.write();
290 let removed = snapshots.remove(entity_id).map(|v| v.len()).unwrap_or(0);
291
292 let mut stats = self.stats.write();
294 stats.total_snapshots = stats.total_snapshots.saturating_sub(removed);
295 stats.total_entities = snapshots.len();
296
297 tracing::info!("๐๏ธ Deleted {} snapshots for entity: {}", removed, entity_id);
298
299 Ok(removed)
300 }
301
302 pub fn delete_snapshot(&self, entity_id: &str, snapshot_id: Uuid) -> Result<bool> {
304 let mut snapshots = self.snapshots.write();
305
306 if let Some(entity_snapshots) = snapshots.get_mut(entity_id) {
307 let initial_len = entity_snapshots.len();
308 entity_snapshots.retain(|s| s.id != snapshot_id);
309 let removed = initial_len != entity_snapshots.len();
310
311 if removed {
312 let mut stats = self.stats.write();
314 stats.total_snapshots = stats.total_snapshots.saturating_sub(1);
315 tracing::debug!("Deleted snapshot {} for entity {}", snapshot_id, entity_id);
316 }
317
318 return Ok(removed);
319 }
320
321 Ok(false)
322 }
323
324 pub fn stats(&self) -> SnapshotStats {
326 (*self.stats.read()).clone()
327 }
328
329 pub fn clear_all(&self) {
331 let mut snapshots = self.snapshots.write();
332 snapshots.clear();
333
334 let mut stats = self.stats.write();
335 *stats = SnapshotStats::default();
336
337 tracing::info!("๐งน Cleared all snapshots");
338 }
339
340 pub fn config(&self) -> &SnapshotConfig {
342 &self.config
343 }
344
345 pub fn list_entities(&self) -> Vec<String> {
347 let snapshots = self.snapshots.read();
348 snapshots.keys().cloned().collect()
349 }
350}
351
352#[derive(Debug, Deserialize)]
354pub struct CreateSnapshotRequest {
355 pub entity_id: String,
356}
357
358#[derive(Debug, Serialize)]
360pub struct CreateSnapshotResponse {
361 pub snapshot_id: Uuid,
362 pub entity_id: String,
363 pub created_at: DateTime<Utc>,
364 pub event_count: usize,
365 pub size_bytes: usize,
366}
367
368#[derive(Debug, Deserialize)]
370pub struct ListSnapshotsRequest {
371 pub entity_id: Option<String>,
372}
373
374#[derive(Debug, Serialize)]
376pub struct ListSnapshotsResponse {
377 pub snapshots: Vec<SnapshotInfo>,
378 pub total: usize,
379}
380
381#[derive(Debug, Serialize)]
382pub struct SnapshotInfo {
383 pub id: Uuid,
384 pub entity_id: String,
385 pub created_at: DateTime<Utc>,
386 pub as_of: DateTime<Utc>,
387 pub event_count: usize,
388 pub size_bytes: usize,
389 pub snapshot_type: SnapshotType,
390}
391
392impl From<Snapshot> for SnapshotInfo {
393 fn from(snapshot: Snapshot) -> Self {
394 Self {
395 id: snapshot.id,
396 entity_id: snapshot.entity_id,
397 created_at: snapshot.created_at,
398 as_of: snapshot.as_of,
399 event_count: snapshot.event_count,
400 size_bytes: snapshot.metadata.size_bytes,
401 snapshot_type: snapshot.metadata.snapshot_type,
402 }
403 }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an automatic snapshot whose state is `{"count": event_count}`.
    fn create_test_snapshot(entity_id: &str, event_count: usize) -> Snapshot {
        Snapshot::new(
            entity_id.to_string(),
            json!({"count": event_count}),
            Utc::now(),
            event_count,
            SnapshotType::Automatic,
        )
    }

    #[test]
    fn test_snapshot_creation() {
        let snapshot = create_test_snapshot("entity-1", 100);
        assert_eq!(snapshot.entity_id, "entity-1");
        assert_eq!(snapshot.event_count, 100);
        assert_eq!(snapshot.metadata.snapshot_type, SnapshotType::Automatic);
    }

    #[test]
    fn test_snapshot_manager() {
        let manager = SnapshotManager::new(SnapshotConfig::default());

        let result = manager.create_snapshot(
            "entity-1".to_string(),
            json!({"value": 42}),
            Utc::now(),
            100,
            SnapshotType::Manual,
        );

        assert!(result.is_ok());

        let latest = manager.get_latest_snapshot("entity-1");
        assert!(latest.is_some());
        assert_eq!(latest.unwrap().event_count, 100);
    }

    #[test]
    fn test_snapshot_pruning() {
        let config = SnapshotConfig {
            max_snapshots_per_entity: 3,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        // Create more snapshots than the retention limit allows.
        for i in 0..5 {
            manager
                .create_snapshot(
                    "entity-1".to_string(),
                    json!({"count": i}),
                    Utc::now(),
                    i,
                    SnapshotType::Automatic,
                )
                .unwrap();
        }

        // Only the configured maximum may remain.
        let snapshots = manager.get_all_snapshots("entity-1");
        assert_eq!(snapshots.len(), 3);
    }

    #[test]
    fn test_should_create_snapshot() {
        let config = SnapshotConfig {
            event_threshold: 100,
            time_threshold_seconds: 3600,
            auto_snapshot: true,
            ..Default::default()
        };
        let manager = SnapshotManager::new(config);

        // No snapshot yet and below the event threshold.
        assert!(!manager.should_create_snapshot("entity-1", 50, Utc::now()));

        // No snapshot yet and the event threshold is reached.
        assert!(manager.should_create_snapshot("entity-1", 100, Utc::now()));

        manager
            .create_snapshot(
                "entity-1".to_string(),
                json!({"value": 1}),
                Utc::now(),
                100,
                SnapshotType::Automatic,
            )
            .unwrap();

        // Only 50 events since the snapshot and practically no time elapsed.
        assert!(!manager.should_create_snapshot("entity-1", 150, Utc::now()));

        // 100 events since the snapshot reaches the threshold.
        assert!(manager.should_create_snapshot("entity-1", 200, Utc::now()));
    }

    #[test]
    fn test_merge_with_events() {
        // Explicit timestamps: relying on two consecutive `Utc::now()` calls
        // being strictly increasing would make the test depend on clock
        // resolution.
        let as_of = Utc::now();
        let snapshot = Snapshot::new(
            "entity-1".to_string(),
            json!({"name": "Alice", "score": 10}),
            as_of,
            5,
            SnapshotType::Automatic,
        );

        // Older than the snapshot: must be ignored by the merge.
        let stale = Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "score.updated".to_string(),
            "entity-1".to_string(),
            "default".to_string(),
            json!({"name": "Bob", "score": 99}),
            as_of - chrono::Duration::seconds(1),
            None,
            1,
        );

        // Newer than the snapshot: its keys overwrite the snapshot state.
        let fresh = Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "score.updated".to_string(),
            "entity-1".to_string(),
            "default".to_string(),
            json!({"score": 20}),
            as_of + chrono::Duration::seconds(1),
            None,
            1,
        );

        let merged = snapshot.merge_with_events(&[stale, fresh]);
        assert_eq!(merged["name"], "Alice");
        assert_eq!(merged["score"], 20);
    }
}