Skip to main content

reddb_server/runtime/
blockchain_kind.rs

1//! Foundation for `KIND blockchain` collections (issue #523).
2//!
3//! Persists kind in `red_config`, exposes the reserved-column set used by
4//! every block row, scans the collection to derive the current chain tip,
5//! and wraps the hash helpers from [`crate::storage::blockchain`].
6//!
7//! This iteration does NOT validate user-supplied prev_hash/height on INSERT
8//! (the engine computes them) and does NOT enforce conflict-retry semantics
9//! — those land alongside chain-tip RPC and `verify_chain` in a later slice.
10
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use crate::storage::blockchain::{compute_block_hash, verify_chain, Block, GENESIS_PREV_HASH, VerifyReport};
14use crate::storage::schema::Value;
15use crate::storage::unified::UnifiedStore;
16
/// Marker written under `red.collection.{name}.kind` to flag a collection as
/// a blockchain collection.
pub const CHAIN_KIND_TAG: &str = "chain";

/// Column holding the block's position in the chain (genesis is `0`).
pub const COL_BLOCK_HEIGHT: &str = "block_height";
/// Column holding the 32-byte hash of the preceding block.
pub const COL_PREV_HASH: &str = "prev_hash";
/// Column holding the block timestamp in milliseconds.
pub const COL_TIMESTAMP: &str = "timestamp";
/// Column holding the block's own 32-byte hash.
pub const COL_HASH: &str = "hash";

