Skip to main content

nodedb_array/segment/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Streaming `SegmentWriter` — appends tile payloads, finalises with footer.
4//!
5//! Layout: `[ HEADER | framed_tile_0 | framed_tile_1 | ... | footer_block | trailer ]`.
//! Sparse tiles are encoded with `codec::tile_encode` and dense tiles with
//! zerompk; each payload is then framed (length + CRC). Tile offsets and
//! MBRs are recorded in memory and flushed in the footer.
8
9use super::format::{
10    SegmentFooter, SegmentHeader, TileEntry, TileKind, framing::BlockFraming, header::HEADER_SIZE,
11};
12use crate::error::{ArrayError, ArrayResult};
13use crate::tile::dense_tile::DenseTile;
14use crate::tile::sparse_tile::SparseTile;
15use crate::types::TileId;
16
17pub struct SegmentWriter {
18    schema_hash: u64,
19    buf: Vec<u8>,
20    entries: Vec<TileEntry>,
21    finished: bool,
22}
23
24impl SegmentWriter {
25    pub fn new(schema_hash: u64) -> Self {
26        let mut buf = Vec::new();
27        SegmentHeader::new(schema_hash).encode_to(&mut buf);
28        Self {
29            schema_hash,
30            buf,
31            entries: Vec::new(),
32            finished: false,
33        }
34    }
35
36    pub fn append_sparse(&mut self, tile_id: TileId, tile: &SparseTile) -> ArrayResult<()> {
37        let mut payload = Vec::new();
38        crate::codec::tile_encode::encode_sparse_tile(tile, &mut payload)?;
39        self.append_framed(tile_id, TileKind::Sparse, &payload, tile.mbr.clone())
40    }
41
42    pub fn append_dense(&mut self, tile_id: TileId, tile: &DenseTile) -> ArrayResult<()> {
43        let payload = zerompk::to_msgpack_vec(tile).map_err(|e| ArrayError::SegmentCorruption {
44            detail: format!("dense tile encode failed: {e}"),
45        })?;
46        self.append_framed(tile_id, TileKind::Dense, &payload, tile.mbr.clone())
47    }
48
49    fn append_framed(
50        &mut self,
51        tile_id: TileId,
52        kind: TileKind,
53        payload: &[u8],
54        mbr: crate::tile::mbr::TileMBR,
55    ) -> ArrayResult<()> {
56        if let Some(last) = self.entries.last()
57            && tile_id <= last.tile_id
58        {
59            return Err(ArrayError::SegmentCorruption {
60                detail: format!(
61                    "tile IDs not strictly monotonic ascending: \
62                     prev=({},{}) new=({},{})",
63                    last.tile_id.hilbert_prefix,
64                    last.tile_id.system_from_ms,
65                    tile_id.hilbert_prefix,
66                    tile_id.system_from_ms,
67                ),
68            });
69        }
70        let offset = self.buf.len() as u64;
71        let framed_len = BlockFraming::encode(payload, &mut self.buf);
72        self.entries.push(TileEntry::new(
73            tile_id,
74            kind,
75            offset,
76            framed_len as u32,
77            mbr,
78        ));
79        Ok(())
80    }
81
82    pub fn tile_count(&self) -> usize {
83        self.entries.len()
84    }
85
86    /// Finalise the segment by appending the footer + trailer. Consumes
87    /// the writer and returns the complete byte vector.
88    ///
89    /// When `kek` is `Some`, the assembled plaintext buffer is wrapped in an
90    /// AES-256-GCM `SEGA` envelope before being returned. When `None`, the
91    /// raw `NDAS` segment bytes are returned.
92    pub fn finish(
93        mut self,
94        kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
95    ) -> ArrayResult<Vec<u8>> {
96        if self.finished {
97            return Err(ArrayError::SegmentCorruption {
98                detail: "SegmentWriter::finish called twice".into(),
99            });
100        }
101        let footer = SegmentFooter::new(self.schema_hash, std::mem::take(&mut self.entries));
102        footer.encode_to(&mut self.buf)?;
103        self.finished = true;
104
105        if let Some(key) = kek {
106            return super::encrypt::encrypt_segment(key, &self.buf);
107        }
108
109        Ok(self.buf)
110    }
111
112    pub fn header_size(&self) -> usize {
113        HEADER_SIZE
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::tile::sparse_tile::SparseTileBuilder;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};

    /// Finish a writer without encryption — convenience for existing tests.
    fn finish_plain(w: SegmentWriter) -> crate::error::ArrayResult<Vec<u8>> {
        w.finish(None)
    }

    /// 2-D Int64 schema (16x16 domain, 4x4 tiles) with one nullable Int64 attr.
    fn schema() -> crate::schema::ArraySchema {
        ArraySchemaBuilder::new("g")
            .dim(DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
            ))
            .dim(DimSpec::new(
                "y",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
            ))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![4, 4])
            .build()
            .unwrap()
    }

    /// Small two-cell sparse tile for `s`.
    fn sparse_tile(s: &crate::schema::ArraySchema) -> SparseTile {
        let mut b = SparseTileBuilder::new(s);
        b.push(
            &[CoordValue::Int64(1), CoordValue::Int64(2)],
            &[CellValue::Int64(10)],
        )
        .unwrap();
        b.push(
            &[CoordValue::Int64(3), CoordValue::Int64(0)],
            &[CellValue::Int64(20)],
        )
        .unwrap();
        b.build()
    }

    #[test]
    fn writer_round_trip_via_footer() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s))
            .unwrap();
        w.append_sparse(TileId::snapshot(2), &sparse_tile(&s))
            .unwrap();
        let bytes = finish_plain(w).unwrap();
        let footer = SegmentFooter::decode(&bytes).unwrap();
        assert_eq!(footer.schema_hash, 0x1234);
        assert_eq!(footer.tiles.len(), 2);
        assert_eq!(footer.tiles[0].tile_id.hilbert_prefix, 1);
        assert_eq!(footer.tiles[1].tile_id.hilbert_prefix, 2);
    }

    #[test]
    fn writer_emits_header_first() {
        let w = SegmentWriter::new(0xAA);
        let bytes = finish_plain(w).unwrap();
        let h = SegmentHeader::decode(&bytes).unwrap();
        assert_eq!(h.schema_hash, 0xAA);
    }

    #[test]
    fn writer_rejects_non_monotonic_tile_ids() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(2, 100), &sparse_tile(&s))
            .unwrap();
        let err = w
            .append_sparse(TileId::new(1, 100), &sparse_tile(&s))
            .unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::SegmentCorruption { .. }),
            "expected SegmentCorruption, got {err:?}"
        );
    }

    #[test]
    fn writer_rejects_duplicate_tile_id_and_stays_usable() {
        // Ordering is *strictly* ascending: an equal ID must be rejected,
        // and the rejected append must not leave an entry behind.
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(5, 100), &sparse_tile(&s))
            .unwrap();
        let err = w
            .append_sparse(TileId::new(5, 100), &sparse_tile(&s))
            .unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::SegmentCorruption { .. }),
            "expected SegmentCorruption, got {err:?}"
        );
        assert_eq!(w.tile_count(), 1);
        let bytes = finish_plain(w).unwrap();
        let footer = SegmentFooter::decode(&bytes).unwrap();
        assert_eq!(footer.tiles.len(), 1);
        assert_eq!(footer.tiles[0].tile_id, TileId::new(5, 100));
    }

    #[test]
    fn writer_allows_same_prefix_different_system_times() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(5, 100), &sparse_tile(&s))
            .unwrap();
        w.append_sparse(TileId::new(5, 200), &sparse_tile(&s))
            .unwrap();
        let bytes = finish_plain(w).unwrap();
        let footer = SegmentFooter::decode(&bytes).unwrap();
        assert_eq!(footer.tiles.len(), 2);
        assert_eq!(footer.tiles[0].tile_id, TileId::new(5, 100));
        assert_eq!(footer.tiles[1].tile_id, TileId::new(5, 200));
    }

    fn test_kek() -> nodedb_wal::crypto::WalEncryptionKey {
        nodedb_wal::crypto::WalEncryptionKey::from_bytes(&[0x42u8; 32]).unwrap()
    }

    #[test]
    fn array_segment_encrypted_at_rest() {
        let s = schema();
        let mut w = SegmentWriter::new(0xCAFE);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s))
            .unwrap();
        let kek = test_kek();
        let bytes = w.finish(Some(&kek)).unwrap();
        // Output must start with SEGA, not NDAS.
        assert_eq!(&bytes[..4], b"SEGA");
        // Must not start with plaintext NDAS magic.
        assert_ne!(&bytes[..4], b"NDAS");
    }

    #[test]
    fn array_segment_refuses_encrypted_without_kek() {
        // Write an encrypted segment, then verify the reader rejects it
        // when presented without a KEK.
        let s = schema();
        let mut w = SegmentWriter::new(0xBEEF);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s))
            .unwrap();
        let kek = test_kek();
        let encrypted = w.finish(Some(&kek)).unwrap();
        // Attempt to open without KEK — must return MissingKek.
        let err = super::super::reader::SegmentReader::open_with_kek(&encrypted, None).unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::MissingKek),
            "expected MissingKek, got {err:?}"
        );
    }

    #[test]
    fn array_segment_refuses_plaintext_with_kek() {
        // Write a plaintext segment, then verify the reader rejects it
        // when presented with a KEK (encryption enforcement).
        let s = schema();
        let mut w = SegmentWriter::new(0xDEAD);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s))
            .unwrap();
        let plaintext = w.finish(None).unwrap();
        let kek = test_kek();
        // Attempt to open plaintext with KEK — must return KekRequired.
        let err =
            super::super::reader::SegmentReader::open_with_kek(&plaintext, Some(&kek)).unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::KekRequired),
            "expected KekRequired, got {err:?}"
        );
    }

    #[test]
    fn array_segment_handle_decrypts_into_owned_buffer() {
        // Encrypt a multi-tile segment and round-trip it through the full
        // OwnedSegmentReader::open_with_kek path.
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(1, 100), &sparse_tile(&s))
            .unwrap();
        w.append_sparse(TileId::new(2, 200), &sparse_tile(&s))
            .unwrap();
        let kek = test_kek();
        let encrypted = w.finish(Some(&kek)).unwrap();
        let owned = super::super::reader::OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek))
            .unwrap();
        let reader = owned.reader();
        assert_eq!(reader.tile_count(), 2);
    }

    #[test]
    fn array_segment_encrypted_roundtrip_multiple_tiles() {
        let s = schema();
        let mut w = SegmentWriter::new(0xABCD);
        for i in 1u64..=5 {
            w.append_sparse(TileId::new(i, i as i64 * 100), &sparse_tile(&s))
                .unwrap();
        }
        let kek = test_kek();
        let encrypted = w.finish(Some(&kek)).unwrap();
        let owned = super::super::reader::OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek))
            .unwrap();
        let reader = owned.reader();
        assert_eq!(reader.tile_count(), 5);
        for idx in 0..5 {
            let tile = reader.read_tile(idx).unwrap();
            assert!(matches!(tile, super::super::reader::TilePayload::Sparse(_)));
        }
    }
}