// nodedb_array/segment/reader.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! `SegmentReader` — zero-copy view over segment bytes.
4//!
5//! Designed to wrap an `mmap`'d file in production. The reader only
6//! borrows the slice and lazily decodes tile payloads on demand; the
7//! footer is decoded eagerly at construction so subsequent tile lookups
8//! are bounded reads + zerompk decode (no rescan).
9
10use super::format::{
11    SegmentFooter, SegmentHeader, TileEntry, TileKind, framing::BlockFraming, header::HEADER_SIZE,
12};
13use crate::codec::tag::peek_tag;
14use crate::codec::tile_decode::decode_sparse_tile;
15use crate::error::{ArrayError, ArrayResult};
16use crate::tile::cell_payload::{CELL_GDPR_ERASURE_SENTINEL, CELL_TOMBSTONE_SENTINEL, CellPayload};
17use crate::tile::dense_tile::DenseTile;
18use crate::tile::sparse_tile::{RowKind, SparseTile};
19use crate::types::coord::value::CoordValue;
20use nodedb_types::Surrogate;
21
22/// Extract the encoded `CellPayload` bytes for a specific `coord` from a
23/// `SparseTile`.
24///
25/// Returns `None` when the coord is not present in the tile. Encodes the
26/// matched row's attrs + valid-time bounds back into the `CellPayload` wire
27/// format so the bytes can be fed directly to
28/// [`crate::query::ceiling::ceiling_resolve_cell`].
29///
30/// The returned `Vec<u8>` is always freshly allocated (no lifetime coupling to
31/// the tile).
32pub fn extract_cell_bytes(tile: &SparseTile, coord: &[CoordValue]) -> ArrayResult<Option<Vec<u8>>> {
33    let n = tile.row_count();
34    'rows: for row in 0..n {
35        // Reconstruct the coordinate for this row.
36        for (dim_idx, c) in coord.iter().enumerate() {
37            let dict =
38                tile.dim_dicts
39                    .get(dim_idx)
40                    .ok_or_else(|| ArrayError::SegmentCorruption {
41                        detail: format!("extract_cell_bytes: dim {dim_idx} out of range"),
42                    })?;
43            let entry_idx = *dict
44                .indices
45                .get(row)
46                .ok_or_else(|| ArrayError::SegmentCorruption {
47                    detail: format!("extract_cell_bytes: row {row} index out of range"),
48                })? as usize;
49            let stored =
50                dict.values
51                    .get(entry_idx)
52                    .ok_or_else(|| ArrayError::SegmentCorruption {
53                        detail: format!("extract_cell_bytes: dict entry {entry_idx} out of range"),
54                    })?;
55            if stored != c {
56                continue 'rows;
57            }
58        }
59        // Coord matched — check row kind before decoding payload.
60        let kind = tile.row_kind(row)?;
61        match kind {
62            RowKind::Tombstone => return Ok(Some(CELL_TOMBSTONE_SENTINEL.to_vec())),
63            RowKind::GdprErased => return Ok(Some(CELL_GDPR_ERASURE_SENTINEL.to_vec())),
64            RowKind::Live => {}
65        }
66        // Live row — build attrs from all attr columns.
67        let attrs: Vec<_> = tile
68            .attr_cols
69            .iter()
70            .map(|col| {
71                col.get(row)
72                    .cloned()
73                    .ok_or_else(|| ArrayError::SegmentCorruption {
74                        detail: format!("extract_cell_bytes: attr col row {row} out of range"),
75                    })
76            })
77            .collect::<ArrayResult<Vec<_>>>()?;
78        let surrogate = tile.surrogates.get(row).copied().unwrap_or(Surrogate::ZERO);
79        let valid_from_ms =
80            tile.valid_from_ms
81                .get(row)
82                .copied()
83                .ok_or_else(|| ArrayError::SegmentCorruption {
84                    detail: format!("extract_cell_bytes: valid_from_ms row {row} out of range"),
85                })?;
86        let valid_until_ms =
87            tile.valid_until_ms
88                .get(row)
89                .copied()
90                .ok_or_else(|| ArrayError::SegmentCorruption {
91                    detail: format!("extract_cell_bytes: valid_until_ms row {row} out of range"),
92                })?;
93        let payload = CellPayload {
94            valid_from_ms,
95            valid_until_ms,
96            attrs,
97            surrogate,
98        };
99        return payload.encode().map(Some);
100    }
101    Ok(None)
102}
103
104/// Dispatch sparse tile decoding by peeking the tag byte.
105///
106/// - Recognized tag (0 or 1): delegate to `decode_sparse_tile`.
107/// - Empty payload or unrecognized byte: return a corruption error.
108fn read_sparse_tile(payload: &[u8]) -> ArrayResult<SparseTile> {
109    match peek_tag(payload) {
110        Some(_) => decode_sparse_tile(payload),
111        None => Err(ArrayError::SegmentCorruption {
112            detail: format!(
113                "sparse tile has unrecognized tag byte {:#04x}",
114                payload.first().copied().unwrap_or(0)
115            ),
116        }),
117    }
118}
119
/// Decoded tile payload.
///
/// Produced by [`SegmentReader::read_tile`]; the variant mirrors the
/// `TileKind` recorded in the footer's `TileEntry`.
#[derive(Debug, Clone, PartialEq)]
pub enum TilePayload {
    /// Sparse tile (dictionary-encoded coordinate rows).
    Sparse(SparseTile),
    /// Dense tile.
    Dense(DenseTile),
}
126
/// Zero-copy reader over a plaintext segment byte slice.
///
/// Header and footer are decoded eagerly in [`SegmentReader::open`]; tile
/// payloads are decoded lazily via [`SegmentReader::read_tile`].
pub struct SegmentReader<'a> {
    /// The full segment bytes (e.g. an mmap'd file) — borrowed, never copied.
    bytes: &'a [u8],
    /// Header decoded at open time.
    header: SegmentHeader,
    /// Footer (tile directory) decoded at open time.
    footer: SegmentFooter,
}
132
impl<'a> SegmentReader<'a> {
    /// Parse the header and footer from `bytes` and validate that both record
    /// the same `schema_hash`.
    ///
    /// # Errors
    ///
    /// `SegmentCorruption` when the buffer is shorter than a header, when
    /// header/footer decoding fails, or when the two schema hashes disagree.
    pub fn open(bytes: &'a [u8]) -> ArrayResult<Self> {
        if bytes.len() < HEADER_SIZE {
            return Err(ArrayError::SegmentCorruption {
                detail: format!("segment too small: {} bytes", bytes.len()),
            });
        }
        let header = SegmentHeader::decode(&bytes[..HEADER_SIZE])?;
        // NOTE: `SegmentFooter::decode` receives the whole buffer —
        // presumably it locates the footer from the end itself; see the
        // format module for the layout.
        let footer = SegmentFooter::decode(bytes)?;
        // Header and footer each carry the schema hash; a mismatch means the
        // two halves came from different writes (torn/corrupt segment).
        if header.schema_hash != footer.schema_hash {
            return Err(ArrayError::SegmentCorruption {
                detail: format!(
                    "header/footer schema_hash mismatch: header={:x} footer={:x}",
                    header.schema_hash, footer.schema_hash
                ),
            });
        }
        Ok(Self {
            bytes,
            header,
            footer,
        })
    }

