// reddb_server/runtime/integrity_tombstone.rs

//! Issue #765 / S6 — opt-in SHA-256 integrity tombstones for input streams.
//!
//! Per ADR 0029 ("Integrity") and PRD #759: input streams support opt-in
//! end-to-end SHA-256. The client streams a rolling hash over the row
//! payloads and emits the expected digest in the terminal frame; the server
//! recomputes the same hash and compares. Because S4 commits per chunk, rows
//! are already durable when a mismatch is detected — rollback is impossible.
//! Instead the server marks the affected RID range with an **integrity
//! tombstone** in the collection metadata. Default reads filter tombstoned
//! RIDs out of result sets.
//!
//! This module owns the durable representation (a JSON list persisted under
//! a single `red_config` key) and the pure helpers the runtime + stream
//! handler use. The runtime caches the parsed ranges in-memory so the common
//! no-tombstone read path pays only a single relaxed atomic load.

use crate::storage::schema::Value;
use crate::storage::unified::{EntityData, UnifiedStore};

/// Name of the `red_config` collection holding the durable tombstone list.
const RED_CONFIG_COLLECTION: &str = "red_config";

/// Single dot-notation key under which the whole tombstone list is stored.
/// `set_config_tree` is append-only, so [`load_ranges`] picks the latest row
/// for this key by entity id (mirrors `blockchain_kind`'s integrity flag).
pub const TOMBSTONE_KEY: &str = "stream.integrity.tombstones";

/// Verification mode requested for an input stream. `None` is the default
/// (`stream.integrity.default_verify`) and incurs no hashing overhead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VerifyMode {
    /// No end-to-end verification is requested.
    #[default]
    None,
    /// Opt-in end-to-end SHA-256 verification of the streamed row payloads.
    Sha256,
}

impl VerifyMode {
    /// Parse the wire token. Surrounding whitespace is ignored and the
    /// comparison is ASCII case-insensitive. Unknown / empty values fall back
    /// to `None` so a malformed opt-in never terminates a stream that would
    /// otherwise run.
    pub fn parse(token: &str) -> VerifyMode {
        // Compare in place instead of allocating a lowercased copy.
        if token.trim().eq_ignore_ascii_case("sha256") {
            VerifyMode::Sha256
        } else {
            VerifyMode::None
        }
    }

    /// True when the mode asks for hash verification.
    pub fn is_enabled(self) -> bool {
        // Exhaustive on purpose: a new variant must be classified here
        // explicitly rather than silently counting as "disabled".
        match self {
            VerifyMode::None => false,
            VerifyMode::Sha256 => true,
        }
    }
}

/// One tombstoned RID range, inclusive on both ends, scoped to a collection.
/// RIDs (entity logical ids) are globally unique, but we still carry the
/// table so reads filter precisely and forensic tooling can attribute the
/// range to its origin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TombstoneRange {
    /// Collection (table) the range was recorded for.
    pub table: String,
    /// First tombstoned RID (inclusive).
    pub lo: u64,
    /// Last tombstoned RID (inclusive).
    pub hi: u64,
}

impl TombstoneRange {
    /// Build a range for `table` spanning `lo..=hi`. The bounds are stored as
    /// given; no ordering check is performed.
    pub fn new(table: impl Into<String>, lo: u64, hi: u64) -> Self {
        Self {
            table: table.into(),
            lo,
            hi,
        }
    }

    /// True when `rid` falls inside this tombstoned range. RIDs (entity
    /// logical ids) are drawn from a single global counter and never reused,
    /// so a RID identifies exactly one row across the whole store — the read
    /// filter can therefore match on RID alone and stays correct even for
    /// projections that drop the `collection` system field. The `table` is
    /// retained for the error envelope and forensic attribution.
    ///
    /// An inverted range (`lo > hi`) is empty and covers no RID.
    pub fn covers_rid(&self, rid: u64) -> bool {
        (self.lo..=self.hi).contains(&rid)
    }
}

