Skip to main content

nodedb_crdt/state/
bitemporal_archive.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Bitemporal version archive for CRDT collections.
4//!
5//! Physically preserves **superseded** versions of a row before it is
6//! overwritten, so `FOR SYSTEM_TIME AS OF` reads and cross-peer
7//! divergence reconstruction see the full history — not just the
8//! latest value Loro's `insert_container` would leave behind.
9//!
10//! Storage layout: a single sibling `LoroMap` at root key
11//! `__bitemporal_history__`. Archive keys are flat strings of the form
12//! `{collection}\0{row_id}\0{sys_ms:020}` so lexicographic ordering
13//! matches numeric `_ts_system` ordering, and prefix scans bound the
14//! set of versions for one logical row.
15//!
16//! Retention: [`purge_history_before`] deletes archived entries with
17//! `sys_ms < cutoff_ms` across all row ids in the collection. The
18//! *current* (live) row is never touched — only archive entries —
19//! so `AS OF now()` reads always see the latest version regardless of
20//! retention configuration.
21
22use loro::{LoroMap, LoroValue, ValueOrContainer};
23
24use super::core::CrdtState;
25use crate::error::{CrdtError, Result};
26
/// Key of the root-level Loro map that stores archived row versions
/// for all bitemporal collections.
///
/// Every collection shares this one top-level sibling, so the document
/// shape stays stable: enabling bitemporal history on another
/// collection does not rewrite any existing structure.
pub const HISTORY_ROOT: &str = "__bitemporal_history__";
31
/// Build the flat archive key `{collection}\0{row_id}\0{sys_ms:020}`.
///
/// The system timestamp is zero-padded to 20 characters so that, for
/// one row, string order of the keys follows `_ts_system` order. The
/// segments are joined with NUL so that `/` or `:` appearing inside a
/// user-supplied collection or row identifier cannot be mistaken for a
/// separator.
pub fn archive_key(collection: &str, row_id: &str, sys_ms: i64) -> String {
    let stamp = format!("{sys_ms:020}");
    [collection, row_id, stamp.as_str()].join("\u{0}")
}
39
/// Parse a key produced by [`archive_key`] back into
/// `(collection, row_id, sys_ms)`.
///
/// The collection is everything before the *first* NUL and the
/// timestamp is everything after the *last* NUL; whatever lies between
/// is the row id. Splitting from both ends keeps keys parseable when a
/// row id itself contains NUL: the timestamp segment written by
/// `archive_key` is a formatted integer and never contains NUL, so the
/// final separator is unambiguous. A collection name containing NUL is
/// still ambiguous and is attributed to the text before its first NUL.
///
/// Returns `None` when the key has fewer than two NUL separators or
/// when the trailing segment is not a valid `i64`.
fn parse_archive_key(key: &str) -> Option<(&str, &str, i64)> {
    let (collection, rest) = key.split_once('\u{0}')?;
    let (row_id, stamp) = rest.rsplit_once('\u{0}')?;
    let sys_ms = stamp.parse::<i64>().ok()?;
    Some((collection, row_id, sys_ms))
}
49
50impl CrdtState {
51    /// Upsert a row with prior-version archiving for bitemporal
52    /// collections. When a row with `row_id` already exists and carries
53    /// a finite `_ts_system`, its field map is copied into the
54    /// bitemporal history sibling before the new fields overwrite the
55    /// live row. Non-bitemporal callers should stay on
56    /// [`CrdtState::upsert`] — the archive has a non-trivial doc-size
57    /// cost and is only meaningful when the collection participates in
58    /// `AS OF` / audit queries.
59    pub fn upsert_versioned(
60        &self,
61        collection: &str,
62        row_id: &str,
63        fields: &[(&str, LoroValue)],
64    ) -> Result<()> {
65        if let Some((prior_sys_ms, prior_fields)) = self.prior_system_snapshot(collection, row_id) {
66            let archive = self.doc.get_map(HISTORY_ROOT);
67            let key = archive_key(collection, row_id, prior_sys_ms);
68            let slot = archive
69                .insert_container(&key, LoroMap::new())
70                .map_err(|e| CrdtError::Loro(format!("archive insert: {e}")))?;
71            for (k, v) in &prior_fields {
72                slot.insert(k.as_str(), v.clone())
73                    .map_err(|e| CrdtError::Loro(format!("archive field: {e}")))?;
74            }
75        }
76        self.upsert(collection, row_id, fields)
77    }
78
79    /// Read the row as it was at `asof_ms` (system-time). Scans the
80    /// archive for the highest `sys_ms <= asof_ms`; falls back to the
81    /// current row when its `_ts_system <= asof_ms`; returns `None`
82    /// when no version existed at or before the requested time.
83    pub fn read_row_as_of(
84        &self,
85        collection: &str,
86        row_id: &str,
87        asof_ms: i64,
88    ) -> Option<LoroValue> {
89        let archive = self.doc.get_map(HISTORY_ROOT);
90        let mut best: Option<(i64, LoroValue)> = None;
91
92        for key in archive.keys() {
93            let key_str = key.to_string();
94            let (c, r, ts) = match parse_archive_key(&key_str) {
95                Some(t) => t,
96                None => continue,
97            };
98            if c != collection || r != row_id || ts > asof_ms {
99                continue;
100            }
101            if let Some(ValueOrContainer::Container(loro::Container::Map(m))) =
102                archive.get(&key_str)
103                && best.as_ref().is_none_or(|(b, _)| ts > *b)
104            {
105                best = Some((ts, m.get_value()));
106            }
107        }
108
109        if let Some(LoroValue::Map(current_map)) = self.read_row(collection, row_id)
110            && let Some(&LoroValue::I64(cur_ts)) = current_map.get("_ts_system")
111            && cur_ts <= asof_ms
112            && best.as_ref().is_none_or(|(b, _)| cur_ts > *b)
113        {
114            return Some(LoroValue::Map(current_map));
115        }
116
117        best.map(|(_, v)| v)
118    }
119
120    /// Count archived versions for a row (live row excluded).
121    /// Primarily for tests and operational introspection.
122    pub fn archive_version_count(&self, collection: &str, row_id: &str) -> usize {
123        let archive = self.doc.get_map(HISTORY_ROOT);
124        archive
125            .keys()
126            .filter(|k| {
127                parse_archive_key(k).is_some_and(|(c, r, _)| c == collection && r == row_id)
128            })
129            .count()
130    }
131
132    /// Drop archived versions with `sys_ms < cutoff_ms` for the given
133    /// collection. Returns the number of archive entries deleted. The
134    /// live row is never touched — retention only reclaims history, so
135    /// the current state of every logical row remains readable even
136    /// when the entire archive is pruned.
137    pub fn purge_history_before(&self, collection: &str, cutoff_ms: i64) -> Result<usize> {
138        let archive = self.doc.get_map(HISTORY_ROOT);
139        let victims: Vec<String> = archive
140            .keys()
141            .filter_map(|k| {
142                let ks = k.to_string();
143                let matches = parse_archive_key(&ks)
144                    .is_some_and(|(c, _, ts)| c == collection && ts < cutoff_ms);
145                matches.then_some(ks)
146            })
147            .collect();
148        let count = victims.len();
149        for key in victims {
150            archive
151                .delete(&key)
152                .map_err(|e| CrdtError::Loro(format!("archive delete: {e}")))?;
153        }
154        Ok(count)
155    }
156
157    /// Read the live row's (`_ts_system`, field-map) pair when both the
158    /// row exists and carries a finite system stamp. Used exclusively
159    /// by `upsert_versioned` to decide whether to archive the prior
160    /// version; a row without `_ts_system` is treated as non-archivable
161    /// (typically the first insert into a bitemporal collection or a
162    /// row that predates the bitemporal flag on the collection).
163    fn prior_system_snapshot(
164        &self,
165        collection: &str,
166        row_id: &str,
167    ) -> Option<(i64, Vec<(String, LoroValue)>)> {
168        let current = match self.read_row(collection, row_id)? {
169            LoroValue::Map(m) => m,
170            _ => return None,
171        };
172        let sys_ms = match current.get("_ts_system")? {
173            LoroValue::I64(n) => *n,
174            _ => return None,
175        };
176        let fields: Vec<(String, LoroValue)> = current
177            .iter()
178            .map(|(k, v)| (k.to_string(), v.clone()))
179            .collect();
180        Some((sys_ms, fields))
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh CRDT state for one test.
    fn make_state() -> CrdtState {
        CrdtState::new(1).unwrap()
    }

    /// Wrap a string slice as a Loro string value.
    fn string(s: &str) -> LoroValue {
        LoroValue::String(s.into())
    }

    /// Write one version of `coll/id` whose `field` holds `value` and
    /// whose `_ts_system` is `sys_ms`.
    fn put(s: &CrdtState, coll: &str, id: &str, field: &str, value: &str, sys_ms: i64) {
        s.upsert_versioned(
            coll,
            id,
            &[(field, string(value)), ("_ts_system", LoroValue::I64(sys_ms))],
        )
        .unwrap();
    }

    /// Assert that `row` is a map whose `name` field equals `expected`.
    fn assert_name(row: LoroValue, expected: &str) {
        let LoroValue::Map(m) = row else {
            panic!("expected map");
        };
        assert_eq!(m.get("name").unwrap(), &string(expected));
    }

    #[test]
    fn first_upsert_without_prior_ts_does_not_archive() {
        let s = make_state();
        s.upsert_versioned("users", "u1", &[("name", string("alice"))])
            .unwrap();
        assert_eq!(s.archive_version_count("users", "u1"), 0);
    }

    #[test]
    fn upsert_versioned_archives_prior_with_ts_system() {
        let s = make_state();
        put(&s, "users", "u1", "name", "alice", 100);
        put(&s, "users", "u1", "name", "alice2", 200);
        put(&s, "users", "u1", "name", "alice3", 300);
        assert_eq!(s.archive_version_count("users", "u1"), 2);
    }

    #[test]
    fn read_row_as_of_returns_historical_version() {
        let s = make_state();
        put(&s, "users", "u1", "name", "v1", 100);
        put(&s, "users", "u1", "name", "v2", 200);
        put(&s, "users", "u1", "name", "v3", 300);

        assert_name(s.read_row_as_of("users", "u1", 150).unwrap(), "v1");
        assert_name(s.read_row_as_of("users", "u1", 250).unwrap(), "v2");
        assert_name(s.read_row_as_of("users", "u1", 999).unwrap(), "v3");
    }

    #[test]
    fn read_row_as_of_returns_none_before_first_version() {
        let s = make_state();
        put(&s, "users", "u1", "name", "v1", 100);
        assert!(s.read_row_as_of("users", "u1", 50).is_none());
    }

    #[test]
    fn purge_history_before_drops_superseded_versions() {
        let s = make_state();
        for (name, t) in [("v1", 100), ("v2", 200), ("v3", 300), ("v4", 400)] {
            put(&s, "users", "u1", "name", name, t);
        }
        // Three archived: 100, 200, 300. Latest (400) is live.
        assert_eq!(s.archive_version_count("users", "u1"), 3);
        let dropped = s.purge_history_before("users", 250).unwrap();
        assert_eq!(dropped, 2); // 100 and 200
        assert_eq!(s.archive_version_count("users", "u1"), 1); // 300 remains
        // Live row still intact.
        assert_name(s.read_row("users", "u1").unwrap(), "v4");
    }

    #[test]
    fn purge_history_before_never_drops_live_row() {
        let s = make_state();
        put(&s, "users", "u1", "name", "only", 100);
        // No prior to archive on first write.
        let dropped = s.purge_history_before("users", i64::MAX).unwrap();
        assert_eq!(dropped, 0);
        assert_name(s.read_row("users", "u1").unwrap(), "only");
    }

    #[test]
    fn purge_scoped_to_collection() {
        let s = make_state();
        for coll in ["users", "orders"] {
            for (name, t) in [("v1", 100), ("v2", 200)] {
                put(&s, coll, "row", "v", name, t);
            }
        }
        assert_eq!(s.archive_version_count("users", "row"), 1);
        assert_eq!(s.archive_version_count("orders", "row"), 1);
        let dropped = s.purge_history_before("users", i64::MAX).unwrap();
        assert_eq!(dropped, 1);
        assert_eq!(s.archive_version_count("users", "row"), 0);
        assert_eq!(s.archive_version_count("orders", "row"), 1); // untouched
    }
}