Skip to main content

nodedb_array/query/
ceiling.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Coordinate-level Ceiling resolution for bitemporal cell versions.
4//!
5//! `ceiling_resolve_cell` finds the latest system-time version of a cell
6//! at or before `system_as_of`, optionally filtered by a valid-time
7//! point `valid_at_ms`.  The caller supplies the version iterator
8//! newest-first; this function performs no I/O.
9
10use crate::error::{ArrayError, ArrayResult};
11use crate::tile::cell_payload::{CellPayload, is_cell_gdpr_erasure, is_cell_tombstone};
12use crate::types::TileId;
13use crate::types::coord::value::CoordValue;
14
15/// Result of a Ceiling resolution for a single cell coordinate.
16#[derive(Debug, Clone, PartialEq)]
17pub enum CeilingResult {
18    /// A live (non-sentinel) payload whose valid-time bounds satisfy the query.
19    Live(CellPayload),
20    /// The most recent qualifying version is a soft-delete tombstone.
21    Tombstoned,
22    /// The most recent qualifying version is a GDPR erasure marker.
23    Erased,
24    /// No version exists at or before `system_as_of`, or all versions failed
25    /// the valid-time filter and no older version qualifies.
26    NotFound,
27}
28
/// Parameters for [`ceiling_resolve_cell`] bundled to avoid a long argument list.
///
/// The struct is a pair of plain integers, so it derives the usual value-type
/// traits: it can be logged with `{:?}`, copied freely between call sites and
/// compared in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CeilingParams {
    /// Upper bound (inclusive) on `TileId::system_from_ms`. Versions newer
    /// than this cutoff are skipped.
    pub system_as_of: i64,
    /// When `Some(vt)`, a live payload is only returned if
    /// `valid_from_ms <= vt < valid_until_ms`.  Tombstone and erasure
    /// sentinels short-circuit before this check.  When `None`, no
    /// valid-time filtering is applied.
    pub valid_at_ms: Option<i64>,
}
39
40/// Resolve the Ceiling for a single cell coordinate.
41///
42/// `tile_versions_newest_first` — `(TileId, raw_cell_bytes)` pairs ordered
43/// newest-first by `system_from_ms`.  The caller is responsible for the
44/// ordering; this function additionally defends against misuse with a guard
45/// that skips entries where `tile_id.system_from_ms > params.system_as_of`.
46///
47/// For each version the function applies:
48/// 1. System-time guard: skip if `tile_id.system_from_ms > system_as_of`.
49/// 2. Sentinel check: tombstone → `Tombstoned`; GDPR erasure → `Erased`.
50/// 3. Valid-time filter (when `valid_at_ms` is `Some`): if the decoded
51///    payload's valid-time interval does not contain `vt`, continue to the
52///    next older version.
53/// 4. Return `Live(payload)`.
54///
55/// If the iterator is exhausted without a match, returns `NotFound`.
56pub fn ceiling_resolve_cell<'a, I>(
57    tile_versions_newest_first: I,
58    _coord: &[CoordValue],
59    params: &CeilingParams,
60) -> ArrayResult<CeilingResult>
61where
62    I: IntoIterator<Item = (TileId, &'a [u8])>,
63{
64    for (tile_id, bytes) in tile_versions_newest_first {
65        if tile_id.system_from_ms > params.system_as_of {
66            continue;
67        }
68        if is_cell_tombstone(bytes) {
69            return Ok(CeilingResult::Tombstoned);
70        }
71        if is_cell_gdpr_erasure(bytes) {
72            return Ok(CeilingResult::Erased);
73        }
74        let payload = CellPayload::decode(bytes).map_err(|e| ArrayError::SegmentCorruption {
75            detail: format!("ceiling_resolve_cell: {e}"),
76        })?;
77        match params.valid_at_ms {
78            Some(vt) if !(payload.valid_from_ms <= vt && vt < payload.valid_until_ms) => {
79                continue;
80            }
81            _ => return Ok(CeilingResult::Live(payload)),
82        }
83    }
84    Ok(CeilingResult::NotFound)
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tile::cell_payload::{
        CELL_GDPR_ERASURE_SENTINEL, CELL_TOMBSTONE_SENTINEL, CellPayload, OPEN_UPPER,
    };
    use crate::types::TileId;
    use crate::types::cell_value::value::CellValue;
    use nodedb_types::Surrogate;

    /// Encode a live payload with the given valid-time bounds and a single
    /// `Int64` attribute that identifies the version in assertions.
    fn payload(valid_from: i64, valid_until: i64, val: i64) -> Vec<u8> {
        CellPayload {
            valid_from_ms: valid_from,
            valid_until_ms: valid_until,
            attrs: vec![CellValue::Int64(val)],
            surrogate: Surrogate::ZERO,
        }
        .encode()
        .unwrap()
    }

    /// Shorthand constructor for the query parameters.
    fn live_params(system_as_of: i64, valid_at_ms: Option<i64>) -> CeilingParams {
        CeilingParams {
            system_as_of,
            valid_at_ms,
        }
    }

    /// Assert that `result` is `Live` and that its first attribute is
    /// `Int64(expected)`; any other outcome fails with the actual value.
    #[track_caller]
    fn assert_live(result: &CeilingResult, expected: i64) {
        match result {
            CeilingResult::Live(p) => assert_eq!(p.attrs[0], CellValue::Int64(expected)),
            other => panic!("expected Live({expected}), got {other:?}"),
        }
    }

    #[test]
    fn live_ceiling_at_cutoff() {
        // Three versions: v1@100, v2@200, v3@300.
        // Query at system_as_of=250 → v2.
        let v1 = payload(0, OPEN_UPPER, 1);
        let v2 = payload(0, OPEN_UPPER, 2);
        let v3 = payload(0, OPEN_UPPER, 3);

        let versions = vec![
            (TileId::new(1, 300), v3.as_slice()),
            (TileId::new(1, 200), v2.as_slice()),
            (TileId::new(1, 100), v1.as_slice()),
        ];

        let result = ceiling_resolve_cell(versions, &[], &live_params(250, None)).unwrap();
        assert_live(&result, 2);
    }

    #[test]
    fn tombstone_shadows_earlier_versions() {
        let v1 = payload(0, OPEN_UPPER, 1);
        let versions_at_300 = vec![
            (TileId::new(1, 200), CELL_TOMBSTONE_SENTINEL),
            (TileId::new(1, 100), v1.as_slice()),
        ];
        let result = ceiling_resolve_cell(versions_at_300, &[], &live_params(300, None)).unwrap();
        assert_eq!(result, CeilingResult::Tombstoned);

        // Even at cutoff equal to the tombstone's system_from_ms.
        let versions_at_200 = vec![
            (TileId::new(1, 200), CELL_TOMBSTONE_SENTINEL),
            (TileId::new(1, 100), v1.as_slice()),
        ];
        let result2 = ceiling_resolve_cell(versions_at_200, &[], &live_params(200, None)).unwrap();
        assert_eq!(result2, CeilingResult::Tombstoned);
    }

    #[test]
    fn gdpr_erasure_distinct_from_tombstone() {
        let v1 = payload(0, OPEN_UPPER, 1);
        let versions = vec![
            (TileId::new(1, 200), CELL_GDPR_ERASURE_SENTINEL),
            (TileId::new(1, 100), v1.as_slice()),
        ];
        let result = ceiling_resolve_cell(versions, &[], &live_params(300, None)).unwrap();
        // Equality with `Erased` already rules out `Tombstoned`.
        assert_eq!(result, CeilingResult::Erased);
    }

    #[test]
    fn sentinel_short_circuits_valid_time_filter() {
        // v1 is valid at vt=50, but the newer tombstone is reached first and
        // decides the outcome before any valid-time check is made.
        let v1 = payload(0, 100, 1);
        let versions = vec![
            (TileId::new(1, 200), CELL_TOMBSTONE_SENTINEL),
            (TileId::new(1, 100), v1.as_slice()),
        ];
        let result = ceiling_resolve_cell(versions, &[], &live_params(300, Some(50))).unwrap();
        assert_eq!(result, CeilingResult::Tombstoned);
    }

    #[test]
    fn sentinel_newer_than_cutoff_is_ignored() {
        // The tombstone was written at 200; a query as of 150 must not see it.
        let v1 = payload(0, OPEN_UPPER, 1);
        let versions = vec![
            (TileId::new(1, 200), CELL_TOMBSTONE_SENTINEL),
            (TileId::new(1, 100), v1.as_slice()),
        ];
        let result = ceiling_resolve_cell(versions, &[], &live_params(150, None)).unwrap();
        assert_live(&result, 1);
    }

    #[test]
    fn valid_time_filter_falls_back_to_older_version() {
        // v1: valid [0,100), v2: valid [200,300).
        let v1 = payload(0, 100, 1);
        let v2 = payload(200, 300, 2);

        // Borrow the encoded bytes once; each query iterates over copies of
        // the (id, slice) pairs instead of cloning the payload buffers.
        let versions = [
            (TileId::new(1, 200), v2.as_slice()),
            (TileId::new(1, 100), v1.as_slice()),
        ];
        let resolve_at = |valid_at: i64| {
            ceiling_resolve_cell(
                versions.iter().copied(),
                &[],
                &live_params(300, Some(valid_at)),
            )
            .unwrap()
        };

        // valid_at=50 → falls through v2 (not in [200,300)) → returns v1.
        assert_live(&resolve_at(50), 1);

        // valid_at=150 → v2 not in [200,300), v1 not in [0,100) → NotFound.
        assert_eq!(resolve_at(150), CeilingResult::NotFound);

        // valid_at=250 → v2 in [200,300) → Live(2).
        assert_live(&resolve_at(250), 2);
    }

    #[test]
    fn not_found_below_horizon() {
        let v1 = payload(0, OPEN_UPPER, 1);
        // Only version at system_from_ms=100; query at system_as_of=50.
        let versions = vec![(TileId::new(1, 100), v1.as_slice())];
        let result = ceiling_resolve_cell(versions, &[], &live_params(50, None)).unwrap();
        assert_eq!(result, CeilingResult::NotFound);
    }
}