83/// Serialize the range list to the compact JSON array persisted in
84/// `red_config`. Table names are SQL identifiers (validated at OpenStream),
85/// so the only escaping needed is the JSON string quote — kept explicit here
86/// rather than pulling a serializer for a three-field record.
87pub fn serialize_ranges(ranges: &[TombstoneRange]) -> String {
88    let mut out = String::with_capacity(2 + ranges.len() * 40);
89    out.push('[');
90    for (i, r) in ranges.iter().enumerate() {
91        if i > 0 {
92            out.push(',');
93        }
94        out.push_str("{\"table\":\"");
95        for ch in r.table.chars() {
96            if ch == '"' || ch == '\\' {
97                out.push('\\');
98            }
99            out.push(ch);
100        }
101        out.push_str(&format!("\",\"lo\":{},\"hi\":{}}}", r.lo, r.hi));
102    }
103    out.push(']');
104    out
105}
106
107/// Parse the JSON array produced by [`serialize_ranges`]. Malformed entries
108/// are skipped rather than failing the whole load — a single corrupt row must
109/// not blind the reader to every other tombstone.
110pub fn parse_ranges(json: &str) -> Vec<TombstoneRange> {
111    let value: crate::json::Value = match crate::json::from_slice(json.as_bytes()) {
112        Ok(v) => v,
113        Err(_) => return Vec::new(),
114    };
115    let Some(arr) = value.as_array() else {
116        return Vec::new();
117    };
118    let mut out = Vec::with_capacity(arr.len());
119    for entry in arr {
120        let (Some(table), Some(lo), Some(hi)) = (
121            entry.get("table").and_then(crate::json::Value::as_str),
122            entry.get("lo").and_then(crate::json::Value::as_u64),
123            entry.get("hi").and_then(crate::json::Value::as_u64),
124        ) else {
125            continue;
126        };
127        out.push(TombstoneRange::new(table.to_string(), lo, hi));
128    }
129    out
130}
131
132/// Load every persisted tombstone range from `red_config`. Picks the latest
133/// row for [`TOMBSTONE_KEY`] by entity id (the key is rewritten in full on
134/// every append, so the highest-id row is the current list).
135pub fn load_ranges(store: &UnifiedStore) -> Vec<TombstoneRange> {
136    let Some(manager) = store.get_collection(RED_CONFIG_COLLECTION) else {
137        return Vec::new();
138    };
139    let mut latest: Option<(u64, String)> = None;
140    for entity in manager.query_all(|_| true) {
141        let EntityData::Row(row) = &entity.data else {
142            continue;
143        };
144        let Some(named) = &row.named else { continue };
145        let key_match =
146            matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == TOMBSTONE_KEY);
147        if !key_match {
148            continue;
149        }
150        let Some(Value::Text(v)) = named.get("value") else {
151            continue;
152        };
153        let id = entity.id.raw();
154        if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
155            latest = Some((id, v.as_ref().to_string()));
156        }
157    }
158    latest
159        .map(|(_, json)| parse_ranges(&json))
160        .unwrap_or_default()
161}
162
163/// Persist the full range list back to `red_config` under [`TOMBSTONE_KEY`].
164/// Durable via the store's WAL, so tombstones survive restart.
165pub fn persist_ranges(store: &UnifiedStore, ranges: &[TombstoneRange]) {
166    let json = serialize_ranges(ranges);
167    store.set_config_tree(TOMBSTONE_KEY, &crate::serde_json::Value::String(json));
168}
169
170/// Extract a record's RID (logical entity id) as exposed by SELECT scans.
171/// Mirrors the system-field convention in `record_search` where every
172/// scanned record carries `rid` as an unsigned integer.
173pub fn record_rid(record: &crate::storage::query::unified::UnifiedRecord) -> Option<u64> {
174    match record.get("rid")? {
175        Value::Integer(v) if *v >= 0 => Some(*v as u64),
176        Value::UnsignedInteger(v) => Some(*v),
177        _ => None,
178    }
179}
180
181/// True when a record is tombstoned by any range, matched on RID alone
182/// (RIDs are globally unique — see [`TombstoneRange::covers_rid`]). A record
183/// without a resolvable RID is never filtered (fail-open on read — a scan
184/// that cannot identify a row must not silently drop it).
185pub fn record_tombstoned(
186    ranges: &[TombstoneRange],
187    record: &crate::storage::query::unified::UnifiedRecord,
188) -> bool {
189    let Some(rid) = record_rid(record) else {
190        return false;
191    };
192    ranges.iter().any(|r| r.covers_rid(rid))
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn serialize_parse_round_trip() {
        let ranges = vec![
            TombstoneRange::new("orders", 10, 12),
            TombstoneRange::new("events", 5, 5),
        ];
        let json = serialize_ranges(&ranges);
        let parsed = parse_ranges(&json);
        assert_eq!(parsed, ranges);
    }

    #[test]
    fn empty_list_round_trips() {
        assert_eq!(serialize_ranges(&[]), "[]");
        assert!(parse_ranges("[]").is_empty());
    }

    #[test]
    fn parse_skips_malformed_entries_but_keeps_valid_ones() {
        // Second entry is missing `hi` — it must be skipped, not abort the
        // whole parse.
        let json = r#"[{"table":"a","lo":1,"hi":3},{"table":"b","lo":4}]"#;
        let parsed = parse_ranges(json);
        assert_eq!(parsed, vec![TombstoneRange::new("a", 1, 3)]);
    }

    #[test]
    fn parse_garbage_yields_empty() {
        assert!(parse_ranges("not json").is_empty());
        assert!(parse_ranges("{}").is_empty());
    }

    #[test]
    fn covers_rid_is_inclusive() {
        let r = TombstoneRange::new("t", 4, 6);
        assert!(r.covers_rid(4));
        assert!(r.covers_rid(5));
        assert!(r.covers_rid(6));
        assert!(!r.covers_rid(3));
        assert!(!r.covers_rid(7));
    }

    #[test]
    fn covers_rid_inverted_range_is_empty() {
        // `lo > hi` describes no RIDs at all.
        let r = TombstoneRange::new("t", 6, 4);
        assert!(!r.covers_rid(4));
        assert!(!r.covers_rid(5));
        assert!(!r.covers_rid(6));
    }

    #[test]
    fn verify_mode_parse_is_lenient() {
        assert_eq!(VerifyMode::parse("sha256"), VerifyMode::Sha256);
        assert_eq!(VerifyMode::parse("SHA256"), VerifyMode::Sha256);
        assert_eq!(VerifyMode::parse("none"), VerifyMode::None);
        assert_eq!(VerifyMode::parse(""), VerifyMode::None);
        assert_eq!(VerifyMode::parse("bogus"), VerifyMode::None);
        assert!(VerifyMode::Sha256.is_enabled());
        assert!(!VerifyMode::None.is_enabled());
    }

    #[test]
    fn table_name_with_quote_is_escaped() {
        // Identifiers are validated at OpenStream so this can't occur in
        // practice, but the serializer must not produce invalid JSON.
        let ranges = vec![TombstoneRange::new("a\"b", 1, 2)];
        let json = serialize_ranges(&ranges);
        let parsed = parse_ranges(&json);
        assert_eq!(parsed, ranges);
    }

    #[test]
    fn table_name_with_backslash_is_escaped() {
        let ranges = vec![TombstoneRange::new("a\\b", 1, 2)];
        let json = serialize_ranges(&ranges);
        let parsed = parse_ranges(&json);
        assert_eq!(parsed, ranges);
    }
}