1use loro::{LoroMap, LoroValue, ValueOrContainer};
23
24use super::core::CrdtState;
25use crate::error::{CrdtError, Result};
26
27pub const HISTORY_ROOT: &str = "__bitemporal_history__";
31
32pub fn archive_key(collection: &str, row_id: &str, sys_ms: i64) -> String {
37 format!("{collection}\u{0}{row_id}\u{0}{sys_ms:020}")
38}
39
40fn parse_archive_key(key: &str) -> Option<(&str, &str, i64)> {
43 let mut parts = key.splitn(3, '\u{0}');
44 let collection = parts.next()?;
45 let row_id = parts.next()?;
46 let sys_ms = parts.next()?.parse::<i64>().ok()?;
47 Some((collection, row_id, sys_ms))
48}
49
50impl CrdtState {
51 pub fn upsert_versioned(
60 &self,
61 collection: &str,
62 row_id: &str,
63 fields: &[(&str, LoroValue)],
64 ) -> Result<()> {
65 if let Some((prior_sys_ms, prior_fields)) = self.prior_system_snapshot(collection, row_id) {
66 let archive = self.doc.get_map(HISTORY_ROOT);
67 let key = archive_key(collection, row_id, prior_sys_ms);
68 let slot = archive
69 .insert_container(&key, LoroMap::new())
70 .map_err(|e| CrdtError::Loro(format!("archive insert: {e}")))?;
71 for (k, v) in &prior_fields {
72 slot.insert(k.as_str(), v.clone())
73 .map_err(|e| CrdtError::Loro(format!("archive field: {e}")))?;
74 }
75 }
76 self.upsert(collection, row_id, fields)
77 }
78
79 pub fn read_row_as_of(
84 &self,
85 collection: &str,
86 row_id: &str,
87 asof_ms: i64,
88 ) -> Option<LoroValue> {
89 let archive = self.doc.get_map(HISTORY_ROOT);
90 let mut best: Option<(i64, LoroValue)> = None;
91
92 for key in archive.keys() {
93 let key_str = key.to_string();
94 let (c, r, ts) = match parse_archive_key(&key_str) {
95 Some(t) => t,
96 None => continue,
97 };
98 if c != collection || r != row_id || ts > asof_ms {
99 continue;
100 }
101 if let Some(ValueOrContainer::Container(loro::Container::Map(m))) =
102 archive.get(&key_str)
103 && best.as_ref().is_none_or(|(b, _)| ts > *b)
104 {
105 best = Some((ts, m.get_value()));
106 }
107 }
108
109 if let Some(LoroValue::Map(current_map)) = self.read_row(collection, row_id)
110 && let Some(&LoroValue::I64(cur_ts)) = current_map.get("_ts_system")
111 && cur_ts <= asof_ms
112 && best.as_ref().is_none_or(|(b, _)| cur_ts > *b)
113 {
114 return Some(LoroValue::Map(current_map));
115 }
116
117 best.map(|(_, v)| v)
118 }
119
120 pub fn archive_version_count(&self, collection: &str, row_id: &str) -> usize {
123 let archive = self.doc.get_map(HISTORY_ROOT);
124 archive
125 .keys()
126 .filter(|k| {
127 parse_archive_key(k).is_some_and(|(c, r, _)| c == collection && r == row_id)
128 })
129 .count()
130 }
131
132 pub fn purge_history_before(&self, collection: &str, cutoff_ms: i64) -> Result<usize> {
138 let archive = self.doc.get_map(HISTORY_ROOT);
139 let victims: Vec<String> = archive
140 .keys()
141 .filter_map(|k| {
142 let ks = k.to_string();
143 let matches = parse_archive_key(&ks)
144 .is_some_and(|(c, _, ts)| c == collection && ts < cutoff_ms);
145 matches.then_some(ks)
146 })
147 .collect();
148 let count = victims.len();
149 for key in victims {
150 archive
151 .delete(&key)
152 .map_err(|e| CrdtError::Loro(format!("archive delete: {e}")))?;
153 }
154 Ok(count)
155 }
156
157 fn prior_system_snapshot(
164 &self,
165 collection: &str,
166 row_id: &str,
167 ) -> Option<(i64, Vec<(String, LoroValue)>)> {
168 let current = match self.read_row(collection, row_id)? {
169 LoroValue::Map(m) => m,
170 _ => return None,
171 };
172 let sys_ms = match current.get("_ts_system")? {
173 LoroValue::I64(n) => *n,
174 _ => return None,
175 };
176 let fields: Vec<(String, LoroValue)> = current
177 .iter()
178 .map(|(k, v)| (k.to_string(), v.clone()))
179 .collect();
180 Some((sys_ms, fields))
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use super::*;
187
188 fn ts(ms: i64) -> LoroValue {
189 LoroValue::I64(ms)
190 }
191
192 fn string(s: &str) -> LoroValue {
193 LoroValue::String(s.into())
194 }
195
196 fn make_state() -> CrdtState {
197 CrdtState::new(1).unwrap()
198 }
199
200 #[test]
201 fn first_upsert_without_prior_ts_does_not_archive() {
202 let s = make_state();
203 s.upsert_versioned("users", "u1", &[("name", string("alice"))])
204 .unwrap();
205 assert_eq!(s.archive_version_count("users", "u1"), 0);
206 }
207
208 #[test]
209 fn upsert_versioned_archives_prior_with_ts_system() {
210 let s = make_state();
211 s.upsert_versioned(
212 "users",
213 "u1",
214 &[("name", string("alice")), ("_ts_system", ts(100))],
215 )
216 .unwrap();
217 s.upsert_versioned(
218 "users",
219 "u1",
220 &[("name", string("alice2")), ("_ts_system", ts(200))],
221 )
222 .unwrap();
223 s.upsert_versioned(
224 "users",
225 "u1",
226 &[("name", string("alice3")), ("_ts_system", ts(300))],
227 )
228 .unwrap();
229 assert_eq!(s.archive_version_count("users", "u1"), 2);
230 }
231
232 #[test]
233 fn read_row_as_of_returns_historical_version() {
234 let s = make_state();
235 s.upsert_versioned(
236 "users",
237 "u1",
238 &[("name", string("v1")), ("_ts_system", ts(100))],
239 )
240 .unwrap();
241 s.upsert_versioned(
242 "users",
243 "u1",
244 &[("name", string("v2")), ("_ts_system", ts(200))],
245 )
246 .unwrap();
247 s.upsert_versioned(
248 "users",
249 "u1",
250 &[("name", string("v3")), ("_ts_system", ts(300))],
251 )
252 .unwrap();
253
254 let at_150 = s.read_row_as_of("users", "u1", 150).unwrap();
255 if let LoroValue::Map(m) = at_150 {
256 assert_eq!(m.get("name").unwrap(), &string("v1"));
257 } else {
258 panic!("expected map");
259 }
260 let at_250 = s.read_row_as_of("users", "u1", 250).unwrap();
261 if let LoroValue::Map(m) = at_250 {
262 assert_eq!(m.get("name").unwrap(), &string("v2"));
263 } else {
264 panic!("expected map");
265 }
266 let at_999 = s.read_row_as_of("users", "u1", 999).unwrap();
267 if let LoroValue::Map(m) = at_999 {
268 assert_eq!(m.get("name").unwrap(), &string("v3"));
269 } else {
270 panic!("expected map");
271 }
272 }
273
274 #[test]
275 fn read_row_as_of_returns_none_before_first_version() {
276 let s = make_state();
277 s.upsert_versioned(
278 "users",
279 "u1",
280 &[("name", string("v1")), ("_ts_system", ts(100))],
281 )
282 .unwrap();
283 assert!(s.read_row_as_of("users", "u1", 50).is_none());
284 }
285
286 #[test]
287 fn purge_history_before_drops_superseded_versions() {
288 let s = make_state();
289 for (name, t) in [("v1", 100), ("v2", 200), ("v3", 300), ("v4", 400)] {
290 s.upsert_versioned(
291 "users",
292 "u1",
293 &[("name", string(name)), ("_ts_system", ts(t))],
294 )
295 .unwrap();
296 }
297 assert_eq!(s.archive_version_count("users", "u1"), 3);
299 let dropped = s.purge_history_before("users", 250).unwrap();
300 assert_eq!(dropped, 2); assert_eq!(s.archive_version_count("users", "u1"), 1); let live = s.read_row("users", "u1").unwrap();
304 if let LoroValue::Map(m) = live {
305 assert_eq!(m.get("name").unwrap(), &string("v4"));
306 } else {
307 panic!("expected map");
308 }
309 }
310
311 #[test]
312 fn purge_history_before_never_drops_live_row() {
313 let s = make_state();
314 s.upsert_versioned(
315 "users",
316 "u1",
317 &[("name", string("only")), ("_ts_system", ts(100))],
318 )
319 .unwrap();
320 let dropped = s.purge_history_before("users", i64::MAX).unwrap();
322 assert_eq!(dropped, 0);
323 let live = s.read_row("users", "u1").unwrap();
324 if let LoroValue::Map(m) = live {
325 assert_eq!(m.get("name").unwrap(), &string("only"));
326 } else {
327 panic!("expected map");
328 }
329 }
330
331 #[test]
332 fn purge_scoped_to_collection() {
333 let s = make_state();
334 for coll in ["users", "orders"] {
335 for (name, t) in [("v1", 100), ("v2", 200)] {
336 s.upsert_versioned(coll, "row", &[("v", string(name)), ("_ts_system", ts(t))])
337 .unwrap();
338 }
339 }
340 assert_eq!(s.archive_version_count("users", "row"), 1);
341 assert_eq!(s.archive_version_count("orders", "row"), 1);
342 let dropped = s.purge_history_before("users", i64::MAX).unwrap();
343 assert_eq!(dropped, 1);
344 assert_eq!(s.archive_version_count("users", "row"), 0);
345 assert_eq!(s.archive_version_count("orders", "row"), 1); }
347}