/// Every column name the engine fills in by itself. Values a user INSERT
/// supplies for these names are silently overwritten, keeping the chain
/// engine-controlled.
pub const RESERVED_COLUMNS: &[&str] = &[
    COL_BLOCK_HEIGHT,
    COL_PREV_HASH,
    COL_TIMESTAMP,
    COL_HASH,
];
28
29/// Chain tip derived by scanning the collection.
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct ChainTip {
32    /// `block_height` of the highest block. `None` if the collection has no
33    /// rows yet (pre-genesis; callers should use [`GENESIS_PREV_HASH`] for
34    /// `prev_hash` and `0` for `block_height`).
35    pub height: Option<u64>,
36    pub hash: [u8; 32],
37}
38
39impl ChainTip {
40    pub fn empty() -> Self {
41        Self {
42            height: None,
43            hash: GENESIS_PREV_HASH,
44        }
45    }
46
47    /// Returns `(prev_hash, next_height)` to use when appending a new block.
48    pub fn next(&self) -> ([u8; 32], u64) {
49        let next_height = self.height.map(|h| h + 1).unwrap_or(0);
50        (self.hash, next_height)
51    }
52}
53
/// Config key under which a collection's kind marker is stored:
/// `red.collection.<collection>.kind`.
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
57
58/// Persist the `chain` kind marker. Append-only — only call once at creation.
59pub fn mark_as_chain(store: &UnifiedStore, collection: &str) {
60    store.set_config_tree(
61        &kind_key(collection),
62        &crate::serde_json::Value::String(CHAIN_KIND_TAG.to_string()),
63    );
64}
65
66/// True if `mark_as_chain` was ever called for this collection.
67pub fn is_chain(store: &UnifiedStore, collection: &str) -> bool {
68    match store.get_config(&kind_key(collection)) {
69        Some(Value::Text(s)) => s.as_ref() == CHAIN_KIND_TAG,
70        _ => false,
71    }
72}
73
74/// Full chain tip including timestamp. Returned by `chain_tip_full` and
75/// `GET /collections/:name/chain-tip`.
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub struct ChainTipFull {
78    pub height: u64,
79    pub hash: [u8; 32],
80    pub timestamp_ms: u64,
81}
82
83/// Scan-based tip with timestamp. `None` when the collection has no rows
84/// (pre-genesis). Used by the chain-tip endpoint and the chain-INSERT
85/// validation path (#524).
86pub fn chain_tip_full(store: &UnifiedStore, collection: &str) -> Option<ChainTipFull> {
87    let manager = store.get_collection(collection)?;
88    let mut best: Option<ChainTipFull> = None;
89    for entity in manager.query_all(|_| true) {
90        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
91            continue;
92        };
93        let Some(named) = &row.named else { continue };
94        let height = match named.get(COL_BLOCK_HEIGHT) {
95            Some(Value::UnsignedInteger(v)) => *v,
96            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
97            _ => continue,
98        };
99        let hash = match named.get(COL_HASH) {
100            Some(Value::Blob(b)) if b.len() == 32 => {
101                let mut out = [0u8; 32];
102                out.copy_from_slice(b);
103                out
104            }
105            _ => continue,
106        };
107        let timestamp_ms = match named.get(COL_TIMESTAMP) {
108            Some(Value::UnsignedInteger(v)) => *v,
109            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
110            _ => 0,
111        };
112        match &best {
113            None => {
114                best = Some(ChainTipFull {
115                    height,
116                    hash,
117                    timestamp_ms,
118                });
119            }
120            Some(cur) if height > cur.height => {
121                best = Some(ChainTipFull {
122                    height,
123                    hash,
124                    timestamp_ms,
125                });
126            }
127            _ => {}
128        }
129    }
130    best
131}
132
133/// Scan the collection for the highest `block_height` and return its row's
134/// `hash`. O(n) — replaced by a cached tip in a later iteration.
135pub fn chain_tip(store: &UnifiedStore, collection: &str) -> ChainTip {
136    let Some(manager) = store.get_collection(collection) else {
137        return ChainTip::empty();
138    };
139    let mut best: Option<(u64, [u8; 32])> = None;
140    for entity in manager.query_all(|_| true) {
141        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
142            continue;
143        };
144        let Some(named) = &row.named else {
145            continue;
146        };
147        let height = match named.get(COL_BLOCK_HEIGHT) {
148            Some(Value::UnsignedInteger(v)) => *v,
149            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
150            _ => continue,
151        };
152        let hash = match named.get(COL_HASH) {
153            Some(Value::Blob(b)) if b.len() == 32 => {
154                let mut out = [0u8; 32];
155                out.copy_from_slice(b);
156                out
157            }
158            _ => continue,
159        };
160        match best {
161            None => best = Some((height, hash)),
162            Some((h, _)) if height > h => best = Some((height, hash)),
163            _ => {}
164        }
165    }
166    match best {
167        Some((height, hash)) => ChainTip {
168            height: Some(height),
169            hash,
170        },
171        None => ChainTip::empty(),
172    }
173}
174
/// Lower-case hexadecimal rendering of a 32-byte digest (always 64 chars).
fn hex32(bytes: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes.iter() {
        // High nibble first, then low nibble — same as `{:02x}`.
        encoded.push(DIGITS[usize::from(byte >> 4)] as char);
        encoded.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    encoded
}
182
183/// Build the `BlockchainConflict:<json>` error payload mapped to HTTP 409
184/// (#524). The JSON body carries the current tip so the caller can retry
185/// with the right `prev_hash` / `block_height`.
186pub fn chain_conflict_error(
187    tip_height: u64,
188    tip_hash: [u8; 32],
189    tip_timestamp_ms: u64,
190    server_now_ms: u64,
191    reason: &str,
192) -> crate::api::RedDBError {
193    let body = format!(
194        "{{\"block_height\":{},\"hash\":\"{}\",\"timestamp\":{},\"server_time\":{},\"reason\":\"{}\"}}",
195        tip_height,
196        hex32(&tip_hash),
197        tip_timestamp_ms,
198        server_now_ms,
199        reason.replace('"', "'")
200    );
201    crate::api::RedDBError::InvalidOperation(format!("BlockchainConflict:{body}"))
202}
203
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
210
211/// Canonicalize user-supplied fields for inclusion in the block hash. Sorted
212/// by key, `<key>=<plain_text>;` joined — stable across reorderings so the
213/// recomputed hash matches regardless of column order at INSERT-time.
214pub fn canonical_payload(fields: &[(String, Value)]) -> Vec<u8> {
215    let mut pairs: Vec<(&str, String)> = fields
216        .iter()
217        .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
218        .map(|(k, v)| (k.as_str(), v.plain_text()))
219        .collect();
220    pairs.sort_by(|a, b| a.0.cmp(b.0));
221    let mut out = Vec::new();
222    for (k, v) in pairs {
223        out.extend_from_slice(k.as_bytes());
224        out.push(b'=');
225        out.extend_from_slice(v.as_bytes());
226        out.push(b';');
227    }
228    out
229}
230
231/// Build the reserved-column key/value pairs for a new block. Caller appends
232/// these to the row's `fields` AFTER stripping any user-supplied reserved
233/// columns. The returned `hash` is also returned so callers can advance the
234/// tip without recomputing.
235pub fn make_block_reserved_fields(
236    prev_hash: [u8; 32],
237    height: u64,
238    timestamp_ms: u64,
239    payload_canonical: &[u8],
240) -> (Vec<(String, Value)>, [u8; 32]) {
241    let hash = compute_block_hash(&prev_hash, height, timestamp_ms, payload_canonical, None);
242    let fields = vec![
243        (
244            COL_BLOCK_HEIGHT.to_string(),
245            Value::UnsignedInteger(height),
246        ),
247        (COL_PREV_HASH.to_string(), Value::Blob(prev_hash.to_vec())),
248        (
249            COL_TIMESTAMP.to_string(),
250            Value::UnsignedInteger(timestamp_ms),
251        ),
252        (COL_HASH.to_string(), Value::Blob(hash.to_vec())),
253    ];
254    (fields, hash)
255}
256
257/// Issue #525 — walk a chain collection in `block_height` order and rebuild the
258/// `Block` list expected by [`verify_chain`]. Genesis is identified by
259/// `block_height == 0`.  Returns `None` when the collection is absent or not a
260/// chain.  Reserved-column extraction and canonical payload encoding mirror
261/// what the engine writes at INSERT time — this is the alignment of the two
262/// encoders referenced in #524.
263pub fn collect_blocks(store: &UnifiedStore, collection: &str) -> Option<Vec<Block>> {
264    if !is_chain(store, collection) {
265        return None;
266    }
267    let manager = store.get_collection(collection)?;
268    let mut blocks: Vec<Block> = Vec::new();
269    for entity in manager.query_all(|_| true) {
270        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
271            continue;
272        };
273        let Some(named) = &row.named else { continue };
274        let height = match named.get(COL_BLOCK_HEIGHT) {
275            Some(Value::UnsignedInteger(v)) => *v,
276            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
277            _ => continue,
278        };
279        let prev_hash = match named.get(COL_PREV_HASH) {
280            Some(Value::Blob(b)) if b.len() == 32 => {
281                let mut out = [0u8; 32];
282                out.copy_from_slice(b);
283                out
284            }
285            _ => continue,
286        };
287        let timestamp_ms = match named.get(COL_TIMESTAMP) {
288            Some(Value::UnsignedInteger(v)) => *v,
289            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
290            _ => continue,
291        };
292        let hash = match named.get(COL_HASH) {
293            Some(Value::Blob(b)) if b.len() == 32 => {
294                let mut out = [0u8; 32];
295                out.copy_from_slice(b);
296                out
297            }
298            _ => continue,
299        };
300        let user_fields: Vec<(String, Value)> = named
301            .iter()
302            .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
303            .map(|(k, v)| (k.clone(), v.clone()))
304            .collect();
305        let payload = canonical_payload(&user_fields);
306        blocks.push(Block {
307            block_height: height,
308            prev_hash,
309            timestamp_ms,
310            payload,
311            signed: None,
312            hash,
313        });
314    }
315    blocks.sort_by_key(|b| b.block_height);
316    Some(blocks)
317}
318
319/// Outcome of `POST /collections/:name/verify-chain`.
320#[derive(Debug, Clone, PartialEq, Eq)]
321pub struct VerifyChainOutcome {
322    pub checked: u64,
323    pub ok: bool,
324    pub first_bad_height: Option<u64>,
325}
326
327/// Walk the chain end-to-end. Returns `(outcome, _)`.  Mismatches are detected
328/// via [`verify_chain`].  Callers persist the integrity flag on `ok == false`.
329pub fn verify_chain_outcome(store: &UnifiedStore, collection: &str) -> Option<VerifyChainOutcome> {
330    let blocks = collect_blocks(store, collection)?;
331    let checked = blocks.len() as u64;
332    match verify_chain(&blocks) {
333        VerifyReport::Ok => Some(VerifyChainOutcome {
334            checked,
335            ok: true,
336            first_bad_height: None,
337        }),
338        VerifyReport::Inconsistent { block_height, .. } => Some(VerifyChainOutcome {
339            checked,
340            ok: false,
341            first_bad_height: Some(block_height),
342        }),
343    }
344}
345
/// Config key under which a collection's integrity flag is stored:
/// `red.collection.<collection>.integrity`.
fn integrity_key(collection: &str) -> String {
    ["red.collection.", collection, ".integrity"].concat()
}

