nodedb_crdt/state/
history.rs1use loro::{LoroDoc, LoroValue, ValueOrContainer};
6
7use crate::error::{CrdtError, Result};
8
9use super::core::CrdtState;
10
11impl CrdtState {
12 pub fn oplog_version_vector(&self) -> loro::VersionVector {
14 self.doc.oplog_vv()
15 }
16
17 pub fn read_at_version(
24 &self,
25 collection: &str,
26 row_id: &str,
27 version: &loro::VersionVector,
28 ) -> Result<Option<LoroValue>> {
29 let frontiers = self.doc.vv_to_frontiers(version);
30 let forked = self.doc.fork_at(&frontiers);
31
32 let coll = forked.get_map(collection);
33 match coll.get(row_id) {
34 Some(ValueOrContainer::Container(loro::Container::Map(m))) => Ok(Some(m.get_value())),
35 Some(ValueOrContainer::Container(loro::Container::List(l))) => Ok(Some(l.get_value())),
36 Some(ValueOrContainer::Value(v)) => Ok(Some(v)),
37 Some(ValueOrContainer::Container(_)) => Ok(Some(LoroValue::Null)),
38 None => Ok(None),
39 }
40 }
41
42 pub fn export_updates_since(&self, from_version: &loro::VersionVector) -> Result<Vec<u8>> {
47 self.doc
48 .export(loro::ExportMode::updates(from_version))
49 .map_err(|e| CrdtError::Loro(format!("delta export: {e}")))
50 }
51
52 pub fn compact_at_version(&mut self, version: &loro::VersionVector) -> Result<()> {
57 let frontiers = self.doc.vv_to_frontiers(version);
58 let snapshot = self
59 .doc
60 .export(loro::ExportMode::shallow_snapshot(&frontiers))
61 .map_err(|e| CrdtError::Loro(format!("shallow snapshot export: {e}")))?;
62
63 let new_doc = LoroDoc::new();
64 new_doc
65 .set_peer_id(self.peer_id)
66 .map_err(|e| CrdtError::Loro(format!("set peer_id on compacted doc: {e}")))?;
67 new_doc
68 .import(&snapshot)
69 .map_err(|e| CrdtError::Loro(format!("shallow snapshot import: {e}")))?;
70
71 self.doc = new_doc;
72 Ok(())
73 }
74
75 pub fn restore_to_version(
83 &self,
84 collection: &str,
85 row_id: &str,
86 version: &loro::VersionVector,
87 ) -> Result<Vec<u8>> {
88 let historical = self
89 .read_at_version(collection, row_id, version)?
90 .ok_or_else(|| CrdtError::Loro("document did not exist at target version".into()))?;
91
92 let vv_before = self.doc.oplog_vv();
93
94 let fields: Vec<(&str, LoroValue)> = match &historical {
95 LoroValue::Map(map) => map.iter().map(|(k, v)| (k.as_ref(), v.clone())).collect(),
96 _ => return Err(CrdtError::Loro("historical state is not a map".into())),
97 };
98 self.upsert(collection, row_id, &fields)?;
99
100 self.doc
101 .export(loro::ExportMode::updates(&vv_before))
102 .map_err(|e| CrdtError::Loro(format!("restore delta export: {e}")))
103 }
104}