1use super::format::{
11 SegmentFooter, SegmentHeader, TileEntry, TileKind, framing::BlockFraming, header::HEADER_SIZE,
12};
13use crate::codec::tag::peek_tag;
14use crate::codec::tile_decode::decode_sparse_tile;
15use crate::error::{ArrayError, ArrayResult};
16use crate::tile::cell_payload::{CELL_GDPR_ERASURE_SENTINEL, CELL_TOMBSTONE_SENTINEL, CellPayload};
17use crate::tile::dense_tile::DenseTile;
18use crate::tile::sparse_tile::{RowKind, SparseTile};
19use crate::types::coord::value::CoordValue;
20use nodedb_types::Surrogate;
21
22pub fn extract_cell_bytes(tile: &SparseTile, coord: &[CoordValue]) -> ArrayResult<Option<Vec<u8>>> {
33 let n = tile.row_count();
34 'rows: for row in 0..n {
35 for (dim_idx, c) in coord.iter().enumerate() {
37 let dict =
38 tile.dim_dicts
39 .get(dim_idx)
40 .ok_or_else(|| ArrayError::SegmentCorruption {
41 detail: format!("extract_cell_bytes: dim {dim_idx} out of range"),
42 })?;
43 let entry_idx = *dict
44 .indices
45 .get(row)
46 .ok_or_else(|| ArrayError::SegmentCorruption {
47 detail: format!("extract_cell_bytes: row {row} index out of range"),
48 })? as usize;
49 let stored =
50 dict.values
51 .get(entry_idx)
52 .ok_or_else(|| ArrayError::SegmentCorruption {
53 detail: format!("extract_cell_bytes: dict entry {entry_idx} out of range"),
54 })?;
55 if stored != c {
56 continue 'rows;
57 }
58 }
59 let kind = tile.row_kind(row)?;
61 match kind {
62 RowKind::Tombstone => return Ok(Some(CELL_TOMBSTONE_SENTINEL.to_vec())),
63 RowKind::GdprErased => return Ok(Some(CELL_GDPR_ERASURE_SENTINEL.to_vec())),
64 RowKind::Live => {}
65 }
66 let attrs: Vec<_> = tile
68 .attr_cols
69 .iter()
70 .map(|col| {
71 col.get(row)
72 .cloned()
73 .ok_or_else(|| ArrayError::SegmentCorruption {
74 detail: format!("extract_cell_bytes: attr col row {row} out of range"),
75 })
76 })
77 .collect::<ArrayResult<Vec<_>>>()?;
78 let surrogate = tile.surrogates.get(row).copied().unwrap_or(Surrogate::ZERO);
79 let valid_from_ms =
80 tile.valid_from_ms
81 .get(row)
82 .copied()
83 .ok_or_else(|| ArrayError::SegmentCorruption {
84 detail: format!("extract_cell_bytes: valid_from_ms row {row} out of range"),
85 })?;
86 let valid_until_ms =
87 tile.valid_until_ms
88 .get(row)
89 .copied()
90 .ok_or_else(|| ArrayError::SegmentCorruption {
91 detail: format!("extract_cell_bytes: valid_until_ms row {row} out of range"),
92 })?;
93 let payload = CellPayload {
94 valid_from_ms,
95 valid_until_ms,
96 attrs,
97 surrogate,
98 };
99 return payload.encode().map(Some);
100 }
101 Ok(None)
102}
103
104fn read_sparse_tile(payload: &[u8]) -> ArrayResult<SparseTile> {
109 match peek_tag(payload) {
110 Some(_) => decode_sparse_tile(payload),
111 None => Err(ArrayError::SegmentCorruption {
112 detail: format!(
113 "sparse tile has unrecognized tag byte {:#04x}",
114 payload.first().copied().unwrap_or(0)
115 ),
116 }),
117 }
118}
119
120#[derive(Debug, Clone, PartialEq)]
122pub enum TilePayload {
123 Sparse(SparseTile),
124 Dense(DenseTile),
125}
126
127pub struct SegmentReader<'a> {
128 bytes: &'a [u8],
129 header: SegmentHeader,
130 footer: SegmentFooter,
131}
132
133impl<'a> SegmentReader<'a> {
134 pub fn open(bytes: &'a [u8]) -> ArrayResult<Self> {
135 if bytes.len() < HEADER_SIZE {
136 return Err(ArrayError::SegmentCorruption {
137 detail: format!("segment too small: {} bytes", bytes.len()),
138 });
139 }
140 let header = SegmentHeader::decode(&bytes[..HEADER_SIZE])?;
141 let footer = SegmentFooter::decode(bytes)?;
142 if header.schema_hash != footer.schema_hash {
143 return Err(ArrayError::SegmentCorruption {
144 detail: format!(
145 "header/footer schema_hash mismatch: header={:x} footer={:x}",
146 header.schema_hash, footer.schema_hash
147 ),
148 });
149 }
150 Ok(Self {
151 bytes,
152 header,
153 footer,
154 })
155 }
156
157 pub fn header(&self) -> &SegmentHeader {
158 &self.header
159 }
160
161 pub fn schema_hash(&self) -> u64 {
162 self.header.schema_hash
163 }
164
165 pub fn tiles(&self) -> &[TileEntry] {
166 &self.footer.tiles
167 }
168
169 pub fn tile_count(&self) -> usize {
170 self.footer.tiles.len()
171 }
172
173 pub fn read_tile_as_of(
183 &self,
184 hilbert_prefix: u64,
185 system_as_of: i64,
186 _valid_at_ms: Option<i64>,
187 ) -> ArrayResult<Option<TilePayload>> {
188 let tiles = &self.footer.tiles;
189
190 let first = tiles.partition_point(|e| e.tile_id.hilbert_prefix < hilbert_prefix);
192 let past_prefix = tiles.partition_point(|e| e.tile_id.hilbert_prefix <= hilbert_prefix);
196
197 let candidates = &tiles[first..past_prefix];
199 if candidates.is_empty() {
200 return Ok(None);
201 }
202
203 let cutoff_pos = candidates.partition_point(|e| e.tile_id.system_from_ms <= system_as_of);
206 if cutoff_pos == 0 {
207 return Ok(None);
208 }
209
210 let entry_idx = first + cutoff_pos - 1;
212 self.read_tile(entry_idx).map(Some)
213 }
214
215 pub fn iter_tile_versions(
222 &self,
223 hilbert_prefix: u64,
224 system_as_of: i64,
225 ) -> ArrayResult<impl Iterator<Item = ArrayResult<(crate::types::TileId, TilePayload)>> + '_>
226 {
227 let tiles = &self.footer.tiles;
228
229 let first = tiles.partition_point(|e| e.tile_id.hilbert_prefix < hilbert_prefix);
231 let past_prefix = tiles.partition_point(|e| e.tile_id.hilbert_prefix <= hilbert_prefix);
232
233 let cutoff_pos =
236 tiles[first..past_prefix].partition_point(|e| e.tile_id.system_from_ms <= system_as_of);
237 let qualifying_start = first;
239 let qualifying_end = first + cutoff_pos;
240
241 let indices: Vec<usize> = (qualifying_start..qualifying_end).rev().collect();
243 Ok(indices.into_iter().map(move |idx| {
244 let tile_id = tiles[idx].tile_id;
245 self.read_tile(idx).map(|payload| (tile_id, payload))
246 }))
247 }
248
249 pub fn read_tile(&self, idx: usize) -> ArrayResult<TilePayload> {
251 let entry = self
252 .footer
253 .tiles
254 .get(idx)
255 .ok_or_else(|| ArrayError::SegmentCorruption {
256 detail: format!(
257 "tile index {idx} out of range (have {})",
258 self.footer.tiles.len()
259 ),
260 })?;
261 let off = entry.offset as usize;
262 let len = entry.length as usize;
263 let end = off
264 .checked_add(len)
265 .ok_or_else(|| ArrayError::SegmentCorruption {
266 detail: "tile entry offset+length overflows".into(),
267 })?;
268 if end > self.bytes.len() {
269 return Err(ArrayError::SegmentCorruption {
270 detail: format!(
271 "tile {idx} block out of bounds: off={off} len={len} \
272 file_size={}",
273 self.bytes.len()
274 ),
275 });
276 }
277 let (payload, _) = BlockFraming::decode(&self.bytes[off..end])?;
278 match entry.kind {
279 TileKind::Sparse => {
280 let t = read_sparse_tile(payload)?;
281 Ok(TilePayload::Sparse(t))
282 }
283 TileKind::Dense => {
284 let t: DenseTile =
285 zerompk::from_msgpack(payload).map_err(|e| ArrayError::SegmentCorruption {
286 detail: format!("dense tile decode failed: {e}"),
287 })?;
288 Ok(TilePayload::Dense(t))
289 }
290 }
291 }
292}
293
294#[derive(Debug)]
300pub struct OwnedSegmentReader {
301 plaintext: Vec<u8>,
303 header: SegmentHeader,
304 footer: SegmentFooter,
305}
306
307impl OwnedSegmentReader {
308 pub fn open_with_kek(
316 blob: &[u8],
317 kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
318 ) -> ArrayResult<Self> {
319 use super::encrypt::{decrypt_segment, detect_encryption};
320 let is_encrypted = detect_encryption(blob)?;
321 let plaintext = match (is_encrypted, kek) {
322 (true, Some(key)) => decrypt_segment(key, blob)?,
323 (true, None) => return Err(ArrayError::MissingKek),
324 (false, Some(_)) => return Err(ArrayError::KekRequired),
325 (false, None) => blob.to_vec(),
326 };
327 let header = SegmentHeader::decode(&plaintext[..HEADER_SIZE.min(plaintext.len())])?;
328 let footer = SegmentFooter::decode(&plaintext)?;
329 if header.schema_hash != footer.schema_hash {
330 return Err(ArrayError::SegmentCorruption {
331 detail: format!(
332 "header/footer schema_hash mismatch: header={:x} footer={:x}",
333 header.schema_hash, footer.schema_hash
334 ),
335 });
336 }
337 Ok(Self {
338 plaintext,
339 header,
340 footer,
341 })
342 }
343
344 pub fn reader(&self) -> SegmentReader<'_> {
346 SegmentReader {
347 bytes: &self.plaintext,
348 header: self.header,
349 footer: self.footer.clone(),
350 }
351 }
352
353 pub fn into_plaintext(self) -> Vec<u8> {
355 self.plaintext
356 }
357}
358
359impl<'a> SegmentReader<'a> {
360 pub fn open_with_kek(
368 blob: &[u8],
369 kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
370 ) -> ArrayResult<OwnedSegmentReader> {
371 OwnedSegmentReader::open_with_kek(blob, kek)
372 }
373}
374
375#[cfg(test)]
376mod tests {
377 use super::*;
378 use crate::schema::ArraySchemaBuilder;
379 use crate::schema::attr_spec::{AttrSpec, AttrType};
380 use crate::schema::dim_spec::{DimSpec, DimType};
381 use crate::segment::writer::SegmentWriter;
382 use crate::tile::dense_tile::DenseTile;
383 use crate::tile::sparse_tile::SparseTileBuilder;
384 use crate::types::TileId;
385 use crate::types::cell_value::value::CellValue;
386 use crate::types::coord::value::CoordValue;
387 use crate::types::domain::{Domain, DomainBound};
388
389 fn schema() -> crate::schema::ArraySchema {
390 ArraySchemaBuilder::new("g")
391 .dim(DimSpec::new(
392 "x",
393 DimType::Int64,
394 Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
395 ))
396 .dim(DimSpec::new(
397 "y",
398 DimType::Int64,
399 Domain::new(DomainBound::Int64(0), DomainBound::Int64(15)),
400 ))
401 .attr(AttrSpec::new("v", AttrType::Int64, true))
402 .tile_extents(vec![4, 4])
403 .build()
404 .unwrap()
405 }
406
407 fn make_sparse(s: &crate::schema::ArraySchema, base: i64) -> SparseTile {
408 let mut b = SparseTileBuilder::new(s);
409 b.push(
410 &[CoordValue::Int64(base), CoordValue::Int64(base + 1)],
411 &[CellValue::Int64(base * 10)],
412 )
413 .unwrap();
414 b.build()
415 }
416
417 #[test]
418 fn reader_round_trips_sparse_tiles() {
419 let s = schema();
420 let mut w = SegmentWriter::new(0xCAFE);
421 w.append_sparse(TileId::snapshot(1), &make_sparse(&s, 1))
422 .unwrap();
423 w.append_sparse(TileId::snapshot(2), &make_sparse(&s, 2))
424 .unwrap();
425 let bytes = w.finish(None).unwrap();
426 let r = SegmentReader::open(&bytes).unwrap();
427 assert_eq!(r.tile_count(), 2);
428 let t0 = r.read_tile(0).unwrap();
429 match t0 {
430 TilePayload::Sparse(t) => assert_eq!(t.nnz(), 1),
431 _ => panic!("expected sparse"),
432 }
433 }
434
435 #[test]
436 fn reader_round_trips_dense_tile() {
437 let s = schema();
438 let mut w = SegmentWriter::new(0xBEEF);
439 w.append_dense(TileId::snapshot(1), &DenseTile::empty(&s))
440 .unwrap();
441 let bytes = w.finish(None).unwrap();
442 let r = SegmentReader::open(&bytes).unwrap();
443 match r.read_tile(0).unwrap() {
444 TilePayload::Dense(t) => assert_eq!(t.cell_count(), 16),
445 _ => panic!("expected dense"),
446 }
447 }
448
449 #[test]
450 fn reader_rejects_mismatched_schema_hash() {
451 let s = schema();
457 let mut w = SegmentWriter::new(0x1);
458 w.append_sparse(TileId::snapshot(1), &make_sparse(&s, 1))
459 .unwrap();
460 let mut bytes = w.finish(None).unwrap();
461 bytes[12] ^= 0xFF; assert!(SegmentReader::open(&bytes).is_err());
463 }
464
465 #[test]
466 fn reader_rejects_out_of_range_tile() {
467 let s = schema();
468 let mut w = SegmentWriter::new(0x1);
469 w.append_sparse(TileId::snapshot(1), &make_sparse(&s, 1))
470 .unwrap();
471 let bytes = w.finish(None).unwrap();
472 let r = SegmentReader::open(&bytes).unwrap();
473 assert!(r.read_tile(99).is_err());
474 }
475
476 #[test]
477 fn read_tile_as_of_returns_newest_at_cutoff() {
478 let s = schema();
479 let mut w = SegmentWriter::new(0xCAFE);
480 w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
482 .unwrap();
483 w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
484 .unwrap();
485 w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
486 .unwrap();
487 let bytes = w.finish(None).unwrap();
488 let r = SegmentReader::open(&bytes).unwrap();
489 let result = r.read_tile_as_of(1, 250, None).unwrap();
491 match result {
492 Some(TilePayload::Sparse(t)) => {
493 assert_eq!(t.nnz(), 1);
495 assert_eq!(t.dim_dicts[0].values[0], CoordValue::Int64(2));
497 }
498 other => panic!("expected Some(Sparse), got {other:?}"),
499 }
500 }
501
502 #[test]
503 fn read_tile_as_of_returns_none_below_first_version() {
504 let s = schema();
505 let mut w = SegmentWriter::new(0xBEEF);
506 w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
507 .unwrap();
508 let bytes = w.finish(None).unwrap();
509 let r = SegmentReader::open(&bytes).unwrap();
510 let result = r.read_tile_as_of(1, 50, None).unwrap();
512 assert!(result.is_none());
513 }
514
515 #[test]
516 fn extract_cell_bytes_finds_coord() {
517 let s = schema();
518 let sparse = make_sparse(&s, 3);
519 let coord = vec![CoordValue::Int64(3), CoordValue::Int64(4)];
521 let bytes = extract_cell_bytes(&sparse, &coord).unwrap();
522 assert!(bytes.is_some(), "should find coord (3,4)");
523
524 let absent = vec![CoordValue::Int64(9), CoordValue::Int64(9)];
525 let none = extract_cell_bytes(&sparse, &absent).unwrap();
526 assert!(none.is_none(), "absent coord must return None");
527 }
528
529 #[test]
530 fn extract_cell_bytes_carries_valid_time_bounds() {
531 use crate::tile::cell_payload::CellPayload;
532 use crate::tile::sparse_tile::{SparseRow, SparseTileBuilder};
533 use nodedb_types::Surrogate;
534
535 let s = schema();
536 let mut b = SparseTileBuilder::new(&s);
537 b.push_row(SparseRow {
538 coord: &[CoordValue::Int64(1), CoordValue::Int64(2)],
539 attrs: &[CellValue::Int64(99)],
540 surrogate: Surrogate::ZERO,
541 valid_from_ms: 100,
542 valid_until_ms: 200,
543 kind: crate::tile::sparse_tile::RowKind::Live,
544 })
545 .unwrap();
546 let tile = b.build();
547
548 let coord = vec![CoordValue::Int64(1), CoordValue::Int64(2)];
549 let bytes = extract_cell_bytes(&tile, &coord).unwrap().unwrap();
550 let payload = CellPayload::decode(&bytes).unwrap();
551 assert_eq!(payload.valid_from_ms, 100);
552 assert_eq!(payload.valid_until_ms, 200);
553 }
554
555 #[test]
556 fn iter_tile_versions_newest_first() {
557 let s = schema();
558 let mut w = SegmentWriter::new(0xCAFE);
559 w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
560 .unwrap();
561 w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
562 .unwrap();
563 w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
564 .unwrap();
565 let bytes = w.finish(None).unwrap();
566 let r = SegmentReader::open(&bytes).unwrap();
567 let versions: Vec<_> = r
568 .iter_tile_versions(1, i64::MAX)
569 .unwrap()
570 .map(|v| v.unwrap().0.system_from_ms)
571 .collect();
572 assert_eq!(versions, vec![300, 200, 100]);
573 }
574
575 #[test]
576 fn iter_tile_versions_respects_system_as_of() {
577 let s = schema();
578 let mut w = SegmentWriter::new(0xBEEF);
579 w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
580 .unwrap();
581 w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
582 .unwrap();
583 w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
584 .unwrap();
585 let bytes = w.finish(None).unwrap();
586 let r = SegmentReader::open(&bytes).unwrap();
587 let versions: Vec<_> = r
588 .iter_tile_versions(1, 250)
589 .unwrap()
590 .map(|v| v.unwrap().0.system_from_ms)
591 .collect();
592 assert_eq!(versions, vec![200, 100]);
594 }
595
596 #[test]
597 fn extract_cell_bytes_returns_tombstone_sentinel_for_tombstone_row() {
598 use crate::tile::cell_payload::{CELL_TOMBSTONE_SENTINEL, is_cell_tombstone};
599 use crate::tile::sparse_tile::{RowKind, SparseTileBuilder};
600
601 let s = schema();
602 let mut b = SparseTileBuilder::new(&s);
603 b.push_row(crate::tile::sparse_tile::SparseRow {
604 coord: &[CoordValue::Int64(7), CoordValue::Int64(8)],
605 attrs: &[],
606 surrogate: Surrogate::ZERO,
607 valid_from_ms: 0,
608 valid_until_ms: nodedb_types::OPEN_UPPER,
609 kind: RowKind::Tombstone,
610 })
611 .unwrap();
612 let tile = b.build();
613 let coord = vec![CoordValue::Int64(7), CoordValue::Int64(8)];
614 let bytes = extract_cell_bytes(&tile, &coord).unwrap().unwrap();
615 assert!(
616 is_cell_tombstone(&bytes),
617 "expected CELL_TOMBSTONE_SENTINEL, got {bytes:?}"
618 );
619 assert_eq!(bytes, CELL_TOMBSTONE_SENTINEL);
620 }
621
622 #[test]
623 fn extract_cell_bytes_returns_erasure_sentinel_for_erased_row() {
624 use crate::tile::cell_payload::{CELL_GDPR_ERASURE_SENTINEL, is_cell_gdpr_erasure};
625 use crate::tile::sparse_tile::{RowKind, SparseTileBuilder};
626
627 let s = schema();
628 let mut b = SparseTileBuilder::new(&s);
629 b.push_row(crate::tile::sparse_tile::SparseRow {
630 coord: &[CoordValue::Int64(4), CoordValue::Int64(5)],
631 attrs: &[],
632 surrogate: Surrogate::ZERO,
633 valid_from_ms: 0,
634 valid_until_ms: nodedb_types::OPEN_UPPER,
635 kind: RowKind::GdprErased,
636 })
637 .unwrap();
638 let tile = b.build();
639 let coord = vec![CoordValue::Int64(4), CoordValue::Int64(5)];
640 let bytes = extract_cell_bytes(&tile, &coord).unwrap().unwrap();
641 assert!(
642 is_cell_gdpr_erasure(&bytes),
643 "expected CELL_GDPR_ERASURE_SENTINEL, got {bytes:?}"
644 );
645 assert_eq!(bytes, CELL_GDPR_ERASURE_SENTINEL);
646 }
647
648 #[test]
649 fn read_tile_as_of_finds_exact_match() {
650 let s = schema();
651 let mut w = SegmentWriter::new(0xDEAD);
652 w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
653 .unwrap();
654 w.append_sparse(TileId::new(1, 200), &make_sparse(&s, 2))
655 .unwrap();
656 w.append_sparse(TileId::new(1, 300), &make_sparse(&s, 3))
657 .unwrap();
658 let bytes = w.finish(None).unwrap();
659 let r = SegmentReader::open(&bytes).unwrap();
660 let result = r.read_tile_as_of(1, 200, None).unwrap();
662 match result {
663 Some(TilePayload::Sparse(t)) => {
664 assert_eq!(t.nnz(), 1);
665 assert_eq!(t.dim_dicts[0].values[0], CoordValue::Int64(2));
666 }
667 other => panic!("expected Some(Sparse), got {other:?}"),
668 }
669 }
670
671 fn test_kek() -> nodedb_wal::crypto::WalEncryptionKey {
672 nodedb_wal::crypto::WalEncryptionKey::from_bytes(&[0xA1u8; 32]).unwrap()
673 }
674
675 fn write_plain(s: &crate::schema::ArraySchema, id: TileId) -> Vec<u8> {
676 let mut w = SegmentWriter::new(0xCAFE);
677 w.append_sparse(id, &make_sparse(s, 1)).unwrap();
678 w.finish(None).unwrap()
679 }
680
681 fn write_encrypted(s: &crate::schema::ArraySchema, id: TileId) -> Vec<u8> {
682 let kek = test_kek();
683 let mut w = SegmentWriter::new(0xCAFE);
684 w.append_sparse(id, &make_sparse(s, 1)).unwrap();
685 w.finish(Some(&kek)).unwrap()
686 }
687
688 #[test]
689 fn array_segment_refuses_plaintext_with_kek() {
690 let s = schema();
691 let plain = write_plain(&s, TileId::snapshot(1));
692 let kek = test_kek();
693 let err = OwnedSegmentReader::open_with_kek(&plain, Some(&kek)).unwrap_err();
694 assert!(
695 matches!(err, crate::error::ArrayError::KekRequired),
696 "expected KekRequired, got {err:?}"
697 );
698 }
699
700 #[test]
701 fn array_segment_refuses_encrypted_without_kek() {
702 let s = schema();
703 let encrypted = write_encrypted(&s, TileId::snapshot(1));
704 let err = OwnedSegmentReader::open_with_kek(&encrypted, None).unwrap_err();
705 assert!(
706 matches!(err, crate::error::ArrayError::MissingKek),
707 "expected MissingKek, got {err:?}"
708 );
709 }
710
711 #[test]
712 fn array_segment_tampered_ciphertext_rejected() {
713 let s = schema();
714 let mut encrypted = write_encrypted(&s, TileId::snapshot(1));
715 encrypted[nodedb_wal::crypto::SEGMENT_ENVELOPE_PREAMBLE_SIZE + 2] ^= 0xFF;
717 let kek = test_kek();
718 assert!(OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek)).is_err());
719 }
720
721 #[test]
722 fn array_segment_encrypted_at_rest() {
723 let s = schema();
724 let encrypted = write_encrypted(&s, TileId::snapshot(1));
725 assert_eq!(&encrypted[..4], b"SEGA");
727 let kek = test_kek();
728 let owned = OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek)).unwrap();
729 let reader = owned.reader();
730 assert_eq!(reader.tile_count(), 1);
731 }
732
733 #[test]
734 fn array_segment_handle_decrypts_into_owned_buffer() {
735 let s = schema();
736 let kek = test_kek();
737 let mut w = SegmentWriter::new(0x1234);
738 w.append_sparse(TileId::new(1, 100), &make_sparse(&s, 1))
739 .unwrap();
740 w.append_sparse(TileId::new(2, 200), &make_sparse(&s, 2))
741 .unwrap();
742 let encrypted = w.finish(Some(&kek)).unwrap();
743 let owned = OwnedSegmentReader::open_with_kek(&encrypted, Some(&kek)).unwrap();
744 let reader = owned.reader();
745 assert_eq!(reader.tile_count(), 2);
746 assert_eq!(reader.tiles()[0].tile_id, TileId::new(1, 100));
747 }
748}