reddb_server/runtime/
integrity_tombstone.rs1use crate::storage::schema::Value;
18use crate::storage::unified::{EntityData, UnifiedStore};
19
/// Name of the collection that [`load_ranges`] scans for the persisted tombstone row.
const RED_CONFIG_COLLECTION: &str = "red_config";
22
/// Config key for the serialized tombstone ranges: [`persist_ranges`] writes
/// under it and [`load_ranges`] matches it against each row's `key` column.
pub const TOMBSTONE_KEY: &str = "stream.integrity.tombstones";
27
/// Verification mode selector.
///
/// `None` is the `Default` variant; `Sha256` is the only variant that
/// [`VerifyMode::is_enabled`] reports as enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VerifyMode {
    /// Verification switched off (the default).
    #[default]
    None,
    /// Selected by the `sha256` token.
    Sha256,
}

impl VerifyMode {
    /// Parses a mode token leniently.
    ///
    /// Surrounding whitespace is trimmed and ASCII case is ignored. Only
    /// `sha256` selects [`VerifyMode::Sha256`]; every other token, including
    /// the empty string, maps to [`VerifyMode::None`] instead of an error.
    pub fn parse(token: &str) -> VerifyMode {
        if token.trim().eq_ignore_ascii_case("sha256") {
            VerifyMode::Sha256
        } else {
            VerifyMode::None
        }
    }

    /// Returns `true` for every mode other than [`VerifyMode::None`].
    pub fn is_enabled(self) -> bool {
        match self {
            VerifyMode::Sha256 => true,
            VerifyMode::None => false,
        }
    }
}
51
/// An inclusive span of record ids (`lo..=hi`) paired with a table name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TombstoneRange {
    /// Table name the range was recorded with.
    pub table: String,
    /// Lowest rid in the range (inclusive).
    pub lo: u64,
    /// Highest rid in the range (inclusive).
    pub hi: u64,
}

impl TombstoneRange {
    /// Builds a range from anything convertible into a `String` plus its bounds.
    ///
    /// The bounds are stored as given; `lo <= hi` is not checked.
    pub fn new(table: impl Into<String>, lo: u64, hi: u64) -> Self {
        let table = table.into();
        Self { table, lo, hi }
    }

