Skip to main content

nodedb_crdt/state/
history.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Version-history operations: version vectors, time-travel reads, targeted compaction, restore.
4
5use loro::{LoroDoc, LoroValue, ValueOrContainer};
6
7use crate::error::{CrdtError, Result};
8
9use super::core::CrdtState;
10
11impl CrdtState {
12    /// Get the current oplog version vector.
13    pub fn oplog_version_vector(&self) -> loro::VersionVector {
14        self.doc.oplog_vv()
15    }
16
17    /// Read the document state at a historical version.
18    ///
19    /// Uses `fork_at` to create a lightweight copy at the target version
20    /// and reads the specified row. Returns `None` if the row didn't exist.
21    ///
22    /// Cost: O(oplog_size) for the fork — not for hot-path queries.
23    pub fn read_at_version(
24        &self,
25        collection: &str,
26        row_id: &str,
27        version: &loro::VersionVector,
28    ) -> Result<Option<LoroValue>> {
29        let frontiers = self.doc.vv_to_frontiers(version);
30        let forked = self.doc.fork_at(&frontiers);
31
32        let coll = forked.get_map(collection);
33        match coll.get(row_id) {
34            Some(ValueOrContainer::Container(loro::Container::Map(m))) => Ok(Some(m.get_value())),
35            Some(ValueOrContainer::Container(loro::Container::List(l))) => Ok(Some(l.get_value())),
36            Some(ValueOrContainer::Value(v)) => Ok(Some(v)),
37            Some(ValueOrContainer::Container(_)) => Ok(Some(LoroValue::Null)),
38            None => Ok(None),
39        }
40    }
41
42    /// Export the oplog delta from a version to the current state.
43    ///
44    /// Returns the operations that transform `from_version` into current state.
45    /// Used for DIFF rendering and delta sync.
46    pub fn export_updates_since(&self, from_version: &loro::VersionVector) -> Result<Vec<u8>> {
47        self.doc
48            .export(loro::ExportMode::updates(from_version))
49            .map_err(|e| CrdtError::Loro(format!("delta export: {e}")))
50    }
51
52    /// Compact history at a specific version (not just current frontiers).
53    ///
54    /// Discards oplog entries before the target version. Current state and
55    /// all versions after the target are preserved.
56    pub fn compact_at_version(&mut self, version: &loro::VersionVector) -> Result<()> {
57        let frontiers = self.doc.vv_to_frontiers(version);
58        let snapshot = self
59            .doc
60            .export(loro::ExportMode::shallow_snapshot(&frontiers))
61            .map_err(|e| CrdtError::Loro(format!("shallow snapshot export: {e}")))?;
62
63        let new_doc = LoroDoc::new();
64        new_doc
65            .set_peer_id(self.peer_id)
66            .map_err(|e| CrdtError::Loro(format!("set peer_id on compacted doc: {e}")))?;
67        new_doc
68            .import(&snapshot)
69            .map_err(|e| CrdtError::Loro(format!("shallow snapshot import: {e}")))?;
70
71        self.doc = new_doc;
72        Ok(())
73    }
74
75    /// Restore a document to a historical version by creating a forward delta.
76    ///
77    /// Reads the state at the target version, then generates a new mutation
78    /// that sets the current state to match the historical state. History is
79    /// preserved — this is a forward operation, not a rollback.
80    ///
81    /// Returns the delta bytes to be applied through the normal write path.
82    pub fn restore_to_version(
83        &self,
84        collection: &str,
85        row_id: &str,
86        version: &loro::VersionVector,
87    ) -> Result<Vec<u8>> {
88        let historical = self
89            .read_at_version(collection, row_id, version)?
90            .ok_or_else(|| CrdtError::Loro("document did not exist at target version".into()))?;
91
92        let vv_before = self.doc.oplog_vv();
93
94        let fields: Vec<(&str, LoroValue)> = match &historical {
95            LoroValue::Map(map) => map.iter().map(|(k, v)| (k.as_ref(), v.clone())).collect(),
96            _ => return Err(CrdtError::Loro("historical state is not a map".into())),
97        };
98        self.upsert(collection, row_id, &fields)?;
99
100        self.doc
101            .export(loro::ExportMode::updates(&vv_before))
102            .map_err(|e| CrdtError::Loro(format!("restore delta export: {e}")))
103    }
104}