    /// Borrow the decoded segment header.
    pub fn header(&self) -> &SegmentHeader {
        &self.header
    }

    /// Schema hash recorded in the header (validated against the footer).
    pub fn schema_hash(&self) -> u64 {
        self.header.schema_hash
    }

    /// All tile directory entries, in footer order.
    pub fn tiles(&self) -> &[TileEntry] {
        &self.footer.tiles
    }

    /// Number of tiles recorded in the footer.
    pub fn tile_count(&self) -> usize {
        self.footer.tiles.len()
    }

    /// Reverse-scan tile entries with the given `hilbert_prefix` and
    /// `system_from_ms <= system_as_of`, returning the newest qualifying
    /// tile version.
    ///
    /// Returns `Ok(None)` if no version exists at or before the cutoff.
    ///
    /// The `valid_at_ms` parameter is reserved for the query layer (Tier
    /// 9.2). At the reader level, valid-time filtering is NOT applied —
    /// the whole tile is returned. This keeps the reader cell-shape-agnostic.
    ///
    /// Relies on the footer invariant that entries are sorted by
    /// (`hilbert_prefix`, `system_from_ms`) ascending — both binary searches
    /// below assume it.
    pub fn read_tile_as_of(
        &self,
        hilbert_prefix: u64,
        system_as_of: i64,
        _valid_at_ms: Option<i64>,
    ) -> ArrayResult<Option<TilePayload>> {
        let tiles = &self.footer.tiles;

        // Binary search for the first entry with hilbert_prefix.
        let first = tiles.partition_point(|e| e.tile_id.hilbert_prefix < hilbert_prefix);
        // Binary search for the first entry past the range:
        // hilbert_prefix matches and system_from_ms <= system_as_of.
        // Upper bound: first entry where prefix > hilbert_prefix.
        let past_prefix = tiles.partition_point(|e| e.tile_id.hilbert_prefix <= hilbert_prefix);

        // Slice of entries with matching prefix.
        let candidates = &tiles[first..past_prefix];
        if candidates.is_empty() {
            return Ok(None);
        }

        // Within the prefix slice, entries are ordered by system_from_ms ascending.
        // Find the rightmost entry with system_from_ms <= system_as_of.
        let cutoff_pos = candidates.partition_point(|e| e.tile_id.system_from_ms <= system_as_of);
        if cutoff_pos == 0 {
            // Every version of this prefix postdates the cutoff.
            return Ok(None);
        }

        // The entry at cutoff_pos - 1 is the newest qualifying version.
        // Re-base from the candidates slice into the global entry index.
        let entry_idx = first + cutoff_pos - 1;
        self.read_tile(entry_idx).map(Some)
    }

