use crate::storage::schema::Value;
use crate::storage::unified::{EntityData, UnifiedStore};
const RED_CONFIG_COLLECTION: &str = "red_config";
pub const TOMBSTONE_KEY: &str = "stream.integrity.tombstones";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VerifyMode {
#[default]
None,
Sha256,
}
impl VerifyMode {
pub fn parse(token: &str) -> VerifyMode {
match token.trim().to_ascii_lowercase().as_str() {
"sha256" => VerifyMode::Sha256,
_ => VerifyMode::None,
}
}
pub fn is_enabled(self) -> bool {
matches!(self, VerifyMode::Sha256)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TombstoneRange {
pub table: String,
pub lo: u64,
pub hi: u64,
}
impl TombstoneRange {
pub fn new(table: impl Into<String>, lo: u64, hi: u64) -> Self {
Self {
table: table.into(),
lo,
hi,
}
}
pub fn covers_rid(&self, rid: u64) -> bool {
self.lo <= rid && rid <= self.hi
}
}
pub fn serialize_ranges(ranges: &[TombstoneRange]) -> String {
let mut out = String::with_capacity(2 + ranges.len() * 40);
out.push('[');
for (i, r) in ranges.iter().enumerate() {
if i > 0 {
out.push(',');
}
out.push_str("{\"table\":\"");
for ch in r.table.chars() {
if ch == '"' || ch == '\\' {
out.push('\\');
}
out.push(ch);
}
out.push_str(&format!("\",\"lo\":{},\"hi\":{}}}", r.lo, r.hi));
}
out.push(']');
out
}
pub fn parse_ranges(json: &str) -> Vec<TombstoneRange> {
let value: crate::json::Value = match crate::json::from_slice(json.as_bytes()) {
Ok(v) => v,
Err(_) => return Vec::new(),
};
let Some(arr) = value.as_array() else {
return Vec::new();
};
let mut out = Vec::with_capacity(arr.len());
for entry in arr {
let (Some(table), Some(lo), Some(hi)) = (
entry.get("table").and_then(crate::json::Value::as_str),
entry.get("lo").and_then(crate::json::Value::as_u64),
entry.get("hi").and_then(crate::json::Value::as_u64),
) else {
continue;
};
out.push(TombstoneRange::new(table.to_string(), lo, hi));
}
out
}
pub fn load_ranges(store: &UnifiedStore) -> Vec<TombstoneRange> {
let Some(manager) = store.get_collection(RED_CONFIG_COLLECTION) else {
return Vec::new();
};
let mut latest: Option<(u64, String)> = None;
for entity in manager.query_all(|_| true) {
let EntityData::Row(row) = &entity.data else {
continue;
};
let Some(named) = &row.named else { continue };
let key_match =
matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == TOMBSTONE_KEY);
if !key_match {
continue;
}
let Some(Value::Text(v)) = named.get("value") else {
continue;
};
let id = entity.id.raw();
if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
latest = Some((id, v.as_ref().to_string()));
}
}
latest
.map(|(_, json)| parse_ranges(&json))
.unwrap_or_default()
}
pub fn persist_ranges(store: &UnifiedStore, ranges: &[TombstoneRange]) {
let json = serialize_ranges(ranges);
store.set_config_tree(TOMBSTONE_KEY, &crate::serde_json::Value::String(json));
}
pub fn record_rid(record: &crate::storage::query::unified::UnifiedRecord) -> Option<u64> {
match record.get("rid")? {
Value::Integer(v) if *v >= 0 => Some(*v as u64),
Value::UnsignedInteger(v) => Some(*v),
_ => None,
}
}
pub fn record_tombstoned(
ranges: &[TombstoneRange],
record: &crate::storage::query::unified::UnifiedRecord,
) -> bool {
let Some(rid) = record_rid(record) else {
return false;
};
ranges.iter().any(|r| r.covers_rid(rid))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serialize_parse_round_trip() {
let ranges = vec![
TombstoneRange::new("orders", 10, 12),
TombstoneRange::new("events", 5, 5),
];
let json = serialize_ranges(&ranges);
let parsed = parse_ranges(&json);
assert_eq!(parsed, ranges);
}
#[test]
fn parse_skips_malformed_entries_but_keeps_valid_ones() {
let json = r#"[{"table":"a","lo":1,"hi":3},{"table":"b","lo":4}]"#;
let parsed = parse_ranges(json);
assert_eq!(parsed, vec![TombstoneRange::new("a", 1, 3)]);
}
#[test]
fn parse_garbage_yields_empty() {
assert!(parse_ranges("not json").is_empty());
assert!(parse_ranges("{}").is_empty());
}
#[test]
fn covers_rid_is_inclusive() {
let r = TombstoneRange::new("t", 4, 6);
assert!(r.covers_rid(4));
assert!(r.covers_rid(5));
assert!(r.covers_rid(6));
assert!(!r.covers_rid(3));
assert!(!r.covers_rid(7));
}
#[test]
fn verify_mode_parse_is_lenient() {
assert_eq!(VerifyMode::parse("sha256"), VerifyMode::Sha256);
assert_eq!(VerifyMode::parse("SHA256"), VerifyMode::Sha256);
assert_eq!(VerifyMode::parse("none"), VerifyMode::None);
assert_eq!(VerifyMode::parse(""), VerifyMode::None);
assert_eq!(VerifyMode::parse("bogus"), VerifyMode::None);
assert!(VerifyMode::Sha256.is_enabled());
assert!(!VerifyMode::None.is_enabled());
}
#[test]
fn table_name_with_quote_is_escaped() {
let ranges = vec![TombstoneRange::new("a\"b", 1, 2)];
let json = serialize_ranges(&ranges);
let parsed = parse_ranges(&json);
assert_eq!(parsed, ranges);
}
}