Skip to main content

nodedb_crdt/state/
snapshot.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Snapshot export/import, history compaction, memory estimation.
4
5use loro::LoroDoc;
6
7use crate::error::{CrdtError, Result};
8
9use super::core::CrdtState;
10
11impl CrdtState {
12    /// Export the current state as bytes for sync.
13    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
14        self.doc
15            .export(loro::ExportMode::Snapshot)
16            .map_err(|e| CrdtError::Loro(format!("snapshot export failed: {e}")))
17    }
18
19    /// Import remote updates.
20    pub fn import(&self, data: &[u8]) -> Result<()> {
21        self.doc
22            .import(data)
23            .map_err(|e| CrdtError::DeltaApplyFailed(e.to_string()))?;
24        Ok(())
25    }
26
27    /// Compact the CRDT history by replacing the internal LoroDoc with a
28    /// shallow snapshot.
29    ///
30    /// A shallow snapshot contains the current state but discards the
31    /// full operation history. This is the CRDT equivalent of WAL
32    /// truncation after checkpoint.
33    ///
34    /// After compaction:
35    /// - All current state is preserved (reads return same values).
36    /// - New deltas can still be applied and merged.
37    /// - Historical operations before the snapshot point are gone.
38    /// - Peers that sync after compaction receive a full snapshot
39    ///   instead of incremental deltas (acceptable for long-offline peers).
40    ///
41    /// Call this periodically (e.g., every 30 minutes or when memory
42    /// pressure exceeds threshold) to prevent unbounded history growth.
43    pub fn compact_history(&mut self) -> Result<()> {
44        // Export a shallow snapshot at the current frontiers.
45        let frontiers = self.doc.oplog_frontiers();
46        let snapshot = self
47            .doc
48            .export(loro::ExportMode::shallow_snapshot(&frontiers))
49            .map_err(|e| CrdtError::Loro(format!("shallow snapshot export: {e}")))?;
50
51        // Replace the doc with a fresh one loaded from the snapshot.
52        let new_doc = LoroDoc::new();
53        new_doc
54            .set_peer_id(self.peer_id)
55            .map_err(|e| CrdtError::Loro(format!("failed to set peer_id on compacted doc: {e}")))?;
56        new_doc
57            .import(&snapshot)
58            .map_err(|e| CrdtError::Loro(format!("shallow snapshot import: {e}")))?;
59
60        self.doc = new_doc;
61        Ok(())
62    }
63
64    /// Estimated memory usage of the CRDT state (bytes).
65    ///
66    /// Includes operation history, current state, and internal caches.
67    /// Use this to decide when to trigger `compact_history()`.
68    pub fn estimated_memory_bytes(&self) -> usize {
69        // Loro doesn't expose a direct memory metric.
70        // Use snapshot size as a proxy — it's proportional to state size.
71        // This is not precise but good enough for pressure monitoring.
72        self.doc
73            .export(loro::ExportMode::Snapshot)
74            .map(|s| s.len())
75            .unwrap_or(0)
76    }
77}