nodedb_crdt/state/snapshot.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Snapshot export/import, history compaction, memory estimation.
4
5use loro::LoroDoc;
6
7use crate::error::{CrdtError, Result};
8
9use super::core::CrdtState;
10
11impl CrdtState {
12 /// Export the current state as bytes for sync.
13 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
14 self.doc
15 .export(loro::ExportMode::Snapshot)
16 .map_err(|e| CrdtError::Loro(format!("snapshot export failed: {e}")))
17 }
18
19 /// Import remote updates.
20 pub fn import(&self, data: &[u8]) -> Result<()> {
21 self.doc
22 .import(data)
23 .map_err(|e| CrdtError::DeltaApplyFailed(e.to_string()))?;
24 Ok(())
25 }
26
27 /// Compact the CRDT history by replacing the internal LoroDoc with a
28 /// shallow snapshot.
29 ///
30 /// A shallow snapshot contains the current state but discards the
31 /// full operation history. This is the CRDT equivalent of WAL
32 /// truncation after checkpoint.
33 ///
34 /// After compaction:
35 /// - All current state is preserved (reads return same values).
36 /// - New deltas can still be applied and merged.
37 /// - Historical operations before the snapshot point are gone.
38 /// - Peers that sync after compaction receive a full snapshot
39 /// instead of incremental deltas (acceptable for long-offline peers).
40 ///
41 /// Call this periodically (e.g., every 30 minutes or when memory
42 /// pressure exceeds threshold) to prevent unbounded history growth.
43 pub fn compact_history(&mut self) -> Result<()> {
44 // Export a shallow snapshot at the current frontiers.
45 let frontiers = self.doc.oplog_frontiers();
46 let snapshot = self
47 .doc
48 .export(loro::ExportMode::shallow_snapshot(&frontiers))
49 .map_err(|e| CrdtError::Loro(format!("shallow snapshot export: {e}")))?;
50
51 // Replace the doc with a fresh one loaded from the snapshot.
52 let new_doc = LoroDoc::new();
53 new_doc
54 .set_peer_id(self.peer_id)
55 .map_err(|e| CrdtError::Loro(format!("failed to set peer_id on compacted doc: {e}")))?;
56 new_doc
57 .import(&snapshot)
58 .map_err(|e| CrdtError::Loro(format!("shallow snapshot import: {e}")))?;
59
60 self.doc = new_doc;
61 Ok(())
62 }
63
64 /// Estimated memory usage of the CRDT state (bytes).
65 ///
66 /// Includes operation history, current state, and internal caches.
67 /// Use this to decide when to trigger `compact_history()`.
68 pub fn estimated_memory_bytes(&self) -> usize {
69 // Loro doesn't expose a direct memory metric.
70 // Use snapshot size as a proxy — it's proportional to state size.
71 // This is not precise but good enough for pressure monitoring.
72 self.doc
73 .export(loro::ExportMode::Snapshot)
74 .map(|s| s.len())
75 .unwrap_or(0)
76 }
77}