1use crate::error::{ArrayError, ArrayResult};
11use crate::tile::cell_payload::{CellPayload, is_cell_gdpr_erasure, is_cell_tombstone};
12use crate::types::TileId;
13use crate::types::coord::value::CoordValue;
14
/// Outcome of resolving a single cell with [`ceiling_resolve_cell`].
#[derive(Debug, Clone, PartialEq)]
pub enum CeilingResult {
    /// A version was successfully decoded and passed the optional valid-time
    /// filter; carries that decoded payload.
    Live(CellPayload),
    /// The first version visible at the system-time cutoff was recognised by
    /// `is_cell_tombstone`.
    Tombstoned,
    /// The first version visible at the system-time cutoff was recognised by
    /// `is_cell_gdpr_erasure`.
    Erased,
    /// Every supplied version was skipped (written after the cutoff, or
    /// outside the requested valid-time window), or none were supplied.
    NotFound,
}
28
/// Query parameters consumed by [`ceiling_resolve_cell`].
///
/// Both fields are plain integers, so the struct is cheap to copy, compare
/// and print in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CeilingParams {
    /// System-time cutoff: versions whose `TileId::system_from_ms` is greater
    /// than this value are skipped. A version written exactly at the cutoff
    /// is still considered.
    pub system_as_of: i64,
    /// Optional valid-time instant. When `Some(vt)`, a live payload is only
    /// accepted if `valid_from_ms <= vt < valid_until_ms`; when `None`, no
    /// valid-time filtering is applied.
    pub valid_at_ms: Option<i64>,
}
39
40pub fn ceiling_resolve_cell<'a, I>(
57 tile_versions_newest_first: I,
58 _coord: &[CoordValue],
59 params: &CeilingParams,
60) -> ArrayResult<CeilingResult>
61where
62 I: IntoIterator<Item = (TileId, &'a [u8])>,
63{
64 for (tile_id, bytes) in tile_versions_newest_first {
65 if tile_id.system_from_ms > params.system_as_of {
66 continue;
67 }
68 if is_cell_tombstone(bytes) {
69 return Ok(CeilingResult::Tombstoned);
70 }
71 if is_cell_gdpr_erasure(bytes) {
72 return Ok(CeilingResult::Erased);
73 }
74 let payload = CellPayload::decode(bytes).map_err(|e| ArrayError::SegmentCorruption {
75 detail: format!("ceiling_resolve_cell: {e}"),
76 })?;
77 match params.valid_at_ms {
78 Some(vt) if !(payload.valid_from_ms <= vt && vt < payload.valid_until_ms) => {
79 continue;
80 }
81 _ => return Ok(CeilingResult::Live(payload)),
82 }
83 }
84 Ok(CeilingResult::NotFound)
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tile::cell_payload::{
        CELL_GDPR_ERASURE_SENTINEL, CELL_TOMBSTONE_SENTINEL, CellPayload, OPEN_UPPER,
    };
    use crate::types::TileId;
    use crate::types::cell_value::value::CellValue;
    use nodedb_types::Surrogate;

    /// Encodes a payload with the given valid-time bounds and a single
    /// `Int64` attribute holding `val`.
    fn payload(valid_from: i64, valid_until: i64, val: i64) -> Vec<u8> {
        CellPayload {
            valid_from_ms: valid_from,
            valid_until_ms: valid_until,
            attrs: vec![CellValue::Int64(val)],
            surrogate: Surrogate::ZERO,
        }
        .encode()
        .unwrap()
    }

    /// Runs `ceiling_resolve_cell` over `versions` with an empty coordinate
    /// and unwraps the result.
    fn resolve<'a>(
        versions: impl IntoIterator<Item = (TileId, &'a [u8])>,
        system_as_of: i64,
        valid_at_ms: Option<i64>,
    ) -> CeilingResult {
        let params = CeilingParams {
            system_as_of,
            valid_at_ms,
        };
        ceiling_resolve_cell(versions, &[], &params).unwrap()
    }

    #[test]
    fn live_ceiling_at_cutoff() {
        let oldest = payload(0, OPEN_UPPER, 1);
        let middle = payload(0, OPEN_UPPER, 2);
        let newest = payload(0, OPEN_UPPER, 3);

        // Cutoff 250 hides the version written at 300, so the one at 200 wins.
        let history = [
            (TileId::new(1, 300), newest.as_slice()),
            (TileId::new(1, 200), middle.as_slice()),
            (TileId::new(1, 100), oldest.as_slice()),
        ];

        match resolve(history, 250, None) {
            CeilingResult::Live(p) => assert_eq!(p.attrs[0], CellValue::Int64(2)),
            other => panic!("expected Live(2), got {other:?}"),
        }
    }

    #[test]
    fn tombstone_shadows_earlier_versions() {
        let live = payload(0, OPEN_UPPER, 1);

        // Checked once after the tombstone's system time (300) and once
        // exactly at it (200); the tombstone must win in both cases.
        for cutoff in [300, 200] {
            let history = [
                (TileId::new(1, 200), CELL_TOMBSTONE_SENTINEL),
                (TileId::new(1, 100), live.as_slice()),
            ];
            assert_eq!(resolve(history, cutoff, None), CeilingResult::Tombstoned);
        }
    }

    #[test]
    fn gdpr_erasure_distinct_from_tombstone() {
        let live = payload(0, OPEN_UPPER, 1);
        let history = [
            (TileId::new(1, 200), CELL_GDPR_ERASURE_SENTINEL),
            (TileId::new(1, 100), live.as_slice()),
        ];

        let outcome = resolve(history, 300, None);
        assert_eq!(outcome, CeilingResult::Erased);
        assert_ne!(outcome, CeilingResult::Tombstoned);
    }

    #[test]
    fn valid_time_filter_falls_back_to_older_version() {
        let older = payload(0, 100, 1);
        let newer = payload(200, 300, 2);

        // Both versions are visible at system time 300; only the valid-time
        // instant varies between the three probes below.
        let resolve_valid_at = |valid_at: i64| {
            let history = [
                (TileId::new(1, 200), newer.as_slice()),
                (TileId::new(1, 100), older.as_slice()),
            ];
            resolve(history, 300, Some(valid_at))
        };

        // 50 lies in [0, 100): the newer version is skipped, the older one matches.
        match resolve_valid_at(50) {
            CeilingResult::Live(p) => assert_eq!(p.attrs[0], CellValue::Int64(1)),
            other => panic!("expected Live(1) for valid_at=50, got {other:?}"),
        }

        // 150 falls in the gap between the two windows.
        assert_eq!(resolve_valid_at(150), CeilingResult::NotFound);

        // 250 lies in [200, 300): the newer version matches directly.
        match resolve_valid_at(250) {
            CeilingResult::Live(p) => assert_eq!(p.attrs[0], CellValue::Int64(2)),
            other => panic!("expected Live(2) for valid_at=250, got {other:?}"),
        }
    }

    #[test]
    fn not_found_below_horizon() {
        let only = payload(0, OPEN_UPPER, 1);
        let history = [(TileId::new(1, 100), only.as_slice())];

        // The sole version was written at 100, after the cutoff of 50.
        assert_eq!(resolve(history, 50, None), CeilingResult::NotFound);
    }
}