    /// Returns `true` when `rid` lies within `lo..=hi`, both ends included.
    ///
    /// Only the numeric bounds are consulted; `table` plays no part in this
    /// check. A range whose `lo` exceeds `hi` covers nothing.
    pub fn covers_rid(&self, rid: u64) -> bool {
        (self.lo..=self.hi).contains(&rid)
    }
}
82
83pub fn serialize_ranges(ranges: &[TombstoneRange]) -> String {
88 let mut out = String::with_capacity(2 + ranges.len() * 40);
89 out.push('[');
90 for (i, r) in ranges.iter().enumerate() {
91 if i > 0 {
92 out.push(',');
93 }
94 out.push_str("{\"table\":\"");
95 for ch in r.table.chars() {
96 if ch == '"' || ch == '\\' {
97 out.push('\\');
98 }
99 out.push(ch);
100 }
101 out.push_str(&format!("\",\"lo\":{},\"hi\":{}}}", r.lo, r.hi));
102 }
103 out.push(']');
104 out
105}
106
107pub fn parse_ranges(json: &str) -> Vec<TombstoneRange> {
111 let value: crate::json::Value = match crate::json::from_slice(json.as_bytes()) {
112 Ok(v) => v,
113 Err(_) => return Vec::new(),
114 };
115 let Some(arr) = value.as_array() else {
116 return Vec::new();
117 };
118 let mut out = Vec::with_capacity(arr.len());
119 for entry in arr {
120 let (Some(table), Some(lo), Some(hi)) = (
121 entry.get("table").and_then(crate::json::Value::as_str),
122 entry.get("lo").and_then(crate::json::Value::as_u64),
123 entry.get("hi").and_then(crate::json::Value::as_u64),
124 ) else {
125 continue;
126 };
127 out.push(TombstoneRange::new(table.to_string(), lo, hi));
128 }
129 out
130}
131
132pub fn load_ranges(store: &UnifiedStore) -> Vec<TombstoneRange> {
136 let Some(manager) = store.get_collection(RED_CONFIG_COLLECTION) else {
137 return Vec::new();
138 };
139 let mut latest: Option<(u64, String)> = None;
140 for entity in manager.query_all(|_| true) {
141 let EntityData::Row(row) = &entity.data else {
142 continue;
143 };
144 let Some(named) = &row.named else { continue };
145 let key_match =
146 matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == TOMBSTONE_KEY);
147 if !key_match {
148 continue;
149 }
150 let Some(Value::Text(v)) = named.get("value") else {
151 continue;
152 };
153 let id = entity.id.raw();
154 if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
155 latest = Some((id, v.as_ref().to_string()));
156 }
157 }
158 latest
159 .map(|(_, json)| parse_ranges(&json))
160 .unwrap_or_default()
161}
162
163pub fn persist_ranges(store: &UnifiedStore, ranges: &[TombstoneRange]) {
166 let json = serialize_ranges(ranges);
167 store.set_config_tree(TOMBSTONE_KEY, &crate::serde_json::Value::String(json));
168}
169
170pub fn record_rid(record: &crate::storage::query::unified::UnifiedRecord) -> Option<u64> {
174 match record.get("rid")? {
175 Value::Integer(v) if *v >= 0 => Some(*v as u64),
176 Value::UnsignedInteger(v) => Some(*v),
177 _ => None,
178 }
179}
180
181pub fn record_tombstoned(
186 ranges: &[TombstoneRange],
187 record: &crate::storage::query::unified::UnifiedRecord,
188) -> bool {
189 let Some(rid) = record_rid(record) else {
190 return false;
191 };
192 ranges.iter().any(|r| r.covers_rid(rid))
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushes `ranges` through `serialize_ranges` and back through `parse_ranges`.
    fn round_trip(ranges: &[TombstoneRange]) -> Vec<TombstoneRange> {
        parse_ranges(&serialize_ranges(ranges))
    }

    #[test]
    fn serialize_parse_round_trip() {
        let expected = vec![
            TombstoneRange::new("orders", 10, 12),
            TombstoneRange::new("events", 5, 5),
        ];
        assert_eq!(round_trip(&expected), expected);
    }

    #[test]
    fn parse_skips_malformed_entries_but_keeps_valid_ones() {
        // The second entry has no `hi`, so only the first one survives.
        let input = r#"[{"table":"a","lo":1,"hi":3},{"table":"b","lo":4}]"#;
        let expected = vec![TombstoneRange::new("a", 1, 3)];
        assert_eq!(parse_ranges(input), expected);
    }

    #[test]
    fn parse_garbage_yields_empty() {
        // One input that is not JSON, one that is JSON but not an array.
        for input in ["not json", "{}"] {
            assert!(parse_ranges(input).is_empty());
        }
    }

    #[test]
    fn covers_rid_is_inclusive() {
        let range = TombstoneRange::new("t", 4, 6);
        for inside in [4, 5, 6] {
            assert!(range.covers_rid(inside));
        }
        for outside in [3, 7] {
            assert!(!range.covers_rid(outside));
        }
    }

    #[test]
    fn verify_mode_parse_is_lenient() {
        let cases = [
            ("sha256", VerifyMode::Sha256),
            ("SHA256", VerifyMode::Sha256),
            ("none", VerifyMode::None),
            ("", VerifyMode::None),
            ("bogus", VerifyMode::None),
        ];
        for (token, expected) in cases {
            assert_eq!(VerifyMode::parse(token), expected);
        }
        assert!(VerifyMode::Sha256.is_enabled());
        assert!(!VerifyMode::None.is_enabled());
    }

    #[test]
    fn table_name_with_quote_is_escaped() {
        // A quote inside the table name must survive the JSON round trip.
        let expected = vec![TombstoneRange::new("a\"b", 1, 2)];
        assert_eq!(round_trip(&expected), expected);
    }
}