// nodedb_array/query/retention.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Cell-level retention merger for bitemporal tile compaction.
4//!
5//! [`merge_for_retention`] replaces the tile-granularity
6//! `partition_by_retention` for compaction paths that must honour the sparse
7//! tile model: because tile-versions are SPARSE (each version only stores the
8//! cells that were written at that `system_from_ms`), the retention ceiling
9//! must be computed per cell coordinate, not per tile.
10
11use std::collections::{HashMap, HashSet};
12
13use crate::error::{ArrayError, ArrayResult};
14use crate::schema::ArraySchema;
15use crate::segment::TileEntry;
16use crate::segment::reader::{SegmentReader, TilePayload};
17use crate::tile::cell_payload::CellPayload;
18use crate::tile::sparse_tile::{RowKind, SparseRow, SparseTile, SparseTileBuilder};
19use crate::types::TileId;
20use crate::types::coord::value::CoordValue;
21use nodedb_types::{OPEN_UPPER, Surrogate};
22
23// ── Result type ─────────────────────────────────────────────────────────────
24
/// Result of [`merge_for_retention`].
pub struct RetentionMergeResult {
    /// Synthetic ceiling tile materialising all cells live AS OF the horizon.
    ///
    /// `None` when there are zero cells to carry forward (everything erased
    /// or no out-of-horizon versions existed).
    pub ceiling_tile: Option<SparseTile>,
    /// TileIds of in-horizon tile-versions that pass through unchanged.
    pub keep_inhorizon: Vec<TileId>,
    /// TileIds of out-of-horizon tile-versions whose cells were merged into
    /// the ceiling tile (or dropped via GDPR) and should be removed from the
    /// segment.
    pub dropped_tile_ids: Vec<TileId>,
    /// Telemetry: number of distinct cells materialised into the ceiling.
    /// Counts Live and Tombstone rows; GdprErased coords are excluded.
    pub cells_carried_forward: usize,
}
41
42// ── Coord key helpers ────────────────────────────────────────────────────────
43
44/// Encode a coordinate vector into a stable byte key for HashMap / HashSet use.
45///
46/// Uses the same zerompk encoding as the write path so keys are comparable
47/// across tile-versions.
48pub fn encode_coord_key(coord: &[CoordValue]) -> ArrayResult<Vec<u8>> {
49    let owned = coord.to_vec();
50    zerompk::to_msgpack_vec(&owned).map_err(|e| ArrayError::SegmentCorruption {
51        detail: format!("encode_coord_key: {e}"),
52    })
53}
54
55// ── Row iteration helper ─────────────────────────────────────────────────────
56
/// Decoded row from a [`SparseTile`].
pub struct DecodedRow {
    /// Stable byte key for this row's coordinate (see [`encode_coord_key`]).
    pub coord_key: Vec<u8>,
    /// Decoded coordinate values, one per dimension.
    pub coord: Vec<CoordValue>,
    /// Row kind as stored in the tile (`Live` / `Tombstone` / `GdprErased`).
    pub kind: RowKind,
    /// `Some` for `Live` rows; `None` for `Tombstone` / `GdprErased`.
    pub payload: Option<CellPayload>,
}
65
/// Iterate every row in a [`SparseTile`], decoding coords and payload.
///
/// Rows are returned in storage order (same as `row_count()` iteration).
/// Attribute columns in `SparseTile` are indexed by *live-row index*, not by
/// iteration index, so a separate live counter advances only for Live rows.
///
/// # Errors
///
/// Returns [`ArrayError::SegmentCorruption`] when any of the tile's parallel
/// arrays are inconsistent: a missing dimension dictionary, a per-row index
/// out of dictionary range, or an attr / validity column shorter than the
/// row count requires.
pub fn decode_sparse_rows(tile: &SparseTile) -> ArrayResult<Vec<DecodedRow>> {
    let n = tile.row_count();
    let arity = tile.dim_dicts.len();
    let mut rows = Vec::with_capacity(n);
    // Live-row cursor: advances only past Live rows, because attr columns are
    // stored densely over live rows only (see doc comment above).
    let mut live_idx: usize = 0;

    for row in 0..n {
        // Decode the coordinate for this row. Each dimension is
        // dictionary-encoded: a per-row index into a per-dimension value table.
        let mut coord = Vec::with_capacity(arity);
        for dim_idx in 0..arity {
            let dict =
                tile.dim_dicts
                    .get(dim_idx)
                    .ok_or_else(|| ArrayError::SegmentCorruption {
                        detail: format!("decode_sparse_rows: dim {dim_idx} missing"),
                    })?;
            let entry_idx = *dict
                .indices
                .get(row)
                .ok_or_else(|| ArrayError::SegmentCorruption {
                    detail: format!("decode_sparse_rows: row {row} index out of range"),
                })? as usize;
            let val = dict
                .values
                .get(entry_idx)
                .ok_or_else(|| ArrayError::SegmentCorruption {
                    detail: format!("decode_sparse_rows: dict entry {entry_idx} out of range"),
                })?;
            coord.push(val.clone());
        }

        let coord_key = encode_coord_key(&coord)?;
        let kind = tile.row_kind(row)?;

        let payload = match kind {
            RowKind::Live => {
                // Build attrs from each attr column using live_idx (NOT `row`:
                // attr columns only contain entries for Live rows).
                let attrs: Vec<_> = tile
                    .attr_cols
                    .iter()
                    .map(|col| {
                        col.get(live_idx)
                            .cloned()
                            .ok_or_else(|| ArrayError::SegmentCorruption {
                                detail: format!(
                                    "decode_sparse_rows: attr col live_idx {live_idx} out of range"
                                ),
                            })
                    })
                    .collect::<ArrayResult<Vec<_>>>()?;

                // NOTE(review): `surrogates` is indexed by `row` and a missing
                // entry silently falls back to Surrogate::ZERO, whereas the
                // validity columns below treat a missing entry as corruption.
                // Confirm this asymmetry is intended.
                let surrogate = tile.surrogates.get(row).copied().unwrap_or(Surrogate::ZERO);
                let valid_from_ms = tile.valid_from_ms.get(row).copied().ok_or_else(|| {
                    ArrayError::SegmentCorruption {
                        detail: format!("decode_sparse_rows: valid_from_ms row {row} out of range"),
                    }
                })?;
                let valid_until_ms = tile.valid_until_ms.get(row).copied().ok_or_else(|| {
                    ArrayError::SegmentCorruption {
                        detail: format!(
                            "decode_sparse_rows: valid_until_ms row {row} out of range"
                        ),
                    }
                })?;

                live_idx += 1;
                Some(CellPayload {
                    valid_from_ms,
                    valid_until_ms,
                    attrs,
                    surrogate,
                })
            }
            // Tombstone / GdprErased rows carry no attribute payload.
            RowKind::Tombstone | RowKind::GdprErased => None,
        };

        rows.push(DecodedRow {
            coord_key,
            coord,
            kind,
            payload,
        });
    }

    Ok(rows)
}
157
158// ── Public API ───────────────────────────────────────────────────────────────
159
/// Merge tile-versions for one `hilbert_prefix` according to the retention
/// policy, operating at cell granularity.
///
/// `versions` contains all [`TileEntry`] records for a single `hilbert_prefix`
/// (the caller is responsible for grouping). They may arrive in any order; this
/// function sorts internally.
///
/// `horizon_ms` is the retention boundary expressed as an absolute
/// `system_from_ms` timestamp: versions with `system_from_ms < horizon_ms`
/// are outside the retention window.
///
/// # Errors
///
/// Returns [`ArrayError::SegmentCorruption`] when a referenced `TileId` is
/// missing from the segment footer, a tile fails to decode, or a Live ceiling
/// row ends up without a payload (an internal invariant violation).
pub fn merge_for_retention(
    versions: &[TileEntry],
    reader: &SegmentReader<'_>,
    schema: &ArraySchema,
    horizon_ms: i64,
) -> ArrayResult<RetentionMergeResult> {
    // Fast path: nothing to do for an empty version list.
    if versions.is_empty() {
        return Ok(RetentionMergeResult {
            ceiling_tile: None,
            keep_inhorizon: Vec::new(),
            dropped_tile_ids: Vec::new(),
            cells_carried_forward: 0,
        });
    }

    // Phase 1: partition into inside / outside by system_from_ms.
    // `>= horizon_ms` counts as inside the retention window.
    let mut inside: Vec<&TileEntry> = Vec::new();
    let mut outside: Vec<&TileEntry> = Vec::new();
    for entry in versions {
        if entry.tile_id.system_from_ms >= horizon_ms {
            inside.push(entry);
        } else {
            outside.push(entry);
        }
    }

    // Collect TileIds for in-horizon pass-through (input order preserved).
    let keep_inhorizon: Vec<TileId> = inside.iter().map(|e| e.tile_id).collect();

    // If nothing is outside the horizon, there is nothing to merge.
    if outside.is_empty() {
        return Ok(RetentionMergeResult {
            ceiling_tile: None,
            keep_inhorizon,
            dropped_tile_ids: Vec::new(),
            cells_carried_forward: 0,
        });
    }

    // Phase 2: build the set of coord keys already covered by inside-horizon
    // versions. Any coord present in any inside tile (regardless of RowKind)
    // supersedes what the ceiling would contribute for that coord.
    //
    // NOTE: `find_tile_index` is a linear footer scan, so phases 2 and 3 are
    // O(versions × tiles) — presumably acceptable for compaction-sized
    // batches; revisit if segments grow large.
    let mut inhorizon_coords: HashSet<Vec<u8>> = HashSet::new();
    for entry in &inside {
        let tile_idx = find_tile_index(reader, entry.tile_id)?;
        let payload = reader.read_tile(tile_idx)?;
        if let TilePayload::Sparse(ref tile) = payload {
            for trow in decode_sparse_rows(tile)? {
                inhorizon_coords.insert(trow.coord_key);
            }
        }
        // Dense tiles inside horizon contribute no coords to the exclusion set
        // for the ceiling (dense tiles don't participate in sparse retention
        // merge; their coords are not relevant here).
    }

    // Phase 3: walk outside-horizon versions newest → oldest, accumulating the
    // ceiling. For each coord not yet seen, the first (newest) occurrence wins.
    outside.sort_by_key(|e| std::cmp::Reverse(e.tile_id.system_from_ms));

    // Maps coord_key → (coord, RowKind, Option<CellPayload>).
    let mut ceiling: HashMap<Vec<u8>, (Vec<CoordValue>, RowKind, Option<CellPayload>)> =
        HashMap::new();

    // NOTE(review): only Sparse payloads are merged here. An out-of-horizon
    // Dense payload would still land in `dropped_tile_ids` below without its
    // cells being carried into the ceiling — confirm dense tiles cannot reach
    // this path.
    for entry in &outside {
        let tile_idx = find_tile_index(reader, entry.tile_id)?;
        let payload = reader.read_tile(tile_idx)?;
        if let TilePayload::Sparse(ref tile) = payload {
            for trow in decode_sparse_rows(tile)? {
                // Skip coords already covered by an inside-horizon version.
                if inhorizon_coords.contains(&trow.coord_key) {
                    continue;
                }
                // First occurrence (newest) wins for each coord.
                ceiling
                    .entry(trow.coord_key)
                    .or_insert((trow.coord, trow.kind, trow.payload));
            }
        }
    }

    // Collect dropped TileIds — all outside tile-versions are superseded by
    // the ceiling or dropped via GDPR.
    let dropped_tile_ids: Vec<TileId> = outside.iter().map(|e| e.tile_id).collect();

    // Phase 4: build the synthetic ceiling SparseTile.
    // GdprErased coords are excluded from the ceiling entirely.
    // Tombstone and Live coords are included with their respective RowKind.
    let mut builder = SparseTileBuilder::new(schema);
    let mut cells_carried_forward: usize = 0;

    // Deterministic output order: sort by coord_key for stable tests.
    type CeilingRow = (Vec<u8>, Vec<CoordValue>, RowKind, Option<CellPayload>);
    let mut ceiling_rows: Vec<CeilingRow> = ceiling
        .into_iter()
        .map(|(k, (coord, kind, payload))| (k, coord, kind, payload))
        .collect();
    ceiling_rows.sort_by(|a, b| a.0.cmp(&b.0));

    for (_key, coord, kind, payload) in ceiling_rows {
        match kind {
            RowKind::GdprErased => {
                // Drop entirely — GDPR erasure removes the cell from the ceiling.
            }
            RowKind::Tombstone => {
                // Tombstones carry no attrs; validity window is synthetic
                // (0 .. OPEN_UPPER) since only the RowKind matters downstream.
                builder.push_row(SparseRow {
                    coord: &coord,
                    attrs: &[],
                    surrogate: Surrogate::ZERO,
                    valid_from_ms: 0,
                    valid_until_ms: OPEN_UPPER,
                    kind: RowKind::Tombstone,
                })?;
                cells_carried_forward += 1;
            }
            RowKind::Live => {
                // A Live ceiling row without a payload means decode_sparse_rows
                // broke its own invariant — surface as corruption.
                let p = payload.ok_or_else(|| ArrayError::SegmentCorruption {
                    detail: "Live row in ceiling has no CellPayload".into(),
                })?;
                builder.push_row(SparseRow {
                    coord: &coord,
                    attrs: &p.attrs,
                    surrogate: p.surrogate,
                    valid_from_ms: p.valid_from_ms,
                    valid_until_ms: p.valid_until_ms,
                    kind: RowKind::Live,
                })?;
                cells_carried_forward += 1;
            }
        }
    }

    // An all-GdprErased (or empty) ceiling collapses to None.
    let ceiling_tile = if cells_carried_forward == 0 {
        None
    } else {
        Some(builder.build())
    };

    Ok(RetentionMergeResult {
        ceiling_tile,
        keep_inhorizon,
        dropped_tile_ids,
        cells_carried_forward,
    })
}
315
316// ── Internal helpers ─────────────────────────────────────────────────────────
317
318/// Locate the position of a tile with `tile_id` in the reader's footer.
319fn find_tile_index(reader: &SegmentReader<'_>, tile_id: TileId) -> ArrayResult<usize> {
320    reader
321        .tiles()
322        .iter()
323        .position(|e| e.tile_id == tile_id)
324        .ok_or_else(|| ArrayError::SegmentCorruption {
325            detail: format!("find_tile_index: TileId {tile_id:?} not found in segment"),
326        })
327}
328
329// ── Tests ────────────────────────────────────────────────────────────────────
330
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::segment::writer::SegmentWriter;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};

    /// One-dimensional Int64 schema (`x` in [0, 1000], tile extent 100) with
    /// a single Int64 attribute `v`.
    fn schema() -> ArraySchema {
        ArraySchemaBuilder::new("t")
            .dim(DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(1000)),
            ))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![100])
            .build()
            .unwrap()
    }

    /// Write a segment with the given (TileId, SparseTile) pairs (must be
    /// strictly ascending by TileId) and return the in-memory bytes.
    fn build_segment(pairs: Vec<(TileId, SparseTile)>) -> Vec<u8> {
        let mut w = SegmentWriter::new(0xBEEF);
        for (id, tile) in pairs {
            w.append_sparse(id, &tile).unwrap();
        }
        w.finish(None).unwrap()
    }

    /// Build a single-row Live SparseTile at coord `x` with value `v`.
    fn live_tile(schema: &ArraySchema, x: i64, v: i64) -> SparseTile {
        let mut b = SparseTileBuilder::new(schema);
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(x)],
            attrs: &[CellValue::Int64(v)],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Live,
        })
        .unwrap();
        b.build()
    }

    /// Build a single-row Tombstone SparseTile at coord `x`.
    fn tombstone_tile(schema: &ArraySchema, x: i64) -> SparseTile {
        let mut b = SparseTileBuilder::new(schema);
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(x)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Tombstone,
        })
        .unwrap();
        b.build()
    }

    /// Build a single-row GdprErased SparseTile at coord `x`.
    fn gdpr_tile(schema: &ArraySchema, x: i64) -> SparseTile {
        let mut b = SparseTileBuilder::new(schema);
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(x)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::GdprErased,
        })
        .unwrap();
        b.build()
    }

    /// Collect coord x-values from a SparseTile (single-dim schema),
    /// sorted ascending for order-independent assertions.
    fn ceiling_x_values(tile: &SparseTile) -> Vec<i64> {
        let rows = decode_sparse_rows(tile).unwrap();
        let mut xs: Vec<i64> = rows
            .iter()
            .map(|r| match r.coord[0] {
                CoordValue::Int64(x) => x,
                _ => panic!("unexpected coord type"),
            })
            .collect();
        xs.sort_unstable();
        xs
    }

    /// Collect RowKinds from ceiling tile rows, keyed by coord.
    fn ceiling_kinds(tile: &SparseTile) -> HashMap<i64, RowKind> {
        decode_sparse_rows(tile)
            .unwrap()
            .into_iter()
            .map(|r| {
                let x = match r.coord[0] {
                    CoordValue::Int64(x) => x,
                    _ => panic!("unexpected coord type"),
                };
                (x, r.kind)
            })
            .collect()
    }

    // ── Test 1 ──────────────────────────────────────────────────────────────

    /// Two coords written at different system times, both outside horizon.
    /// Both must appear in the ceiling (the classic cell-loss regression:
    /// tile-granularity retention would keep only the newest version and
    /// silently lose x=1).
    #[test]
    fn cell_preservation_across_sparse_writes() {
        let s = schema();
        // T=100: writes coord x=1. T=200: writes coord x=2. Horizon=300.
        let t100 = TileId::new(0, 100);
        let t200 = TileId::new(0, 200);
        let bytes = build_segment(vec![
            (t100, live_tile(&s, 1, 10)),
            (t200, live_tile(&s, 2, 20)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        let ceiling = result.ceiling_tile.expect("ceiling must be non-None");
        let xs = ceiling_x_values(&ceiling);
        assert_eq!(xs, vec![1, 2], "both cells must survive in ceiling");
        assert_eq!(result.cells_carried_forward, 2);
        assert!(result.keep_inhorizon.is_empty());
        assert_eq!(result.dropped_tile_ids.len(), 2);
    }

    // ── Test 2 ──────────────────────────────────────────────────────────────

    /// All versions inside horizon → pass through unchanged, no ceiling.
    #[test]
    fn inhorizon_versions_pass_through_unchanged() {
        let s = schema();
        let t400 = TileId::new(0, 400);
        let t500 = TileId::new(0, 500);
        let bytes = build_segment(vec![
            (t400, live_tile(&s, 1, 10)),
            (t500, live_tile(&s, 1, 20)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        assert!(result.ceiling_tile.is_none());
        let mut keep: Vec<i64> = result
            .keep_inhorizon
            .iter()
            .map(|id| id.system_from_ms)
            .collect();
        keep.sort_unstable();
        assert_eq!(keep, vec![400, 500]);
        assert!(result.dropped_tile_ids.is_empty());
    }

    // ── Test 3 ──────────────────────────────────────────────────────────────

    /// Mixed: T=100 (cell A), T=200 (cell B), T=400 (cell C). Horizon=300.
    /// Ceiling contains A+B; T=400 is in keep_inhorizon.
    #[test]
    fn mixed_inhorizon_and_outhorizon() {
        let s = schema();
        let bytes = build_segment(vec![
            (TileId::new(0, 100), live_tile(&s, 1, 10)),
            (TileId::new(0, 200), live_tile(&s, 2, 20)),
            (TileId::new(0, 400), live_tile(&s, 3, 30)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        let ceiling = result.ceiling_tile.expect("ceiling must have A and B");
        let xs = ceiling_x_values(&ceiling);
        assert_eq!(xs, vec![1, 2]);
        assert_eq!(result.cells_carried_forward, 2);
        assert_eq!(result.keep_inhorizon, vec![TileId::new(0, 400)]);
        let mut dropped: Vec<i64> = result
            .dropped_tile_ids
            .iter()
            .map(|id| id.system_from_ms)
            .collect();
        dropped.sort_unstable();
        assert_eq!(dropped, vec![100, 200]);
    }

    // ── Test 4 ──────────────────────────────────────────────────────────────

    /// T=100 Live(A), T=200 Tombstone(A). Horizon=300.
    /// Ceiling contains Tombstone(A) — newest outside-horizon wins.
    #[test]
    fn tombstone_collapses_into_ceiling() {
        let s = schema();
        let bytes = build_segment(vec![
            (TileId::new(0, 100), live_tile(&s, 1, 10)),
            (TileId::new(0, 200), tombstone_tile(&s, 1)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        let ceiling = result.ceiling_tile.expect("ceiling must contain tombstone");
        let kinds = ceiling_kinds(&ceiling);
        assert_eq!(kinds.get(&1), Some(&RowKind::Tombstone));
        assert_eq!(result.cells_carried_forward, 1);
    }

    // ── Test 5 ──────────────────────────────────────────────────────────────

    /// T=100 Tombstone(A), T=400 Live(A). Horizon=300.
    /// Inside version T=400 has Live(A) — ceiling must NOT include A.
    #[test]
    fn tombstone_below_with_inhorizon_live_succeeds() {
        let s = schema();
        // TileIds must be strictly ascending for SegmentWriter.
        let bytes = build_segment(vec![
            (TileId::new(0, 100), tombstone_tile(&s, 1)),
            (TileId::new(0, 400), live_tile(&s, 1, 99)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        // Ceiling should be None: coord x=1 is covered by the inside version.
        assert!(
            result.ceiling_tile.is_none(),
            "inside version covers x=1; ceiling must be empty"
        );
        assert_eq!(result.cells_carried_forward, 0);
        assert_eq!(result.keep_inhorizon, vec![TileId::new(0, 400)]);
    }

    // ── Test 6 ──────────────────────────────────────────────────────────────

    /// T=100 Live(A), T=200 GdprErased(A). Horizon=300.
    /// GDPR erasure takes precedence: ceiling does NOT contain A.
    #[test]
    fn gdpr_erasure_drops_cell_outright() {
        let s = schema();
        let bytes = build_segment(vec![
            (TileId::new(0, 100), live_tile(&s, 1, 10)),
            (TileId::new(0, 200), gdpr_tile(&s, 1)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        assert!(
            result.ceiling_tile.is_none(),
            "GdprErased newest version must suppress cell from ceiling"
        );
        assert_eq!(result.cells_carried_forward, 0);
        // Both outside versions are still removed from the segment.
        assert_eq!(result.dropped_tile_ids.len(), 2);
    }

    // ── Test 7 ──────────────────────────────────────────────────────────────

    /// Empty input → all result fields empty.
    #[test]
    fn empty_input_returns_empty() {
        let s = schema();
        let bytes = build_segment(vec![]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let result = merge_for_retention(&[], &reader, &s, 300).unwrap();
        assert!(result.ceiling_tile.is_none());
        assert!(result.keep_inhorizon.is_empty());
        assert!(result.dropped_tile_ids.is_empty());
        assert_eq!(result.cells_carried_forward, 0);
    }
}
599}