1use super::format::{
10 SegmentFooter, SegmentHeader, TileEntry, TileKind, framing::BlockFraming, header::HEADER_SIZE,
11};
12use crate::error::{ArrayError, ArrayResult};
13use crate::tile::dense_tile::DenseTile;
14use crate::tile::sparse_tile::SparseTile;
15use crate::types::TileId;
16
17pub struct SegmentWriter {
18 schema_hash: u64,
19 buf: Vec<u8>,
20 entries: Vec<TileEntry>,
21 finished: bool,
22}
23
24impl SegmentWriter {
25 pub fn new(schema_hash: u64) -> Self {
26 let mut buf = Vec::new();
27 SegmentHeader::new(schema_hash).encode_to(&mut buf);
28 Self {
29 schema_hash,
30 buf,
31 entries: Vec::new(),
32 finished: false,
33 }
34 }
35
36 pub fn append_sparse(&mut self, tile_id: TileId, tile: &SparseTile) -> ArrayResult<()> {
37 let mut payload = Vec::new();
38 crate::codec::tile_encode::encode_sparse_tile(tile, &mut payload)?;
39 self.append_framed(tile_id, TileKind::Sparse, &payload, tile.mbr.clone())
40 }
41
42 pub fn append_dense(&mut self, tile_id: TileId, tile: &DenseTile) -> ArrayResult<()> {
43 let payload = zerompk::to_msgpack_vec(tile).map_err(|e| ArrayError::SegmentCorruption {
44 detail: format!("dense tile encode failed: {e}"),
45 })?;
46 self.append_framed(tile_id, TileKind::Dense, &payload, tile.mbr.clone())
47 }
48
49 fn append_framed(
50 &mut self,
51 tile_id: TileId,
52 kind: TileKind,
53 payload: &[u8],
54 mbr: crate::tile::mbr::TileMBR,
55 ) -> ArrayResult<()> {
56 if let Some(last) = self.entries.last()
57 && tile_id <= last.tile_id
58 {
59 return Err(ArrayError::SegmentCorruption {
60 detail: format!(
61 "tile IDs not strictly monotonic ascending: \
62 prev=({},{}) new=({},{})",
63 last.tile_id.hilbert_prefix,
64 last.tile_id.system_from_ms,
65 tile_id.hilbert_prefix,
66 tile_id.system_from_ms,
67 ),
68 });
69 }
70 let offset = self.buf.len() as u64;
71 let framed_len = BlockFraming::encode(payload, &mut self.buf);
72 self.entries.push(TileEntry::new(
73 tile_id,
74 kind,
75 offset,
76 framed_len as u32,
77 mbr,
78 ));
79 Ok(())
80 }
81
82 pub fn tile_count(&self) -> usize {
83 self.entries.len()
84 }
85
86 pub fn finish(
93 mut self,
94 kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
95 ) -> ArrayResult<Vec<u8>> {
96 if self.finished {
97 return Err(ArrayError::SegmentCorruption {
98 detail: "SegmentWriter::finish called twice".into(),
99 });
100 }
101 let footer = SegmentFooter::new(self.schema_hash, std::mem::take(&mut self.entries));
102 footer.encode_to(&mut self.buf)?;
103 self.finished = true;
104
105 if let Some(key) = kek {
106 return super::encrypt::encrypt_segment(key, &self.buf);
107 }
108
109 Ok(self.buf)
110 }
111
112 pub fn header_size(&self) -> usize {
113 HEADER_SIZE
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::tile::sparse_tile::SparseTileBuilder;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};

    /// Finishes `w` without encryption.
    fn finish_plain(w: SegmentWriter) -> crate::error::ArrayResult<Vec<u8>> {
        w.finish(None)
    }

    /// 2-D Int64 schema (x, y in 0..=15) with one nullable Int64 attribute
    /// and 4x4 tile extents.
    fn schema() -> crate::schema::ArraySchema {
        ArraySchemaBuilder::new("g")
            .dim(DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
            ))
            .dim(DimSpec::new(
                "y",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
            ))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![4, 4])
            .build()
            .unwrap()
    }

    /// Small sparse tile with two cells.
    fn sparse_tile(s: &crate::schema::ArraySchema) -> SparseTile {
        let mut b = SparseTileBuilder::new(s);
        b.push(
            &[CoordValue::Int64(1), CoordValue::Int64(2)],
            &[CellValue::Int64(10)],
        )
        .unwrap();
        b.push(
            &[CoordValue::Int64(3), CoordValue::Int64(0)],
            &[CellValue::Int64(20)],
        )
        .unwrap();
        b.build()
    }

    #[test]
    fn writer_round_trip_via_footer() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s)).unwrap();
        w.append_sparse(TileId::snapshot(2), &sparse_tile(&s)).unwrap();
        let bytes = finish_plain(w).unwrap();
        let footer = SegmentFooter::decode(&bytes).unwrap();
        assert_eq!(footer.schema_hash, 0x1234);
        assert_eq!(footer.tiles.len(), 2);
        assert_eq!(footer.tiles[0].tile_id.hilbert_prefix, 1);
        assert_eq!(footer.tiles[1].tile_id.hilbert_prefix, 2);
    }

    #[test]
    fn writer_emits_header_first() {
        let w = SegmentWriter::new(0xAA);
        let bytes = finish_plain(w).unwrap();
        let h = SegmentHeader::decode(&bytes).unwrap();
        assert_eq!(h.schema_hash, 0xAA);
    }

    #[test]
    fn writer_tracks_tile_count() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        assert_eq!(w.tile_count(), 0);
        w.append_sparse(TileId::new(1, 100), &sparse_tile(&s)).unwrap();
        assert_eq!(w.tile_count(), 1);
        w.append_sparse(TileId::new(2, 100), &sparse_tile(&s)).unwrap();
        assert_eq!(w.tile_count(), 2);
    }

    #[test]
    fn writer_rejects_non_monotonic_tile_ids() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(2, 100), &sparse_tile(&s)).unwrap();
        let err = w
            .append_sparse(TileId::new(1, 100), &sparse_tile(&s))
            .unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::SegmentCorruption { .. }),
            "expected SegmentCorruption, got {err:?}"
        );
    }

    /// Ordering is strict: re-appending the same tile ID is rejected.
    #[test]
    fn writer_rejects_duplicate_tile_ids() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(3, 100), &sparse_tile(&s)).unwrap();
        let err = w
            .append_sparse(TileId::new(3, 100), &sparse_tile(&s))
            .unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::SegmentCorruption { .. }),
            "expected SegmentCorruption, got {err:?}"
        );
    }

    /// A rejected append must not leave a stray frame or index entry behind.
    #[test]
    fn writer_rejected_append_leaves_segment_intact() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(2, 100), &sparse_tile(&s)).unwrap();
        assert!(
            w.append_sparse(TileId::new(1, 100), &sparse_tile(&s))
                .is_err()
        );
        assert_eq!(w.tile_count(), 1);
        let bytes = finish_plain(w).unwrap();
        let footer = SegmentFooter::decode(&bytes).unwrap();
        assert_eq!(footer.tiles.len(), 1);
        assert_eq!(footer.tiles[0].tile_id, TileId::new(2, 100));
    }

    #[test]
    fn writer_allows_same_prefix_different_system_times() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(5, 100), &sparse_tile(&s)).unwrap();
        w.append_sparse(TileId::new(5, 200), &sparse_tile(&s)).unwrap();
        let bytes = finish_plain(w).unwrap();
        let footer = SegmentFooter::decode(&bytes).unwrap();
        assert_eq!(footer.tiles.len(), 2);
        assert_eq!(footer.tiles[0].tile_id, TileId::new(5, 100));
        assert_eq!(footer.tiles[1].tile_id, TileId::new(5, 200));
    }

    /// Fixed 32-byte test key.
    fn test_kek() -> nodedb_wal::crypto::WalEncryptionKey {
        nodedb_wal::crypto::WalEncryptionKey::from_bytes(&[0x42u8; 32]).unwrap()
    }

    #[test]
    fn array_segment_encrypted_at_rest() {
        let s = schema();
        let mut w = SegmentWriter::new(0xCAFE);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s)).unwrap();
        let kek = test_kek();
        let bytes = w.finish(Some(&kek)).unwrap();
        // Encrypted output carries the "SEGA" envelope magic.
        assert_eq!(&bytes[..4], b"SEGA");
        // NOTE(review): "NDAS" is presumably the plaintext segment magic — confirm.
        assert_ne!(&bytes[..4], b"NDAS");
    }

    /// Opening an encrypted segment without a KEK must fail with `MissingKek`.
    #[test]
    fn array_segment_refuses_encrypted_without_kek() {
        let s = schema();
        let mut w = SegmentWriter::new(0xBEEF);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s)).unwrap();
        let kek = test_kek();
        let encrypted = w.finish(Some(&kek)).unwrap();
        let err = super::super::reader::SegmentReader::open_with_kek(&encrypted, None).unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::MissingKek),
            "expected MissingKek, got {err:?}"
        );
    }

    /// Opening a plaintext segment while a KEK is supplied must fail with
    /// `KekRequired`.
    #[test]
    fn array_segment_refuses_plaintext_with_kek() {
        let s = schema();
        let mut w = SegmentWriter::new(0xDEAD);
        w.append_sparse(TileId::snapshot(1), &sparse_tile(&s)).unwrap();
        let plaintext = w.finish(None).unwrap();
        let kek = test_kek();
        let err =
            super::super::reader::SegmentReader::open_with_kek(&plaintext, Some(&kek)).unwrap_err();
        assert!(
            matches!(err, crate::error::ArrayError::KekRequired),
            "expected KekRequired, got {err:?}"
        );
    }

    #[test]
    fn array_segment_handle_decrypts_into_owned_buffer() {
        let s = schema();
        let mut w = SegmentWriter::new(0x1234);
        w.append_sparse(TileId::new(1, 100), &sparse_tile(&s)).unwrap();
        w.append_sparse(TileId::new(2, 200), &sparse_tile(&s)).unwrap();
        let kek = test_kek();
        let encrypted = w.finish(Some(&kek)).unwrap();
        let owned = super::super::reader::OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek))
            .unwrap();
        let reader = owned.reader();
        assert_eq!(reader.tile_count(), 2);
    }

    #[test]
    fn array_segment_encrypted_roundtrip_multiple_tiles() {
        let s = schema();
        let mut w = SegmentWriter::new(0xABCD);
        for i in 1u64..=5 {
            w.append_sparse(TileId::new(i, i as i64 * 100), &sparse_tile(&s))
                .unwrap();
        }
        let kek = test_kek();
        let encrypted = w.finish(Some(&kek)).unwrap();
        let owned = super::super::reader::OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek))
            .unwrap();
        let reader = owned.reader();
        assert_eq!(reader.tile_count(), 5);
        for idx in 0..5 {
            let tile = reader.read_tile(idx).unwrap();
            assert!(matches!(tile, super::super::reader::TilePayload::Sparse(_)));
        }
    }
}
312}