    /// Returns an iterator over all tile versions for `hilbert_prefix` whose
    /// `system_from_ms <= system_as_of`, ordered **newest-first** by
    /// `system_from_ms`.
    ///
    /// Callers supply this to [`nodedb_array::query::ceiling`] to resolve the
    /// bitemporal ceiling for every coordinate in the prefix.
    ///
    /// Tile decoding is lazy: each `next()` decodes one tile, so decode
    /// errors surface per item as `Err`.
    pub fn iter_tile_versions(
        &self,
        hilbert_prefix: u64,
        system_as_of: i64,
    ) -> ArrayResult<impl Iterator<Item = ArrayResult<(crate::types::TileId, TilePayload)>> + '_>
    {
        let tiles = &self.footer.tiles;

        // Find the contiguous slice of entries whose hilbert_prefix matches.
        let first = tiles.partition_point(|e| e.tile_id.hilbert_prefix < hilbert_prefix);
        let past_prefix = tiles.partition_point(|e| e.tile_id.hilbert_prefix <= hilbert_prefix);

        // Within [first..past_prefix], entries are ascending by system_from_ms.
        // Restrict to those at or before the cutoff.
        let cutoff_pos =
            tiles[first..past_prefix].partition_point(|e| e.tile_id.system_from_ms <= system_as_of);
        // Global index range: [first .. first+cutoff_pos).
        let qualifying_start = first;
        let qualifying_end = first + cutoff_pos;

        // Iterate in reverse (newest-first). Indices are materialized so the
        // returned iterator doesn't borrow the range computation above.
        let indices: Vec<usize> = (qualifying_start..qualifying_end).rev().collect();
        Ok(indices.into_iter().map(move |idx| {
            let tile_id = tiles[idx].tile_id;
            self.read_tile(idx).map(|payload| (tile_id, payload))
        }))
    }

