1use std::collections::{HashMap, HashSet};
12
13use crate::error::{ArrayError, ArrayResult};
14use crate::schema::ArraySchema;
15use crate::segment::TileEntry;
16use crate::segment::reader::{SegmentReader, TilePayload};
17use crate::tile::cell_payload::CellPayload;
18use crate::tile::sparse_tile::{RowKind, SparseRow, SparseTile, SparseTileBuilder};
19use crate::types::TileId;
20use crate::types::coord::value::CoordValue;
21use nodedb_types::{OPEN_UPPER, Surrogate};
22
/// Outcome of [`merge_for_retention`] for one set of tile versions.
pub struct RetentionMergeResult {
    /// Tile holding the rows carried forward from the versions older than the
    /// horizon; `None` when no row was carried forward.
    pub ceiling_tile: Option<SparseTile>,
    /// Ids of the versions whose `system_from_ms` is at or above the horizon,
    /// in the order they appeared in the input.
    pub keep_inhorizon: Vec<TileId>,
    /// Ids of the versions older than the horizon, ordered newest first.
    pub dropped_tile_ids: Vec<TileId>,
    /// Number of rows (live and tombstone) written into `ceiling_tile`.
    pub cells_carried_forward: usize,
}
41
42pub fn encode_coord_key(coord: &[CoordValue]) -> ArrayResult<Vec<u8>> {
49 let owned = coord.to_vec();
50 zerompk::to_msgpack_vec(&owned).map_err(|e| ArrayError::SegmentCorruption {
51 detail: format!("encode_coord_key: {e}"),
52 })
53}
54
/// One row of a sparse tile in owned, fully decoded form, as produced by
/// [`decode_sparse_rows`].
pub struct DecodedRow {
    /// Result of [`encode_coord_key`] applied to `coord`.
    pub coord_key: Vec<u8>,
    /// Coordinate values of the row, one per dimension dictionary of the tile.
    pub coord: Vec<CoordValue>,
    /// Kind reported by the tile for this row.
    pub kind: RowKind,
    /// Cell contents; `Some` for `RowKind::Live` rows, `None` for tombstone
    /// and GDPR-erased rows.
    pub payload: Option<CellPayload>,
}
65
66pub fn decode_sparse_rows(tile: &SparseTile) -> ArrayResult<Vec<DecodedRow>> {
72 let n = tile.row_count();
73 let arity = tile.dim_dicts.len();
74 let mut rows = Vec::with_capacity(n);
75 let mut live_idx: usize = 0;
76
77 for row in 0..n {
78 let mut coord = Vec::with_capacity(arity);
80 for dim_idx in 0..arity {
81 let dict =
82 tile.dim_dicts
83 .get(dim_idx)
84 .ok_or_else(|| ArrayError::SegmentCorruption {
85 detail: format!("decode_sparse_rows: dim {dim_idx} missing"),
86 })?;
87 let entry_idx = *dict
88 .indices
89 .get(row)
90 .ok_or_else(|| ArrayError::SegmentCorruption {
91 detail: format!("decode_sparse_rows: row {row} index out of range"),
92 })? as usize;
93 let val = dict
94 .values
95 .get(entry_idx)
96 .ok_or_else(|| ArrayError::SegmentCorruption {
97 detail: format!("decode_sparse_rows: dict entry {entry_idx} out of range"),
98 })?;
99 coord.push(val.clone());
100 }
101
102 let coord_key = encode_coord_key(&coord)?;
103 let kind = tile.row_kind(row)?;
104
105 let payload = match kind {
106 RowKind::Live => {
107 let attrs: Vec<_> = tile
109 .attr_cols
110 .iter()
111 .map(|col| {
112 col.get(live_idx)
113 .cloned()
114 .ok_or_else(|| ArrayError::SegmentCorruption {
115 detail: format!(
116 "decode_sparse_rows: attr col live_idx {live_idx} out of range"
117 ),
118 })
119 })
120 .collect::<ArrayResult<Vec<_>>>()?;
121
122 let surrogate = tile.surrogates.get(row).copied().unwrap_or(Surrogate::ZERO);
123 let valid_from_ms = tile.valid_from_ms.get(row).copied().ok_or_else(|| {
124 ArrayError::SegmentCorruption {
125 detail: format!("decode_sparse_rows: valid_from_ms row {row} out of range"),
126 }
127 })?;
128 let valid_until_ms = tile.valid_until_ms.get(row).copied().ok_or_else(|| {
129 ArrayError::SegmentCorruption {
130 detail: format!(
131 "decode_sparse_rows: valid_until_ms row {row} out of range"
132 ),
133 }
134 })?;
135
136 live_idx += 1;
137 Some(CellPayload {
138 valid_from_ms,
139 valid_until_ms,
140 attrs,
141 surrogate,
142 })
143 }
144 RowKind::Tombstone | RowKind::GdprErased => None,
145 };
146
147 rows.push(DecodedRow {
148 coord_key,
149 coord,
150 kind,
151 payload,
152 });
153 }
154
155 Ok(rows)
156}
157
158pub fn merge_for_retention(
171 versions: &[TileEntry],
172 reader: &SegmentReader<'_>,
173 schema: &ArraySchema,
174 horizon_ms: i64,
175) -> ArrayResult<RetentionMergeResult> {
176 if versions.is_empty() {
177 return Ok(RetentionMergeResult {
178 ceiling_tile: None,
179 keep_inhorizon: Vec::new(),
180 dropped_tile_ids: Vec::new(),
181 cells_carried_forward: 0,
182 });
183 }
184
185 let mut inside: Vec<&TileEntry> = Vec::new();
187 let mut outside: Vec<&TileEntry> = Vec::new();
188 for entry in versions {
189 if entry.tile_id.system_from_ms >= horizon_ms {
190 inside.push(entry);
191 } else {
192 outside.push(entry);
193 }
194 }
195
196 let keep_inhorizon: Vec<TileId> = inside.iter().map(|e| e.tile_id).collect();
198
199 if outside.is_empty() {
201 return Ok(RetentionMergeResult {
202 ceiling_tile: None,
203 keep_inhorizon,
204 dropped_tile_ids: Vec::new(),
205 cells_carried_forward: 0,
206 });
207 }
208
209 let mut inhorizon_coords: HashSet<Vec<u8>> = HashSet::new();
213 for entry in &inside {
214 let tile_idx = find_tile_index(reader, entry.tile_id)?;
215 let payload = reader.read_tile(tile_idx)?;
216 if let TilePayload::Sparse(ref tile) = payload {
217 for trow in decode_sparse_rows(tile)? {
218 inhorizon_coords.insert(trow.coord_key);
219 }
220 }
221 }
225
226 outside.sort_by_key(|e| std::cmp::Reverse(e.tile_id.system_from_ms));
229
230 let mut ceiling: HashMap<Vec<u8>, (Vec<CoordValue>, RowKind, Option<CellPayload>)> =
232 HashMap::new();
233
234 for entry in &outside {
235 let tile_idx = find_tile_index(reader, entry.tile_id)?;
236 let payload = reader.read_tile(tile_idx)?;
237 if let TilePayload::Sparse(ref tile) = payload {
238 for trow in decode_sparse_rows(tile)? {
239 if inhorizon_coords.contains(&trow.coord_key) {
241 continue;
242 }
243 ceiling
245 .entry(trow.coord_key)
246 .or_insert((trow.coord, trow.kind, trow.payload));
247 }
248 }
249 }
250
251 let dropped_tile_ids: Vec<TileId> = outside.iter().map(|e| e.tile_id).collect();
254
255 let mut builder = SparseTileBuilder::new(schema);
259 let mut cells_carried_forward: usize = 0;
260
261 type CeilingRow = (Vec<u8>, Vec<CoordValue>, RowKind, Option<CellPayload>);
263 let mut ceiling_rows: Vec<CeilingRow> = ceiling
264 .into_iter()
265 .map(|(k, (coord, kind, payload))| (k, coord, kind, payload))
266 .collect();
267 ceiling_rows.sort_by(|a, b| a.0.cmp(&b.0));
268
269 for (_key, coord, kind, payload) in ceiling_rows {
270 match kind {
271 RowKind::GdprErased => {
272 }
274 RowKind::Tombstone => {
275 builder.push_row(SparseRow {
276 coord: &coord,
277 attrs: &[],
278 surrogate: Surrogate::ZERO,
279 valid_from_ms: 0,
280 valid_until_ms: OPEN_UPPER,
281 kind: RowKind::Tombstone,
282 })?;
283 cells_carried_forward += 1;
284 }
285 RowKind::Live => {
286 let p = payload.ok_or_else(|| ArrayError::SegmentCorruption {
287 detail: "Live row in ceiling has no CellPayload".into(),
288 })?;
289 builder.push_row(SparseRow {
290 coord: &coord,
291 attrs: &p.attrs,
292 surrogate: p.surrogate,
293 valid_from_ms: p.valid_from_ms,
294 valid_until_ms: p.valid_until_ms,
295 kind: RowKind::Live,
296 })?;
297 cells_carried_forward += 1;
298 }
299 }
300 }
301
302 let ceiling_tile = if cells_carried_forward == 0 {
303 None
304 } else {
305 Some(builder.build())
306 };
307
308 Ok(RetentionMergeResult {
309 ceiling_tile,
310 keep_inhorizon,
311 dropped_tile_ids,
312 cells_carried_forward,
313 })
314}
315
316fn find_tile_index(reader: &SegmentReader<'_>, tile_id: TileId) -> ArrayResult<usize> {
320 reader
321 .tiles()
322 .iter()
323 .position(|e| e.tile_id == tile_id)
324 .ok_or_else(|| ArrayError::SegmentCorruption {
325 detail: format!("find_tile_index: TileId {tile_id:?} not found in segment"),
326 })
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::segment::writer::SegmentWriter;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};

    /// Retention horizon shared by every test in this module.
    const HORIZON_MS: i64 = 300;

    /// One Int64 dimension `x` over 0..=1000 and one Int64 attribute `v`.
    fn schema() -> ArraySchema {
        let x_dim = DimSpec::new(
            "x",
            DimType::Int64,
            Domain::new(DomainBound::Int64(0), DomainBound::Int64(1000)),
        );
        ArraySchemaBuilder::new("t")
            .dim(x_dim)
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![100])
            .build()
            .unwrap()
    }

    /// Writes the given sparse tiles, in order, into a fresh segment.
    fn build_segment(pairs: Vec<(TileId, SparseTile)>) -> Vec<u8> {
        let mut writer = SegmentWriter::new(0xBEEF);
        for (id, tile) in pairs {
            writer.append_sparse(id, &tile).unwrap();
        }
        writer.finish(None).unwrap()
    }

    /// Builds a one-row tile at coordinate `x` with the given attributes and kind.
    fn single_row_tile(
        schema: &ArraySchema,
        x: i64,
        attrs: &[CellValue],
        kind: RowKind,
    ) -> SparseTile {
        let coord = [CoordValue::Int64(x)];
        let mut builder = SparseTileBuilder::new(schema);
        builder
            .push_row(SparseRow {
                coord: &coord,
                attrs,
                surrogate: Surrogate::ZERO,
                valid_from_ms: 0,
                valid_until_ms: OPEN_UPPER,
                kind,
            })
            .unwrap();
        builder.build()
    }

    fn live_tile(schema: &ArraySchema, x: i64, v: i64) -> SparseTile {
        single_row_tile(schema, x, &[CellValue::Int64(v)], RowKind::Live)
    }

    fn tombstone_tile(schema: &ArraySchema, x: i64) -> SparseTile {
        single_row_tile(schema, x, &[], RowKind::Tombstone)
    }

    fn gdpr_tile(schema: &ArraySchema, x: i64) -> SparseTile {
        single_row_tile(schema, x, &[], RowKind::GdprErased)
    }

    /// Extracts the Int64 `x` coordinate of a decoded row.
    fn x_of(row: &DecodedRow) -> i64 {
        match row.coord[0] {
            CoordValue::Int64(x) => x,
            _ => panic!("unexpected coord type"),
        }
    }

    /// Sorted `x` coordinates of every row in `tile`.
    fn ceiling_x_values(tile: &SparseTile) -> Vec<i64> {
        let mut xs: Vec<i64> = decode_sparse_rows(tile).unwrap().iter().map(x_of).collect();
        xs.sort_unstable();
        xs
    }

    /// Row kind per `x` coordinate in `tile`.
    fn ceiling_kinds(tile: &SparseTile) -> HashMap<i64, RowKind> {
        let mut kinds = HashMap::new();
        for row in decode_sparse_rows(tile).unwrap() {
            kinds.insert(x_of(&row), row.kind);
        }
        kinds
    }

    /// Sorted `system_from_ms` values of the given tile ids.
    fn sorted_system_from(ids: &[TileId]) -> Vec<i64> {
        let mut stamps: Vec<i64> = ids.iter().map(|id| id.system_from_ms).collect();
        stamps.sort_unstable();
        stamps
    }

    /// Builds a segment from `pairs` and merges all of its tiles at `HORIZON_MS`.
    fn merge_segment(
        schema: &ArraySchema,
        pairs: Vec<(TileId, SparseTile)>,
    ) -> RetentionMergeResult {
        let bytes = build_segment(pairs);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        merge_for_retention(&versions, &reader, schema, HORIZON_MS).unwrap()
    }

    #[test]
    fn cell_preservation_across_sparse_writes() {
        let s = schema();
        let result = merge_segment(
            &s,
            vec![
                (TileId::new(0, 100), live_tile(&s, 1, 10)),
                (TileId::new(0, 200), live_tile(&s, 2, 20)),
            ],
        );
        let ceiling = result.ceiling_tile.expect("ceiling must be non-None");
        assert_eq!(
            ceiling_x_values(&ceiling),
            vec![1, 2],
            "both cells must survive in ceiling"
        );
        assert_eq!(result.cells_carried_forward, 2);
        assert!(result.keep_inhorizon.is_empty());
        assert_eq!(result.dropped_tile_ids.len(), 2);
    }

    #[test]
    fn inhorizon_versions_pass_through_unchanged() {
        let s = schema();
        let result = merge_segment(
            &s,
            vec![
                (TileId::new(0, 400), live_tile(&s, 1, 10)),
                (TileId::new(0, 500), live_tile(&s, 1, 20)),
            ],
        );
        assert!(result.ceiling_tile.is_none());
        assert_eq!(sorted_system_from(&result.keep_inhorizon), vec![400, 500]);
        assert!(result.dropped_tile_ids.is_empty());
    }

    #[test]
    fn mixed_inhorizon_and_outhorizon() {
        let s = schema();
        let result = merge_segment(
            &s,
            vec![
                (TileId::new(0, 100), live_tile(&s, 1, 10)),
                (TileId::new(0, 200), live_tile(&s, 2, 20)),
                (TileId::new(0, 400), live_tile(&s, 3, 30)),
            ],
        );
        let ceiling = result.ceiling_tile.expect("ceiling must have A and B");
        assert_eq!(ceiling_x_values(&ceiling), vec![1, 2]);
        assert_eq!(result.cells_carried_forward, 2);
        assert_eq!(result.keep_inhorizon, vec![TileId::new(0, 400)]);
        assert_eq!(sorted_system_from(&result.dropped_tile_ids), vec![100, 200]);
    }

    #[test]
    fn tombstone_collapses_into_ceiling() {
        let s = schema();
        let result = merge_segment(
            &s,
            vec![
                (TileId::new(0, 100), live_tile(&s, 1, 10)),
                (TileId::new(0, 200), tombstone_tile(&s, 1)),
            ],
        );
        let ceiling = result.ceiling_tile.expect("ceiling must contain tombstone");
        let kinds = ceiling_kinds(&ceiling);
        assert_eq!(kinds.get(&1), Some(&RowKind::Tombstone));
        assert_eq!(result.cells_carried_forward, 1);
    }

    #[test]
    fn tombstone_below_with_inhorizon_live_succeeds() {
        let s = schema();
        let result = merge_segment(
            &s,
            vec![
                (TileId::new(0, 100), tombstone_tile(&s, 1)),
                (TileId::new(0, 400), live_tile(&s, 1, 99)),
            ],
        );
        assert!(
            result.ceiling_tile.is_none(),
            "inside version covers x=1; ceiling must be empty"
        );
        assert_eq!(result.cells_carried_forward, 0);
        assert_eq!(result.keep_inhorizon, vec![TileId::new(0, 400)]);
    }

    #[test]
    fn gdpr_erasure_drops_cell_outright() {
        let s = schema();
        let result = merge_segment(
            &s,
            vec![
                (TileId::new(0, 100), live_tile(&s, 1, 10)),
                (TileId::new(0, 200), gdpr_tile(&s, 1)),
            ],
        );
        assert!(
            result.ceiling_tile.is_none(),
            "GdprErased newest version must suppress cell from ceiling"
        );
        assert_eq!(result.cells_carried_forward, 0);
        assert_eq!(result.dropped_tile_ids.len(), 2);
    }

    #[test]
    fn empty_input_returns_empty() {
        let s = schema();
        // Kept explicit: this case passes an empty version slice directly
        // instead of the reader's own tile list.
        let bytes = build_segment(vec![]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let result = merge_for_retention(&[], &reader, &s, HORIZON_MS).unwrap();
        assert!(result.ceiling_tile.is_none());
        assert!(result.keep_inhorizon.is_empty());
        assert!(result.dropped_tile_ids.is_empty());
        assert_eq!(result.cells_carried_forward, 0);
    }
}