Skip to main content

nodedb_array/tile/
sparse_tile.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Sparse tile payload — coordinate list + per-attribute columns.
4//!
5//! Storage layout is column-major so per-attribute codecs
6//! (`nodedb-codec`) can be applied without a transpose. Dimension values are
7//! dictionary-encoded inline: rare in genomic / single-cell workloads
8//! that have many repeats per tile.
9
10use nodedb_types::{OPEN_UPPER, Surrogate};
11use serde::{Deserialize, Serialize};
12
13use super::mbr::{MbrBuilder, TileMBR};
14use crate::error::{ArrayError, ArrayResult};
15use crate::schema::ArraySchema;
16use crate::types::cell_value::value::CellValue;
17use crate::types::coord::value::CoordValue;
18
19/// Per-row lifecycle classification for a [`SparseTile`] row.
20///
21/// Stored as a parallel `u8` column (`row_kinds`) so the segment can
22/// carry tombstone and GDPR-erasure markers that survive flush and are
23/// visible to post-flush reads. Compaction's retention policy is the
24/// only path allowed to drop these rows.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum RowKind {
27    Live = 0,
28    Tombstone = 1,
29    GdprErased = 2,
30}
31
32impl RowKind {
33    /// Convert a raw byte to a `RowKind`. Fails on unknown values so
34    /// corrupt data surfaces as a typed error rather than silent `Live`.
35    pub fn from_u8(b: u8) -> ArrayResult<Self> {
36        match b {
37            0 => Ok(Self::Live),
38            1 => Ok(Self::Tombstone),
39            2 => Ok(Self::GdprErased),
40            other => Err(ArrayError::SegmentCorruption {
41                detail: format!("unknown RowKind byte: {other:#04x}"),
42            }),
43        }
44    }
45
46    pub fn as_u8(self) -> u8 {
47        self as u8
48    }
49}
50
51impl From<RowKind> for u8 {
52    fn from(k: RowKind) -> u8 {
53        k.as_u8()
54    }
55}
56
57/// Per-dim dictionary: distinct values seen, and one index per cell
58/// pointing into the dictionary. Index width is selected by callers
59/// when picking the codec.
60#[derive(
61    Debug,
62    Clone,
63    PartialEq,
64    Serialize,
65    Deserialize,
66    zerompk::ToMessagePack,
67    zerompk::FromMessagePack,
68)]
69pub struct DimDict {
70    pub values: Vec<CoordValue>,
71    pub indices: Vec<u32>,
72}
73
74impl DimDict {
75    fn new() -> Self {
76        Self {
77            values: Vec::new(),
78            indices: Vec::new(),
79        }
80    }
81
82    fn push(&mut self, v: &CoordValue) {
83        if let Some(idx) = self.values.iter().position(|x| x == v) {
84            self.indices.push(idx as u32);
85        } else {
86            self.indices.push(self.values.len() as u32);
87            self.values.push(v.clone());
88        }
89    }
90
91    pub fn cardinality(&self) -> usize {
92        self.values.len()
93    }
94
95    pub fn len(&self) -> usize {
96        self.indices.len()
97    }
98
99    pub fn is_empty(&self) -> bool {
100        self.indices.is_empty()
101    }
102}
103
104/// Sparse tile payload.
105#[derive(
106    Debug,
107    Clone,
108    PartialEq,
109    Serialize,
110    Deserialize,
111    zerompk::ToMessagePack,
112    zerompk::FromMessagePack,
113)]
114pub struct SparseTile {
115    /// One dictionary per schema dim, parallel to `schema.dims`.
116    pub dim_dicts: Vec<DimDict>,
117    /// One column per schema attr, parallel to `schema.attrs`.
118    pub attr_cols: Vec<Vec<CellValue>>,
119    /// Per-cell global surrogate (one entry per row, parallel to the
120    /// dim-dict index streams and `attr_cols`). Cross-engine bitmap
121    /// joins read this column directly without translating coords back
122    /// to user-visible primary keys.
123    pub surrogates: Vec<Surrogate>,
124    /// Per-cell valid-time lower bound in milliseconds (inclusive).
125    /// Parallel to `surrogates`.
126    pub valid_from_ms: Vec<i64>,
127    /// Per-cell valid-time upper bound in milliseconds (exclusive).
128    /// [`nodedb_types::OPEN_UPPER`] (`i64::MAX`) means open-ended.
129    /// Parallel to `surrogates`.
130    pub valid_until_ms: Vec<i64>,
131    /// Per-row [`RowKind`] encoded as `u8`. Parallel to `surrogates`.
132    /// `0` = Live, `1` = Tombstone, `2` = GdprErased.
133    ///
134    /// Older segments written before this column existed will deserialise
135    /// with an empty `Vec`; readers treat a missing entry as `Live`.
136    pub row_kinds: Vec<u8>,
137    pub mbr: TileMBR,
138}
139
140impl SparseTile {
141    /// Empty tile sized for the given schema.
142    pub fn empty(schema: &ArraySchema) -> Self {
143        Self {
144            dim_dicts: (0..schema.arity()).map(|_| DimDict::new()).collect(),
145            attr_cols: (0..schema.attrs.len()).map(|_| Vec::new()).collect(),
146            surrogates: Vec::new(),
147            valid_from_ms: Vec::new(),
148            valid_until_ms: Vec::new(),
149            row_kinds: Vec::new(),
150            mbr: TileMBR::new(schema.arity(), schema.attrs.len()),
151        }
152    }
153
154    /// Count of live (non-sentinel) rows. Used for MBR statistics and
155    /// predicate-pushdown decisions. Sentinel rows are excluded.
156    ///
157    /// **Do not use this value as a row-iteration bound.** Iteration must
158    /// cover sentinel rows so callers can observe Tombstone / GdprErased
159    /// markers when applying bitemporal visibility rules — use
160    /// [`row_count`](Self::row_count) and filter with
161    /// [`row_kind`](Self::row_kind) instead.
162    pub fn nnz(&self) -> u32 {
163        self.mbr.nnz
164    }
165
166    /// Total physical row count including sentinel (Tombstone / GdprErased)
167    /// rows. Use this as the upper bound when iterating over rows; check
168    /// [`row_kind`](Self::row_kind) inside the loop and skip non-Live rows
169    /// when reading attribute columns. The attribute column vectors are
170    /// indexed by the **live-row index** (incremented only for Live rows),
171    /// not by the iteration index — confusing the two yields incorrect or
172    /// out-of-bounds reads.
173    pub fn row_count(&self) -> usize {
174        self.surrogates.len()
175    }
176
177    /// Decode the `RowKind` for the given row index.
178    ///
179    /// Returns [`RowKind::Live`] when `row_kinds` is empty (forward-compat
180    /// with pre-v4 data still in memory during migration) or when the row
181    /// index is out of range.
182    pub fn row_kind(&self, row: usize) -> ArrayResult<RowKind> {
183        match self.row_kinds.get(row) {
184            None => Ok(RowKind::Live),
185            Some(&b) => RowKind::from_u8(b),
186        }
187    }
188}
189
190/// All row-level data passed to [`SparseTileBuilder::push_row`].
191pub struct SparseRow<'a> {
192    pub coord: &'a [CoordValue],
193    pub attrs: &'a [CellValue],
194    pub surrogate: Surrogate,
195    pub valid_from_ms: i64,
196    pub valid_until_ms: i64,
197    pub kind: RowKind,
198}
199
200impl<'a> SparseRow<'a> {
201    /// Construct a live row. Convenience wrapper so callers that only write
202    /// live data don't have to spell out `kind: RowKind::Live` explicitly.
203    pub fn live(
204        coord: &'a [CoordValue],
205        attrs: &'a [CellValue],
206        surrogate: Surrogate,
207        valid_from_ms: i64,
208        valid_until_ms: i64,
209    ) -> Self {
210        Self {
211            coord,
212            attrs,
213            surrogate,
214            valid_from_ms,
215            valid_until_ms,
216            kind: RowKind::Live,
217        }
218    }
219}
220
221/// Streaming builder. Folds `(coord, attrs)` pairs into the tile body
222/// and the MBR in one pass.
223pub struct SparseTileBuilder<'a> {
224    schema: &'a ArraySchema,
225    dim_dicts: Vec<DimDict>,
226    attr_cols: Vec<Vec<CellValue>>,
227    surrogates: Vec<Surrogate>,
228    valid_from_ms: Vec<i64>,
229    valid_until_ms: Vec<i64>,
230    row_kinds: Vec<u8>,
231    mbr: MbrBuilder,
232}
233
234impl<'a> SparseTileBuilder<'a> {
235    pub fn new(schema: &'a ArraySchema) -> Self {
236        Self {
237            schema,
238            dim_dicts: (0..schema.arity()).map(|_| DimDict::new()).collect(),
239            attr_cols: (0..schema.attrs.len()).map(|_| Vec::new()).collect(),
240            surrogates: Vec::new(),
241            valid_from_ms: Vec::new(),
242            valid_until_ms: Vec::new(),
243            row_kinds: Vec::new(),
244            mbr: MbrBuilder::new(schema.arity(), schema.attrs.len()),
245        }
246    }
247
248    /// Push a row with full bitemporal metadata.
249    ///
250    /// Sentinel rows (`Tombstone`, `GdprErased`) may have an empty `attrs`
251    /// slice — the attribute columns receive no entry for these rows, which
252    /// is correct because sentinel rows carry no payload.
253    pub fn push_row(&mut self, row: SparseRow<'_>) -> ArrayResult<()> {
254        let SparseRow {
255            coord,
256            attrs,
257            surrogate,
258            valid_from_ms,
259            valid_until_ms,
260            kind,
261        } = row;
262        if coord.len() != self.schema.arity() {
263            return Err(ArrayError::CoordArityMismatch {
264                array: self.schema.name.clone(),
265                expected: self.schema.arity(),
266                got: coord.len(),
267            });
268        }
269        // Sentinel rows carry no payload — allow empty attrs. Live rows must
270        // match the schema attr count exactly.
271        if kind == RowKind::Live && attrs.len() != self.schema.attrs.len() {
272            return Err(ArrayError::CellTypeMismatch {
273                array: self.schema.name.clone(),
274                attr: "<all>".to_string(),
275                detail: format!(
276                    "expected {} attr columns, got {}",
277                    self.schema.attrs.len(),
278                    attrs.len()
279                ),
280            });
281        }
282        for (i, c) in coord.iter().enumerate() {
283            self.dim_dicts[i].push(c);
284        }
285        // For sentinel rows `attrs` is empty — no entries added to attr_cols.
286        // For live rows, every attr col gets one entry.
287        for (i, a) in attrs.iter().enumerate() {
288            self.attr_cols[i].push(a.clone());
289        }
290        self.surrogates.push(surrogate);
291        self.valid_from_ms.push(valid_from_ms);
292        self.valid_until_ms.push(valid_until_ms);
293        self.row_kinds.push(kind.as_u8());
294        // Only fold MBR for live rows; sentinel rows have no meaningful attrs
295        // and may have coords outside the declared domain.
296        if kind == RowKind::Live {
297            self.mbr.fold(coord, attrs);
298        }
299        Ok(())
300    }
301
302    /// Push a live row whose surrogate is unknown to the caller (recovery,
303    /// pure-shape ops). The slot is filled with [`Surrogate::ZERO`];
304    /// callers that have a real surrogate should use [`Self::push_row`].
305    pub fn push(&mut self, coord: &[CoordValue], attrs: &[CellValue]) -> ArrayResult<()> {
306        self.push_row(SparseRow {
307            coord,
308            attrs,
309            surrogate: Surrogate::ZERO,
310            valid_from_ms: 0,
311            valid_until_ms: OPEN_UPPER,
312            kind: RowKind::Live,
313        })
314    }
315
316    pub fn build(self) -> SparseTile {
317        SparseTile {
318            dim_dicts: self.dim_dicts,
319            attr_cols: self.attr_cols,
320            surrogates: self.surrogates,
321            valid_from_ms: self.valid_from_ms,
322            valid_until_ms: self.valid_until_ms,
323            row_kinds: self.row_kinds,
324            mbr: self.mbr.build(),
325        }
326    }
327}
328
329#[cfg(test)]
330mod tests {
331    use super::*;
332    use crate::schema::ArraySchemaBuilder;
333    use crate::schema::attr_spec::{AttrSpec, AttrType};
334    use crate::schema::dim_spec::{DimSpec, DimType};
335    use crate::types::domain::{Domain, DomainBound};
336
337    fn schema() -> ArraySchema {
338        ArraySchemaBuilder::new("g")
339            .dim(DimSpec::new(
340                "chrom",
341                DimType::Int64,
342                Domain::new(DomainBound::Int64(0), DomainBound::Int64(24)),
343            ))
344            .dim(DimSpec::new(
345                "pos",
346                DimType::Int64,
347                Domain::new(DomainBound::Int64(0), DomainBound::Int64(1_000_000)),
348            ))
349            .attr(AttrSpec::new("variant", AttrType::String, false))
350            .attr(AttrSpec::new("qual", AttrType::Float64, true))
351            .tile_extents(vec![1, 1_000_000])
352            .build()
353            .unwrap()
354    }
355
356    #[test]
357    fn sparse_tile_collects_columns_in_order() {
358        let s = schema();
359        let mut b = SparseTileBuilder::new(&s);
360        b.push(
361            &[CoordValue::Int64(1), CoordValue::Int64(100)],
362            &[CellValue::String("ALT".into()), CellValue::Float64(40.0)],
363        )
364        .unwrap();
365        b.push(
366            &[CoordValue::Int64(1), CoordValue::Int64(200)],
367            &[CellValue::String("REF".into()), CellValue::Float64(50.0)],
368        )
369        .unwrap();
370        let t = b.build();
371        assert_eq!(t.nnz(), 2);
372        assert_eq!(t.attr_cols.len(), 2);
373        assert_eq!(t.attr_cols[0].len(), 2);
374        assert_eq!(t.attr_cols[1].len(), 2);
375    }
376
377    #[test]
378    fn dim_dict_dedups_repeated_values() {
379        let s = schema();
380        let mut b = SparseTileBuilder::new(&s);
381        b.push(
382            &[CoordValue::Int64(1), CoordValue::Int64(100)],
383            &[CellValue::String("ALT".into()), CellValue::Float64(40.0)],
384        )
385        .unwrap();
386        b.push(
387            &[CoordValue::Int64(1), CoordValue::Int64(200)],
388            &[CellValue::String("REF".into()), CellValue::Float64(50.0)],
389        )
390        .unwrap();
391        let t = b.build();
392        // chrom dictionary should have one entry (1 repeated).
393        assert_eq!(t.dim_dicts[0].cardinality(), 1);
394        assert_eq!(t.dim_dicts[0].len(), 2);
395    }
396
397    #[test]
398    fn sparse_tile_rejects_bad_arity() {
399        let s = schema();
400        let mut b = SparseTileBuilder::new(&s);
401        let r = b.push(
402            &[CoordValue::Int64(1)],
403            &[CellValue::String("ALT".into()), CellValue::Float64(0.0)],
404        );
405        assert!(r.is_err());
406    }
407
408    #[test]
409    fn sparse_tile_rejects_bad_attr_count() {
410        let s = schema();
411        let mut b = SparseTileBuilder::new(&s);
412        let r = b.push(
413            &[CoordValue::Int64(1), CoordValue::Int64(0)],
414            &[CellValue::String("ALT".into())],
415        );
416        assert!(r.is_err());
417    }
418
419    #[test]
420    fn empty_sparse_tile_has_zero_nnz() {
421        let s = schema();
422        let t = SparseTile::empty(&s);
423        assert_eq!(t.nnz(), 0);
424        assert_eq!(t.dim_dicts.len(), 2);
425        assert_eq!(t.attr_cols.len(), 2);
426    }
427
428    #[test]
429    fn sparse_tile_msgpack_roundtrip_carries_valid_time_bounds() {
430        let s = schema();
431        let mut b = SparseTileBuilder::new(&s);
432        b.push_row(SparseRow {
433            coord: &[CoordValue::Int64(1), CoordValue::Int64(100)],
434            attrs: &[CellValue::String("A".into()), CellValue::Float64(1.0)],
435            surrogate: nodedb_types::Surrogate::ZERO,
436            valid_from_ms: 50,
437            valid_until_ms: 150,
438            kind: RowKind::Live,
439        })
440        .unwrap();
441        b.push_row(SparseRow {
442            coord: &[CoordValue::Int64(2), CoordValue::Int64(200)],
443            attrs: &[CellValue::String("B".into()), CellValue::Float64(2.0)],
444            surrogate: nodedb_types::Surrogate::ZERO,
445            valid_from_ms: 300,
446            valid_until_ms: 900,
447            kind: RowKind::Live,
448        })
449        .unwrap();
450        let tile = b.build();
451
452        let bytes = zerompk::to_msgpack_vec(&tile).unwrap();
453        let decoded: SparseTile = zerompk::from_msgpack(&bytes).unwrap();
454
455        assert_eq!(decoded.valid_from_ms, vec![50, 300]);
456        assert_eq!(decoded.valid_until_ms, vec![150, 900]);
457        assert_eq!(decoded.nnz(), 2);
458    }
459
460    #[test]
461    fn sparse_tile_msgpack_roundtrip_carries_row_kinds() {
462        let s = schema();
463        let mut b = SparseTileBuilder::new(&s);
464        // Live row
465        b.push_row(SparseRow {
466            coord: &[CoordValue::Int64(1), CoordValue::Int64(10)],
467            attrs: &[CellValue::String("X".into()), CellValue::Float64(1.0)],
468            surrogate: nodedb_types::Surrogate::ZERO,
469            valid_from_ms: 0,
470            valid_until_ms: OPEN_UPPER,
471            kind: RowKind::Live,
472        })
473        .unwrap();
474        // Tombstone row (no attrs)
475        b.push_row(SparseRow {
476            coord: &[CoordValue::Int64(2), CoordValue::Int64(20)],
477            attrs: &[],
478            surrogate: nodedb_types::Surrogate::ZERO,
479            valid_from_ms: 0,
480            valid_until_ms: OPEN_UPPER,
481            kind: RowKind::Tombstone,
482        })
483        .unwrap();
484        // GdprErased row (no attrs)
485        b.push_row(SparseRow {
486            coord: &[CoordValue::Int64(3), CoordValue::Int64(30)],
487            attrs: &[],
488            surrogate: nodedb_types::Surrogate::ZERO,
489            valid_from_ms: 0,
490            valid_until_ms: OPEN_UPPER,
491            kind: RowKind::GdprErased,
492        })
493        .unwrap();
494        let tile = b.build();
495
496        let bytes = zerompk::to_msgpack_vec(&tile).unwrap();
497        let decoded: SparseTile = zerompk::from_msgpack(&bytes).unwrap();
498
499        assert_eq!(decoded.row_kinds.len(), 3);
500        assert_eq!(
501            RowKind::from_u8(decoded.row_kinds[0]).unwrap(),
502            RowKind::Live
503        );
504        assert_eq!(
505            RowKind::from_u8(decoded.row_kinds[1]).unwrap(),
506            RowKind::Tombstone
507        );
508        assert_eq!(
509            RowKind::from_u8(decoded.row_kinds[2]).unwrap(),
510            RowKind::GdprErased
511        );
512    }
513}