    /// Decode tile #`idx`. CRC is checked by the framing layer.
    ///
    /// # Errors
    ///
    /// `SegmentCorruption` when `idx` is out of range, the entry's byte range
    /// overflows or exceeds the buffer, framing/CRC validation fails, or the
    /// payload fails to decode for its `TileKind`.
    pub fn read_tile(&self, idx: usize) -> ArrayResult<TilePayload> {
        let entry = self
            .footer
            .tiles
            .get(idx)
            .ok_or_else(|| ArrayError::SegmentCorruption {
                detail: format!(
                    "tile index {idx} out of range (have {})",
                    self.footer.tiles.len()
                ),
            })?;
        let off = entry.offset as usize;
        let len = entry.length as usize;
        // Guard against offset+length wrapping before the bounds check below.
        let end = off
            .checked_add(len)
            .ok_or_else(|| ArrayError::SegmentCorruption {
                detail: "tile entry offset+length overflows".into(),
            })?;
        if end > self.bytes.len() {
            return Err(ArrayError::SegmentCorruption {
                detail: format!(
                    "tile {idx} block out of bounds: off={off} len={len} \
                     file_size={}",
                    self.bytes.len()
                ),
            });
        }
        // Framing strips the block envelope and verifies the CRC; `payload`
        // is the raw tile encoding.
        let (payload, _) = BlockFraming::decode(&self.bytes[off..end])?;
        match entry.kind {
            TileKind::Sparse => {
                let t = read_sparse_tile(payload)?;
                Ok(TilePayload::Sparse(t))
            }
            TileKind::Dense => {
                let t: DenseTile =
                    zerompk::from_msgpack(payload).map_err(|e| ArrayError::SegmentCorruption {
                        detail: format!("dense tile decode failed: {e}"),
                    })?;
                Ok(TilePayload::Dense(t))
            }
        }
    }
}
293
/// An owned segment reader that holds decrypted segment bytes.
///
/// Exists because `SegmentReader<'a>` borrows its byte slice; for
/// encrypted segments the plaintext lives in a `Vec<u8>` that must
/// outlive the reader. `OwnedSegmentReader` ties the two together.
#[derive(Debug)]
pub struct OwnedSegmentReader {
    /// Decrypted plaintext segment bytes.
    plaintext: Vec<u8>,
    /// Header decoded from `plaintext` at open time.
    header: SegmentHeader,
    /// Footer decoded from `plaintext` at open time.
    footer: SegmentFooter,
}
306
307impl OwnedSegmentReader {
308    /// Open a segment with optional at-rest decryption.
309    ///
310    /// - `kek = None` → requires a plaintext (`NDAS`) segment; returns
311    ///   `Err(MissingKek)` if the blob starts with `SEGA`.
312    /// - `kek = Some(key)` → requires an encrypted (`SEGA`) segment; decrypts
313    ///   the blob, then parses the inner plaintext. Returns `Err(KekRequired)`
314    ///   if the blob starts with `NDAS`.
315    pub fn open_with_kek(
316        blob: &[u8],
317        kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
318    ) -> ArrayResult<Self> {
319        use super::encrypt::{decrypt_segment, detect_encryption};
320        let is_encrypted = detect_encryption(blob)?;
321        let plaintext = match (is_encrypted, kek) {
322            (true, Some(key)) => decrypt_segment(key, blob)?,
323            (true, None) => return Err(ArrayError::MissingKek),
324            (false, Some(_)) => return Err(ArrayError::KekRequired),
325            (false, None) => blob.to_vec(),
326        };
327        let header = SegmentHeader::decode(&plaintext[..HEADER_SIZE.min(plaintext.len())])?;
328        let footer = SegmentFooter::decode(&plaintext)?;
329        if header.schema_hash != footer.schema_hash {
330            return Err(ArrayError::SegmentCorruption {
331                detail: format!(
332                    "header/footer schema_hash mismatch: header={:x} footer={:x}",
333                    header.schema_hash, footer.schema_hash
334                ),
335            });
336        }
337        Ok(Self {
338            plaintext,
339            header,
340            footer,
341        })
342    }
343
344    /// Borrow a `SegmentReader` over the owned plaintext bytes.
345    pub fn reader(&self) -> SegmentReader<'_> {
346        SegmentReader {
347            bytes: &self.plaintext,
348            header: self.header,
349            footer: self.footer.clone(),
350        }
351    }
352
353    /// Consume the owned reader and return the inner plaintext buffer.
354    pub fn into_plaintext(self) -> Vec<u8> {
355        self.plaintext
356    }
357}
358
impl<'a> SegmentReader<'a> {
    /// Open a segment blob with optional KEK — convenience alias for
    /// [`OwnedSegmentReader::open_with_kek`].
    ///
    /// - `kek = None` + encrypted blob → `Err(MissingKek)`
    /// - `kek = Some` + plaintext blob → `Err(KekRequired)`
    ///
    /// On success the fully parsed [`OwnedSegmentReader`] is returned; it
    /// owns the (possibly decrypted) bytes and hands out borrowed
    /// `SegmentReader`s via [`OwnedSegmentReader::reader`].
    pub fn open_with_kek(
        blob: &[u8],
        kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
    ) -> ArrayResult<OwnedSegmentReader> {
        OwnedSegmentReader::open_with_kek(blob, kek)
    }
}
374
// Unit tests: round-trips through SegmentWriter, as-of/version-iteration
// semantics, cell extraction (live / tombstone / erased rows), and the
// encrypted-at-rest open paths.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::segment::writer::SegmentWriter;
    use crate::tile::dense_tile::DenseTile;
    use crate::tile::sparse_tile::SparseTileBuilder;
    use crate::types::TileId;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};

    // 16x16 int64 grid with 4x4 tiles and a single nullable int attr "v".
    fn schema() -> crate::schema::ArraySchema {
        ArraySchemaBuilder::new("g")
            .dim(DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
            ))
            .dim(DimSpec::new(
                "y",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
            ))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![4, 4])
            .build()
            .unwrap()
    }

    // One-row sparse tile at coord (base, base+1) with value base*10.
    fn make_sparse(s: &crate::schema::ArraySchema, base: i64) -> SparseTile {
        let mut b = SparseTileBuilder::new(s);
        b.push(
            &[CoordValue::Int64(base), CoordValue::Int64(base + 1)],
            &[CellValue::Int64(base * 10)],
        )
        .unwrap();
        b.build()
    }

    #[test]
    fn reader_round_trips_sparse_tiles() {
        let s = schema();
        let mut w = SegmentWriter::new(0xCAFE);
        w.append_sparse(TileId::snapshot(1), &make_sparse(&s, 1))
            .unwrap();
        w.append_sparse(TileId::snapshot(2), &make_sparse(&s, 2))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        assert_eq!(r.tile_count(), 2);
        let t0 = r.read_tile(0).unwrap();
        match t0 {
            TilePayload::Sparse(t) => assert_eq!(t.nnz(), 1),
            _ => panic!("expected sparse"),
        }
    }

    #[test]
    fn reader_round_trips_dense_tile() {
        let s = schema();
        let mut w = SegmentWriter::new(0xBEEF);
        w.append_dense(TileId::snapshot(1), &DenseTile::empty(&s))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        match r.read_tile(0).unwrap() {
            // 4x4 tile extents => 16 cells in an empty dense tile.
            TilePayload::Dense(t) => assert_eq!(t.cell_count(), 16),
            _ => panic!("expected dense"),
        }
    }

    #[test]
    fn reader_rejects_mismatched_schema_hash() {
        // Build a valid segment, then flip a byte inside the header's
        // schema_hash field. We can't cheaply forge a valid header with a
        // mismatched footer hash, so this exercises the header CRC failure
        // path instead — which guards the same invariant.
        let s = schema();
        let mut w = SegmentWriter::new(0x1);
        w.append_sparse(TileId::snapshot(1), &make_sparse(&s, 1))
            .unwrap();
        let mut bytes = w.finish(None).unwrap();
        bytes[12] ^= 0xFF; // corrupt header schema_hash
        assert!(SegmentReader::open(&bytes).is_err());
    }

    #[test]
    fn reader_rejects_out_of_range_tile() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1);
        w.append_sparse(TileId::snapshot(1), &make_sparse(&s, 1))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        assert!(r.read_tile(99).is_err());
    }

    #[test]
    fn read_tile_as_of_returns_newest_at_cutoff() {
        let s = schema();
        let mut w = SegmentWriter::new(0xCAFE);
        // Three versions of prefix=1 at system_from_ms 100, 200, 300.
        w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
            .unwrap();
        w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
            .unwrap();
        w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        // Read at cutoff 250 — between v2 (200) and v3 (300). Should return v2.
        let result = r.read_tile_as_of(1, 250, None).unwrap();
        match result {
            Some(TilePayload::Sparse(t)) => {
                // v2 was built with base=2 — one non-zero entry.
                assert_eq!(t.nnz(), 1);
                // dim_dicts[0] holds the x-dim dictionary; values[0] = Int64(2).
                assert_eq!(t.dim_dicts[0].values[0], CoordValue::Int64(2));
            }
            other => panic!("expected Some(Sparse), got {other:?}"),
        }
    }

    #[test]
    fn read_tile_as_of_returns_none_below_first_version() {
        let s = schema();
        let mut w = SegmentWriter::new(0xBEEF);
        w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        // Read at cutoff before any version (50 < 100).
        let result = r.read_tile_as_of(1, 50, None).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn extract_cell_bytes_finds_coord() {
        let s = schema();
        let sparse = make_sparse(&s, 3);
        // make_sparse builds one entry at coord (base, base+1) = (3, 4).
        let coord = vec![CoordValue::Int64(3), CoordValue::Int64(4)];
        let bytes = extract_cell_bytes(&sparse, &coord).unwrap();
        assert!(bytes.is_some(), "should find coord (3,4)");

        let absent = vec![CoordValue::Int64(9), CoordValue::Int64(9)];
        let none = extract_cell_bytes(&sparse, &absent).unwrap();
        assert!(none.is_none(), "absent coord must return None");
    }

    #[test]
    fn extract_cell_bytes_carries_valid_time_bounds() {
        use crate::tile::cell_payload::CellPayload;
        use crate::tile::sparse_tile::{SparseRow, SparseTileBuilder};
        use nodedb_types::Surrogate;

        let s = schema();
        let mut b = SparseTileBuilder::new(&s);
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(1), CoordValue::Int64(2)],
            attrs: &[CellValue::Int64(99)],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 100,
            valid_until_ms: 200,
            kind: crate::tile::sparse_tile::RowKind::Live,
        })
        .unwrap();
        let tile = b.build();

        // The encoded payload must round-trip the explicit [100, 200) bounds.
        let coord = vec![CoordValue::Int64(1), CoordValue::Int64(2)];
        let bytes = extract_cell_bytes(&tile, &coord).unwrap().unwrap();
        let payload = CellPayload::decode(&bytes).unwrap();
        assert_eq!(payload.valid_from_ms, 100);
        assert_eq!(payload.valid_until_ms, 200);
    }

    #[test]
    fn iter_tile_versions_newest_first() {
        let s = schema();
        let mut w = SegmentWriter::new(0xCAFE);
        w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
            .unwrap();
        w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
            .unwrap();
        w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<_> = r
            .iter_tile_versions(1, i64::MAX)
            .unwrap()
            .map(|v| v.unwrap().0.system_from_ms)
            .collect();
        assert_eq!(versions, vec![300, 200, 100]);
    }

    #[test]
    fn iter_tile_versions_respects_system_as_of() {
        let s = schema();
        let mut w = SegmentWriter::new(0xBEEF);
        w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
            .unwrap();
        w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
            .unwrap();
        w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<_> = r
            .iter_tile_versions(1, 250)
            .unwrap()
            .map(|v| v.unwrap().0.system_from_ms)
            .collect();
        // Cutoff 250 → v2 (200) and v1 (100); v3 (300) excluded.
        assert_eq!(versions, vec![200, 100]);
    }

    #[test]
    fn extract_cell_bytes_returns_tombstone_sentinel_for_tombstone_row() {
        use crate::tile::cell_payload::{CELL_TOMBSTONE_SENTINEL, is_cell_tombstone};
        use crate::tile::sparse_tile::{RowKind, SparseTileBuilder};

        let s = schema();
        let mut b = SparseTileBuilder::new(&s);
        b.push_row(crate::tile::sparse_tile::SparseRow {
            coord: &[CoordValue::Int64(7), CoordValue::Int64(8)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: nodedb_types::OPEN_UPPER,
            kind: RowKind::Tombstone,
        })
        .unwrap();
        let tile = b.build();
        let coord = vec![CoordValue::Int64(7), CoordValue::Int64(8)];
        let bytes = extract_cell_bytes(&tile, &coord).unwrap().unwrap();
        assert!(
            is_cell_tombstone(&bytes),
            "expected CELL_TOMBSTONE_SENTINEL, got {bytes:?}"
        );
        assert_eq!(bytes, CELL_TOMBSTONE_SENTINEL);
    }

    #[test]
    fn extract_cell_bytes_returns_erasure_sentinel_for_erased_row() {
        use crate::tile::cell_payload::{CELL_GDPR_ERASURE_SENTINEL, is_cell_gdpr_erasure};
        use crate::tile::sparse_tile::{RowKind, SparseTileBuilder};

        let s = schema();
        let mut b = SparseTileBuilder::new(&s);
        b.push_row(crate::tile::sparse_tile::SparseRow {
            coord: &[CoordValue::Int64(4), CoordValue::Int64(5)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: nodedb_types::OPEN_UPPER,
            kind: RowKind::GdprErased,
        })
        .unwrap();
        let tile = b.build();
        let coord = vec![CoordValue::Int64(4), CoordValue::Int64(5)];
        let bytes = extract_cell_bytes(&tile, &coord).unwrap().unwrap();
        assert!(
            is_cell_gdpr_erasure(&bytes),
            "expected CELL_GDPR_ERASURE_SENTINEL, got {bytes:?}"
        );
        assert_eq!(bytes, CELL_GDPR_ERASURE_SENTINEL);
    }

    #[test]
    fn read_tile_as_of_finds_exact_match() {
        let s = schema();
        let mut w = SegmentWriter::new(0xDEAD);
        w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
            .unwrap();
        w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
            .unwrap();
        w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
            .unwrap();
        let bytes = w.finish(None).unwrap();
        let r = SegmentReader::open(&bytes).unwrap();
        // Read at cutoff exactly equal to v2's system_from_ms — the cutoff
        // comparison is inclusive, so v2 itself qualifies.
        let result = r.read_tile_as_of(1, 200, None).unwrap();
        match result {
            Some(TilePayload::Sparse(t)) => {
                assert_eq!(t.nnz(), 1);
                assert_eq!(t.dim_dicts[0].values[0], CoordValue::Int64(2));
            }
            other => panic!("expected Some(Sparse), got {other:?}"),
        }
    }

    // Fixed all-0xA1 32-byte KEK for the encryption tests.
    fn test_kek() -> nodedb_wal::crypto::WalEncryptionKey {
        nodedb_wal::crypto::WalEncryptionKey::from_bytes(&[0xA1u8; 32]).unwrap()
    }

    // One-tile segment finished without encryption.
    fn write_plain(s: &crate::schema::ArraySchema, id: TileId) -> Vec<u8> {
        let mut w = SegmentWriter::new(0xCAFE);
        w.append_sparse(id, &make_sparse(s, 1)).unwrap();
        w.finish(None).unwrap()
    }

    // One-tile segment finished with the test KEK (encrypted at rest).
    fn write_encrypted(s: &crate::schema::ArraySchema, id: TileId) -> Vec<u8> {
        let kek = test_kek();
        let mut w = SegmentWriter::new(0xCAFE);
        w.append_sparse(id, &make_sparse(s, 1)).unwrap();
        w.finish(Some(&kek)).unwrap()
    }

    #[test]
    fn array_segment_refuses_plaintext_with_kek() {
        let s = schema();
        let plain = write_plain(&s, TileId::snapshot(1));
        let kek = test_kek();
        let err = OwnedSegmentReader::open_with_kek(&plain, Some(&kek)).unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::KekRequired),
            "expected KekRequired, got {err:?}"
        );
    }

    #[test]
    fn array_segment_refuses_encrypted_without_kek() {
        let s = schema();
        let encrypted = write_encrypted(&s, TileId::snapshot(1));
        let err = OwnedSegmentReader::open_with_kek(&encrypted, None).unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::MissingKek),
            "expected MissingKek, got {err:?}"
        );
    }

    #[test]
    fn array_segment_tampered_ciphertext_rejected() {
        let s = schema();
        let mut encrypted = write_encrypted(&s, TileId::snapshot(1));
        // Flip a byte after the 16-byte preamble.
        encrypted[nodedb_wal::crypto::SEGMENT_ENVELOPE_PREAMBLE_SIZE + 2] ^= 0xFF;
        let kek = test_kek();
        assert!(OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek)).is_err());
    }

    #[test]
    fn array_segment_encrypted_at_rest() {
        let s = schema();
        let encrypted = write_encrypted(&s, TileId::snapshot(1));
        // Encrypted blob must start with SEGA.
        assert_eq!(&encrypted[..4], b"SEGA");
        let kek = test_kek();
        let owned = OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek)).unwrap();
        let reader = owned.reader();
        assert_eq!(reader.tile_count(), 1);
    }

    #[test]
    fn array_segment_handle_decrypts_into_owned_buffer() {
        let s = schema();
        let kek = test_kek();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
            .unwrap();
        w.append_sparse(TileId::new(2, 200), &make_sparse(&s, 2))
            .unwrap();
        let encrypted = w.finish(Some(&kek)).unwrap();
        let owned = OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek)).unwrap();
        let reader = owned.reader();
        assert_eq!(reader.tile_count(), 2);
        assert_eq!(reader.tiles()[0].tile_id, TileId::new(1, 100));
    }
}