/// Stored flag value meaning verification found the chain inconsistent.
const INTEGRITY_BROKEN: &str = "broken";
/// Stored flag value meaning verification passed.
const INTEGRITY_OK: &str = "ok";
352
353/// Persist a chain's integrity flag in `red_config`. Append-only — readers use
354/// [`is_integrity_broken_persisted`] which picks the latest matching row by id.
355pub fn persist_integrity_flag(store: &UnifiedStore, collection: &str, broken: bool) {
356    let tag = if broken { INTEGRITY_BROKEN } else { INTEGRITY_OK };
357    store.set_config_tree(
358        &integrity_key(collection),
359        &crate::serde_json::Value::String(tag.to_string()),
360    );
361}
362
363/// Scan `red_config` for the latest persisted integrity flag for `collection`.
364/// `None` means no record (treated as ok).
365pub fn is_integrity_broken_persisted(store: &UnifiedStore, collection: &str) -> Option<bool> {
366    let manager = store.get_collection("red_config")?;
367    let key = integrity_key(collection);
368    let mut latest: Option<(u64, String)> = None;
369    for entity in manager.query_all(|_| true) {
370        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
371            continue;
372        };
373        let Some(named) = &row.named else { continue };
374        let k_match = matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == key.as_str());
375        if !k_match {
376            continue;
377        }
378        let Some(Value::Text(v)) = named.get("value") else {
379            continue;
380        };
381        let id = entity.id.raw();
382        if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
383            latest = Some((id, v.as_ref().to_string()));
384        }
385    }
386    latest.map(|(_, tag)| tag == INTEGRITY_BROKEN)
387}
388
389/// Convenience: produce the genesis row's full field list. Genesis carries
390/// an empty user payload — extra metadata is recorded in subsequent blocks.
391pub fn genesis_fields(timestamp_ms: u64) -> Vec<(String, Value)> {
392    make_block_reserved_fields(GENESIS_PREV_HASH, 0, timestamp_ms, &[]).0
393}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp shared by the tests that build blocks.
    const TS: u64 = 1_700_000_000_000;

    /// Fetches a column from a generated field list; panics when it is absent.
    fn column<'a>(fields: &'a [(String, Value)], name: &str) -> &'a Value {
        let (_, value) = fields.iter().find(|(key, _)| key == name).unwrap();
        value
    }

    #[test]
    fn reserved_columns_complete() {
        assert_eq!(RESERVED_COLUMNS.len(), 4);
        for name in [COL_BLOCK_HEIGHT, COL_PREV_HASH, COL_TIMESTAMP, COL_HASH] {
            assert!(RESERVED_COLUMNS.contains(&name));
        }
    }

    #[test]
    fn empty_tip_advances_to_genesis_height() {
        let (prev, height) = ChainTip::empty().next();
        assert_eq!(prev, GENESIS_PREV_HASH);
        assert_eq!(height, 0);
    }

    #[test]
    fn tip_with_height_advances_by_one() {
        let tip = ChainTip {
            height: Some(7),
            hash: [0xAB; 32],
        };
        assert_eq!(tip.next(), ([0xAB; 32], 8));
    }

    #[test]
    fn genesis_fields_carry_zero_prev_hash() {
        let fields = genesis_fields(TS);
        let Value::Blob(prev) = column(&fields, COL_PREV_HASH) else {
            panic!("prev_hash must be Blob");
        };
        assert_eq!(&prev[..], &[0u8; 32]);
        assert_eq!(
            *column(&fields, COL_BLOCK_HEIGHT),
            Value::UnsignedInteger(0)
        );
    }

    #[test]
    fn canonical_payload_is_order_independent() {
        let user = || ("user".to_string(), Value::text("alice"));
        let amount = || ("amount".to_string(), Value::Integer(100));
        let forward = vec![user(), amount()];
        let reversed = vec![amount(), user()];
        assert_eq!(canonical_payload(&forward), canonical_payload(&reversed));
    }

    #[test]
    fn canonical_payload_skips_reserved_columns() {
        let fields = vec![
            ("user".to_string(), Value::text("alice")),
            (
                COL_BLOCK_HEIGHT.to_string(),
                Value::UnsignedInteger(42),
            ),
            (COL_HASH.to_string(), Value::Blob(vec![0xFF; 32])),
        ];
        let text = String::from_utf8(canonical_payload(&fields)).unwrap();
        assert_eq!(text, "user=alice;");
    }

    #[test]
    fn block_hash_matches_recompute() {
        let payload: &[u8] = b"user=alice;";
        let (fields, hash) = make_block_reserved_fields(GENESIS_PREV_HASH, 0, TS, payload);
        let recomputed = compute_block_hash(&GENESIS_PREV_HASH, 0, TS, payload, None);
        assert_eq!(hash, recomputed);
        let Value::Blob(stored) = column(&fields, COL_HASH) else {
            panic!("hash must be Blob");
        };
        assert_eq!(&stored[..], &hash[..]);
    }
}
484}