use std::collections::{HashMap, HashSet};
use crate::error::{ArrayError, ArrayResult};
use crate::schema::ArraySchema;
use crate::segment::TileEntry;
use crate::segment::reader::{SegmentReader, TilePayload};
use crate::tile::cell_payload::CellPayload;
use crate::tile::sparse_tile::{RowKind, SparseRow, SparseTile, SparseTileBuilder};
use crate::types::TileId;
use crate::types::coord::value::CoordValue;
use nodedb_types::{OPEN_UPPER, Surrogate};
/// Outcome of [`merge_for_retention`] for one set of tile versions.
pub struct RetentionMergeResult {
    /// Tile holding the cells carried forward from versions older than the
    /// horizon; `None` when no cell was carried forward.
    pub ceiling_tile: Option<SparseTile>,
    /// Ids of versions whose `system_from_ms` is at or after the horizon,
    /// in the order they were supplied. The merge only records these ids;
    /// it does not read them into the ceiling.
    pub keep_inhorizon: Vec<TileId>,
    /// Ids of versions whose `system_from_ms` is before the horizon, listed
    /// newest first. Their surviving cells (if any) live in `ceiling_tile`.
    pub dropped_tile_ids: Vec<TileId>,
    /// Number of rows (live or tombstone) written into `ceiling_tile`.
    pub cells_carried_forward: usize,
}
/// Serialises a coordinate tuple to MessagePack bytes with `zerompk`.
///
/// The resulting bytes are used in this module as the identity of a cell
/// (set/map key) when deduplicating versions.
///
/// # Errors
///
/// Any serialisation failure is reported as `ArrayError::SegmentCorruption`
/// with a detail prefixed by `encode_coord_key:`.
pub fn encode_coord_key(coord: &[CoordValue]) -> ArrayResult<Vec<u8>> {
    let coords: Vec<CoordValue> = coord.iter().cloned().collect();
    match zerompk::to_msgpack_vec(&coords) {
        Ok(bytes) => Ok(bytes),
        Err(e) => Err(ArrayError::SegmentCorruption {
            detail: format!("encode_coord_key: {e}"),
        }),
    }
}
/// One row of a sparse tile in owned, fully decoded form.
pub struct DecodedRow {
    /// `encode_coord_key(&coord)`; used as the cell identity when merging.
    pub coord_key: Vec<u8>,
    /// Coordinate values, one per dimension dictionary of the source tile.
    pub coord: Vec<CoordValue>,
    /// Whether the row is live, a tombstone, or GDPR-erased.
    pub kind: RowKind,
    /// `Some` for `RowKind::Live` rows; `None` for tombstones and erased rows.
    pub payload: Option<CellPayload>,
}
pub fn decode_sparse_rows(tile: &SparseTile) -> ArrayResult<Vec<DecodedRow>> {
let n = tile.row_count();
let arity = tile.dim_dicts.len();
let mut rows = Vec::with_capacity(n);
let mut live_idx: usize = 0;
for row in 0..n {
let mut coord = Vec::with_capacity(arity);
for dim_idx in 0..arity {
let dict =
tile.dim_dicts
.get(dim_idx)
.ok_or_else(|| ArrayError::SegmentCorruption {
detail: format!("decode_sparse_rows: dim {dim_idx} missing"),
})?;
let entry_idx = *dict
.indices
.get(row)
.ok_or_else(|| ArrayError::SegmentCorruption {
detail: format!("decode_sparse_rows: row {row} index out of range"),
})? as usize;
let val = dict
.values
.get(entry_idx)
.ok_or_else(|| ArrayError::SegmentCorruption {
detail: format!("decode_sparse_rows: dict entry {entry_idx} out of range"),
})?;
coord.push(val.clone());
}
let coord_key = encode_coord_key(&coord)?;
let kind = tile.row_kind(row)?;
let payload = match kind {
RowKind::Live => {
let attrs: Vec<_> = tile
.attr_cols
.iter()
.map(|col| {
col.get(live_idx)
.cloned()
.ok_or_else(|| ArrayError::SegmentCorruption {
detail: format!(
"decode_sparse_rows: attr col live_idx {live_idx} out of range"
),
})
})
.collect::<ArrayResult<Vec<_>>>()?;
let surrogate = tile.surrogates.get(row).copied().unwrap_or(Surrogate::ZERO);
let valid_from_ms = tile.valid_from_ms.get(row).copied().ok_or_else(|| {
ArrayError::SegmentCorruption {
detail: format!("decode_sparse_rows: valid_from_ms row {row} out of range"),
}
})?;
let valid_until_ms = tile.valid_until_ms.get(row).copied().ok_or_else(|| {
ArrayError::SegmentCorruption {
detail: format!(
"decode_sparse_rows: valid_until_ms row {row} out of range"
),
}
})?;
live_idx += 1;
Some(CellPayload {
valid_from_ms,
valid_until_ms,
attrs,
surrogate,
})
}
RowKind::Tombstone | RowKind::GdprErased => None,
};
rows.push(DecodedRow {
coord_key,
coord,
kind,
payload,
});
}
Ok(rows)
}
pub fn merge_for_retention(
versions: &[TileEntry],
reader: &SegmentReader<'_>,
schema: &ArraySchema,
horizon_ms: i64,
) -> ArrayResult<RetentionMergeResult> {
if versions.is_empty() {
return Ok(RetentionMergeResult {
ceiling_tile: None,
keep_inhorizon: Vec::new(),
dropped_tile_ids: Vec::new(),
cells_carried_forward: 0,
});
}
let mut inside: Vec<&TileEntry> = Vec::new();
let mut outside: Vec<&TileEntry> = Vec::new();
for entry in versions {
if entry.tile_id.system_from_ms >= horizon_ms {
inside.push(entry);
} else {
outside.push(entry);
}
}
let keep_inhorizon: Vec<TileId> = inside.iter().map(|e| e.tile_id).collect();
if outside.is_empty() {
return Ok(RetentionMergeResult {
ceiling_tile: None,
keep_inhorizon,
dropped_tile_ids: Vec::new(),
cells_carried_forward: 0,
});
}
let mut inhorizon_coords: HashSet<Vec<u8>> = HashSet::new();
for entry in &inside {
let tile_idx = find_tile_index(reader, entry.tile_id)?;
let payload = reader.read_tile(tile_idx)?;
if let TilePayload::Sparse(ref tile) = payload {
for trow in decode_sparse_rows(tile)? {
inhorizon_coords.insert(trow.coord_key);
}
}
}
outside.sort_by_key(|e| std::cmp::Reverse(e.tile_id.system_from_ms));
let mut ceiling: HashMap<Vec<u8>, (Vec<CoordValue>, RowKind, Option<CellPayload>)> =
HashMap::new();
for entry in &outside {
let tile_idx = find_tile_index(reader, entry.tile_id)?;
let payload = reader.read_tile(tile_idx)?;
if let TilePayload::Sparse(ref tile) = payload {
for trow in decode_sparse_rows(tile)? {
if inhorizon_coords.contains(&trow.coord_key) {
continue;
}
ceiling
.entry(trow.coord_key)
.or_insert((trow.coord, trow.kind, trow.payload));
}
}
}
let dropped_tile_ids: Vec<TileId> = outside.iter().map(|e| e.tile_id).collect();
let mut builder = SparseTileBuilder::new(schema);
let mut cells_carried_forward: usize = 0;
type CeilingRow = (Vec<u8>, Vec<CoordValue>, RowKind, Option<CellPayload>);
let mut ceiling_rows: Vec<CeilingRow> = ceiling
.into_iter()
.map(|(k, (coord, kind, payload))| (k, coord, kind, payload))
.collect();
ceiling_rows.sort_by(|a, b| a.0.cmp(&b.0));
for (_key, coord, kind, payload) in ceiling_rows {
match kind {
RowKind::GdprErased => {
}
RowKind::Tombstone => {
builder.push_row(SparseRow {
coord: &coord,
attrs: &[],
surrogate: Surrogate::ZERO,
valid_from_ms: 0,
valid_until_ms: OPEN_UPPER,
kind: RowKind::Tombstone,
})?;
cells_carried_forward += 1;
}
RowKind::Live => {
let p = payload.ok_or_else(|| ArrayError::SegmentCorruption {
detail: "Live row in ceiling has no CellPayload".into(),
})?;
builder.push_row(SparseRow {
coord: &coord,
attrs: &p.attrs,
surrogate: p.surrogate,
valid_from_ms: p.valid_from_ms,
valid_until_ms: p.valid_until_ms,
kind: RowKind::Live,
})?;
cells_carried_forward += 1;
}
}
}
let ceiling_tile = if cells_carried_forward == 0 {
None
} else {
Some(builder.build())
};
Ok(RetentionMergeResult {
ceiling_tile,
keep_inhorizon,
dropped_tile_ids,
cells_carried_forward,
})
}
/// Returns the position of the first entry in `reader`'s tile directory whose
/// id equals `tile_id` (linear scan).
///
/// # Errors
///
/// `ArrayError::SegmentCorruption` when no directory entry carries `tile_id`.
fn find_tile_index(reader: &SegmentReader<'_>, tile_id: TileId) -> ArrayResult<usize> {
    for (idx, entry) in reader.tiles().iter().enumerate() {
        if entry.tile_id == tile_id {
            return Ok(idx);
        }
    }
    Err(ArrayError::SegmentCorruption {
        detail: format!("find_tile_index: TileId {tile_id:?} not found in segment"),
    })
}
#[cfg(test)]
mod tests {
    // Unit tests for `merge_for_retention`.
    //
    // Fixture convention: `TileId::new(a, t)` where `t` is the version's
    // `system_from_ms` (the assertions read it back via `id.system_from_ms`).
    // Most tests use a horizon of 300, so versions with t < 300 are collapsed
    // into the ceiling and versions with t >= 300 are kept as-is.
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::segment::writer::SegmentWriter;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};

    /// One Int64 dimension `x` (domain bounds 0 and 1000), one Int64
    /// attribute `v` (the `true` flag is presumably nullability — confirm),
    /// and a tile extent of 100.
    fn schema() -> ArraySchema {
        ArraySchemaBuilder::new("t")
            .dim(DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(1000)),
            ))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![100])
            .build()
            .unwrap()
    }

    /// Writes each `(TileId, SparseTile)` pair into one segment and returns
    /// its bytes. NOTE(review): the meaning of `0xBEEF` is not visible here.
    fn build_segment(pairs: Vec<(TileId, SparseTile)>) -> Vec<u8> {
        let mut w = SegmentWriter::new(0xBEEF);
        for (id, tile) in pairs {
            w.append_sparse(id, &tile).unwrap();
        }
        w.finish(None).unwrap()
    }

    /// Single live row at `x` with attribute value `v`, valid [0, OPEN_UPPER).
    fn live_tile(schema: &ArraySchema, x: i64, v: i64) -> SparseTile {
        let mut b = SparseTileBuilder::new(schema);
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(x)],
            attrs: &[CellValue::Int64(v)],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Live,
        })
        .unwrap();
        b.build()
    }

    /// Single tombstone row at `x` (no attributes).
    fn tombstone_tile(schema: &ArraySchema, x: i64) -> SparseTile {
        let mut b = SparseTileBuilder::new(schema);
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(x)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Tombstone,
        })
        .unwrap();
        b.build()
    }

    /// Single GDPR-erased row at `x` (no attributes).
    fn gdpr_tile(schema: &ArraySchema, x: i64) -> SparseTile {
        let mut b = SparseTileBuilder::new(schema);
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(x)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::GdprErased,
        })
        .unwrap();
        b.build()
    }

    /// Sorted `x` coordinates of every row in `tile`; panics on a non-Int64
    /// coordinate.
    fn ceiling_x_values(tile: &SparseTile) -> Vec<i64> {
        let rows = decode_sparse_rows(tile).unwrap();
        let mut xs: Vec<i64> = rows
            .iter()
            .map(|r| match r.coord[0] {
                CoordValue::Int64(x) => x,
                _ => panic!("unexpected coord type"),
            })
            .collect();
        xs.sort_unstable();
        xs
    }

    /// Maps each row's `x` coordinate to its row kind.
    fn ceiling_kinds(tile: &SparseTile) -> HashMap<i64, RowKind> {
        decode_sparse_rows(tile)
            .unwrap()
            .into_iter()
            .map(|r| {
                let x = match r.coord[0] {
                    CoordValue::Int64(x) => x,
                    _ => panic!("unexpected coord type"),
                };
                (x, r.kind)
            })
            .collect()
    }

    // Two pre-horizon versions touching different cells: both cells must
    // survive in the ceiling and both source tiles are dropped.
    #[test]
    fn cell_preservation_across_sparse_writes() {
        let s = schema();
        let t100 = TileId::new(0, 100);
        let t200 = TileId::new(0, 200);
        let bytes = build_segment(vec![
            (t100, live_tile(&s, 1, 10)),
            (t200, live_tile(&s, 2, 20)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        let ceiling = result.ceiling_tile.expect("ceiling must be non-None");
        let xs = ceiling_x_values(&ceiling);
        assert_eq!(xs, vec![1, 2], "both cells must survive in ceiling");
        assert_eq!(result.cells_carried_forward, 2);
        assert!(result.keep_inhorizon.is_empty());
        assert_eq!(result.dropped_tile_ids.len(), 2);
    }

    // Every version is at/after the horizon: no ceiling, nothing dropped.
    #[test]
    fn inhorizon_versions_pass_through_unchanged() {
        let s = schema();
        let t400 = TileId::new(0, 400);
        let t500 = TileId::new(0, 500);
        let bytes = build_segment(vec![
            (t400, live_tile(&s, 1, 10)),
            (t500, live_tile(&s, 1, 20)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        assert!(result.ceiling_tile.is_none());
        let mut keep: Vec<i64> = result
            .keep_inhorizon
            .iter()
            .map(|id| id.system_from_ms)
            .collect();
        keep.sort_unstable();
        assert_eq!(keep, vec![400, 500]);
        assert!(result.dropped_tile_ids.is_empty());
    }

    // Two old versions (x=1, x=2) collapse; the in-horizon x=3 is only kept.
    #[test]
    fn mixed_inhorizon_and_outhorizon() {
        let s = schema();
        let bytes = build_segment(vec![
            (TileId::new(0, 100), live_tile(&s, 1, 10)),
            (TileId::new(0, 200), live_tile(&s, 2, 20)),
            (TileId::new(0, 400), live_tile(&s, 3, 30)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        let ceiling = result.ceiling_tile.expect("ceiling must have A and B");
        let xs = ceiling_x_values(&ceiling);
        assert_eq!(xs, vec![1, 2]);
        assert_eq!(result.cells_carried_forward, 2);
        assert_eq!(result.keep_inhorizon, vec![TileId::new(0, 400)]);
        let mut dropped: Vec<i64> = result
            .dropped_tile_ids
            .iter()
            .map(|id| id.system_from_ms)
            .collect();
        dropped.sort_unstable();
        assert_eq!(dropped, vec![100, 200]);
    }

    // Newest pre-horizon version of x=1 is a tombstone, so the ceiling keeps
    // the tombstone rather than the older live value.
    #[test]
    fn tombstone_collapses_into_ceiling() {
        let s = schema();
        let bytes = build_segment(vec![
            (TileId::new(0, 100), live_tile(&s, 1, 10)),
            (TileId::new(0, 200), tombstone_tile(&s, 1)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        let ceiling = result.ceiling_tile.expect("ceiling must contain tombstone");
        let kinds = ceiling_kinds(&ceiling);
        assert_eq!(kinds.get(&1), Some(&RowKind::Tombstone));
        assert_eq!(result.cells_carried_forward, 1);
    }

    // x=1 has an in-horizon live version, so the old tombstone is not
    // carried forward and no ceiling is produced.
    #[test]
    fn tombstone_below_with_inhorizon_live_succeeds() {
        let s = schema();
        let bytes = build_segment(vec![
            (TileId::new(0, 100), tombstone_tile(&s, 1)),
            (TileId::new(0, 400), live_tile(&s, 1, 99)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        assert!(
            result.ceiling_tile.is_none(),
            "inside version covers x=1; ceiling must be empty"
        );
        assert_eq!(result.cells_carried_forward, 0);
        assert_eq!(result.keep_inhorizon, vec![TileId::new(0, 400)]);
    }

    // Newest pre-horizon version of x=1 is GDPR-erased: the cell vanishes
    // completely while both old tiles are still dropped.
    #[test]
    fn gdpr_erasure_drops_cell_outright() {
        let s = schema();
        let bytes = build_segment(vec![
            (TileId::new(0, 100), live_tile(&s, 1, 10)),
            (TileId::new(0, 200), gdpr_tile(&s, 1)),
        ]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let versions: Vec<TileEntry> = reader.tiles().to_vec();
        let result = merge_for_retention(&versions, &reader, &s, 300).unwrap();
        assert!(
            result.ceiling_tile.is_none(),
            "GdprErased newest version must suppress cell from ceiling"
        );
        assert_eq!(result.cells_carried_forward, 0);
        assert_eq!(result.dropped_tile_ids.len(), 2);
    }

    // No versions at all yields an entirely empty result.
    #[test]
    fn empty_input_returns_empty() {
        let s = schema();
        let bytes = build_segment(vec![]);
        let reader = SegmentReader::open(&bytes).unwrap();
        let result = merge_for_retention(&[], &reader, &s, 300).unwrap();
        assert!(result.ceiling_tile.is_none());
        assert!(result.keep_inhorizon.is_empty());
        assert!(result.dropped_tile_ids.is_empty());
        assert_eq!(result.cells_carried_forward, 0);
    }
}