Skip to main content

reddb_server/runtime/
blockchain_kind.rs

//! Foundation for `KIND blockchain` collections (issue #523).
//!
//! Persists the collection kind in `red_config`, exposes the reserved-column
//! set used by every block row, scans the collection to derive the current
//! chain tip, and wraps the hash helpers from [`crate::storage::blockchain`].
//! Later slices added the conflict-error payload (#524), the block collector
//! that feeds `verify_chain` (#525), and the persisted integrity flag.
//!
//! The helpers in this module do NOT themselves validate user-supplied
//! `prev_hash`/`block_height` on INSERT (the engine computes them) and do NOT
//! enforce conflict-retry semantics; that logic, where present, lives with the
//! callers of these helpers.
10
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use crate::storage::blockchain::{
14    compute_block_hash, verify_chain, Block, VerifyReport, GENESIS_PREV_HASH,
15};
16use crate::storage::schema::Value;
17use crate::storage::unified::UnifiedStore;
18
/// Value stored under `red.collection.{name}.kind` for blockchain collections.
pub const CHAIN_KIND_TAG: &str = "chain";

/// Column holding the block's height; the engine writes it as an unsigned integer.
pub const COL_BLOCK_HEIGHT: &str = "block_height";
/// Column holding the 32-byte hash of the preceding block, stored as a blob.
pub const COL_PREV_HASH: &str = "prev_hash";
/// Column holding the block timestamp in milliseconds.
pub const COL_TIMESTAMP: &str = "timestamp";
/// Column holding the block's own 32-byte hash, stored as a blob.
pub const COL_HASH: &str = "hash";

/// Every column name the engine fills in on its own. Values that a user INSERT
/// supplies for these are silently overwritten so the chain remains
/// engine-controlled, and they are excluded from the hashed payload.
pub const RESERVED_COLUMNS: &[&str] = &[COL_BLOCK_HEIGHT, COL_PREV_HASH, COL_TIMESTAMP, COL_HASH];
30
31/// Chain tip derived by scanning the collection.
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct ChainTip {
34    /// `block_height` of the highest block. `None` if the collection has no
35    /// rows yet (pre-genesis; callers should use [`GENESIS_PREV_HASH`] for
36    /// `prev_hash` and `0` for `block_height`).
37    pub height: Option<u64>,
38    pub hash: [u8; 32],
39}
40
41impl ChainTip {
42    pub fn empty() -> Self {
43        Self {
44            height: None,
45            hash: GENESIS_PREV_HASH,
46        }
47    }
48
49    /// Returns `(prev_hash, next_height)` to use when appending a new block.
50    pub fn next(&self) -> ([u8; 32], u64) {
51        let next_height = self.height.map(|h| h + 1).unwrap_or(0);
52        (self.hash, next_height)
53    }
54}
55
/// Config key under which a collection's kind marker is stored:
/// `red.collection.{collection}.kind`.
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
59
60/// Persist the `chain` kind marker. Append-only — only call once at creation.
61pub fn mark_as_chain(store: &UnifiedStore, collection: &str) {
62    store.set_config_tree(
63        &kind_key(collection),
64        &crate::serde_json::Value::String(CHAIN_KIND_TAG.to_string()),
65    );
66}
67
68/// True if `mark_as_chain` was ever called for this collection.
69pub fn is_chain(store: &UnifiedStore, collection: &str) -> bool {
70    match store.get_config(&kind_key(collection)) {
71        Some(Value::Text(s)) => s.as_ref() == CHAIN_KIND_TAG,
72        _ => false,
73    }
74}
75
/// Chain tip together with the tip block's timestamp. Produced by
/// `chain_tip_full` and served by `GET /collections/:name/chain-tip`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChainTipFull {
    /// `block_height` of the highest block found.
    pub height: u64,
    /// `hash` column of that block.
    pub hash: [u8; 32],
    /// `timestamp` column of that block in milliseconds; `0` when the column
    /// is missing or not a non-negative integer.
    pub timestamp_ms: u64,
}
84
85/// Scan-based tip with timestamp. `None` when the collection has no rows
86/// (pre-genesis). Used by the chain-tip endpoint and the chain-INSERT
87/// validation path (#524).
88pub fn chain_tip_full(store: &UnifiedStore, collection: &str) -> Option<ChainTipFull> {
89    let manager = store.get_collection(collection)?;
90    let mut best: Option<ChainTipFull> = None;
91    for entity in manager.query_all(|_| true) {
92        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
93            continue;
94        };
95        let Some(named) = &row.named else { continue };
96        let height = match named.get(COL_BLOCK_HEIGHT) {
97            Some(Value::UnsignedInteger(v)) => *v,
98            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
99            _ => continue,
100        };
101        let hash = match named.get(COL_HASH) {
102            Some(Value::Blob(b)) if b.len() == 32 => {
103                let mut out = [0u8; 32];
104                out.copy_from_slice(b);
105                out
106            }
107            _ => continue,
108        };
109        let timestamp_ms = match named.get(COL_TIMESTAMP) {
110            Some(Value::UnsignedInteger(v)) => *v,
111            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
112            _ => 0,
113        };
114        match &best {
115            None => {
116                best = Some(ChainTipFull {
117                    height,
118                    hash,
119                    timestamp_ms,
120                });
121            }
122            Some(cur) if height > cur.height => {
123                best = Some(ChainTipFull {
124                    height,
125                    hash,
126                    timestamp_ms,
127                });
128            }
129            _ => {}
130        }
131    }
132    best
133}
134
135/// Scan the collection for the highest `block_height` and return its row's
136/// `hash`. O(n) — replaced by a cached tip in a later iteration.
137pub fn chain_tip(store: &UnifiedStore, collection: &str) -> ChainTip {
138    let Some(manager) = store.get_collection(collection) else {
139        return ChainTip::empty();
140    };
141    let mut best: Option<(u64, [u8; 32])> = None;
142    for entity in manager.query_all(|_| true) {
143        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
144            continue;
145        };
146        let Some(named) = &row.named else {
147            continue;
148        };
149        let height = match named.get(COL_BLOCK_HEIGHT) {
150            Some(Value::UnsignedInteger(v)) => *v,
151            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
152            _ => continue,
153        };
154        let hash = match named.get(COL_HASH) {
155            Some(Value::Blob(b)) if b.len() == 32 => {
156                let mut out = [0u8; 32];
157                out.copy_from_slice(b);
158                out
159            }
160            _ => continue,
161        };
162        match best {
163            None => best = Some((height, hash)),
164            Some((h, _)) if height > h => best = Some((height, hash)),
165            _ => {}
166        }
167    }
168    match best {
169        Some((height, hash)) => ChainTip {
170            height: Some(height),
171            hash,
172        },
173        None => ChainTip::empty(),
174    }
175}
176
/// Lower-case hexadecimal rendering of a 32-byte digest (always 64 chars).
///
/// Uses a nibble lookup table so the only allocation is the output string;
/// formatting each byte through `format!` would allocate 32 temporaries.
fn hex32(bytes: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut s = String::with_capacity(64);
    for &b in bytes {
        s.push(char::from(DIGITS[usize::from(b >> 4)]));
        s.push(char::from(DIGITS[usize::from(b & 0x0f)]));
    }
    s
}
184
185/// Build the `BlockchainConflict:<json>` error payload mapped to HTTP 409
186/// (#524). The JSON body carries the current tip so the caller can retry
187/// with the right `prev_hash` / `block_height`.
188pub fn chain_conflict_error(
189    tip_height: u64,
190    tip_hash: [u8; 32],
191    tip_timestamp_ms: u64,
192    server_now_ms: u64,
193    reason: &str,
194) -> crate::api::RedDBError {
195    let body = format!(
196        "{{\"block_height\":{},\"hash\":\"{}\",\"timestamp\":{},\"server_time\":{},\"reason\":\"{}\"}}",
197        tip_height,
198        hex32(&tip_hash),
199        tip_timestamp_ms,
200        server_now_ms,
201        reason.replace('"', "'")
202    );
203    crate::api::RedDBError::InvalidOperation(format!("BlockchainConflict:{body}"))
204}
205
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
212
213/// Canonicalize user-supplied fields for inclusion in the block hash. Sorted
214/// by key, `<key>=<plain_text>;` joined — stable across reorderings so the
215/// recomputed hash matches regardless of column order at INSERT-time.
216pub fn canonical_payload(fields: &[(String, Value)]) -> Vec<u8> {
217    let mut pairs: Vec<(&str, String)> = fields
218        .iter()
219        .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
220        .map(|(k, v)| (k.as_str(), v.plain_text()))
221        .collect();
222    pairs.sort_by(|a, b| a.0.cmp(b.0));
223    let mut out = Vec::new();
224    for (k, v) in pairs {
225        out.extend_from_slice(k.as_bytes());
226        out.push(b'=');
227        out.extend_from_slice(v.as_bytes());
228        out.push(b';');
229    }
230    out
231}
232
233/// Build the reserved-column key/value pairs for a new block. Caller appends
234/// these to the row's `fields` AFTER stripping any user-supplied reserved
235/// columns. The returned `hash` is also returned so callers can advance the
236/// tip without recomputing.
237pub fn make_block_reserved_fields(
238    prev_hash: [u8; 32],
239    height: u64,
240    timestamp_ms: u64,
241    payload_canonical: &[u8],
242) -> (Vec<(String, Value)>, [u8; 32]) {
243    let hash = compute_block_hash(&prev_hash, height, timestamp_ms, payload_canonical, None);
244    let fields = vec![
245        (COL_BLOCK_HEIGHT.to_string(), Value::UnsignedInteger(height)),
246        (COL_PREV_HASH.to_string(), Value::Blob(prev_hash.to_vec())),
247        (
248            COL_TIMESTAMP.to_string(),
249            Value::UnsignedInteger(timestamp_ms),
250        ),
251        (COL_HASH.to_string(), Value::Blob(hash.to_vec())),
252    ];
253    (fields, hash)
254}
255
256/// Issue #525 — walk a chain collection in `block_height` order and rebuild the
257/// `Block` list expected by [`verify_chain`]. Genesis is identified by
258/// `block_height == 0`.  Returns `None` when the collection is absent or not a
259/// chain.  Reserved-column extraction and canonical payload encoding mirror
260/// what the engine writes at INSERT time — this is the alignment of the two
261/// encoders referenced in #524.
262pub fn collect_blocks(store: &UnifiedStore, collection: &str) -> Option<Vec<Block>> {
263    if !is_chain(store, collection) {
264        return None;
265    }
266    let manager = store.get_collection(collection)?;
267    let mut blocks: Vec<Block> = Vec::new();
268    for entity in manager.query_all(|_| true) {
269        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
270            continue;
271        };
272        let Some(named) = &row.named else { continue };
273        let height = match named.get(COL_BLOCK_HEIGHT) {
274            Some(Value::UnsignedInteger(v)) => *v,
275            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
276            _ => continue,
277        };
278        let prev_hash = match named.get(COL_PREV_HASH) {
279            Some(Value::Blob(b)) if b.len() == 32 => {
280                let mut out = [0u8; 32];
281                out.copy_from_slice(b);
282                out
283            }
284            _ => continue,
285        };
286        let timestamp_ms = match named.get(COL_TIMESTAMP) {
287            Some(Value::UnsignedInteger(v)) => *v,
288            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
289            _ => continue,
290        };
291        let hash = match named.get(COL_HASH) {
292            Some(Value::Blob(b)) if b.len() == 32 => {
293                let mut out = [0u8; 32];
294                out.copy_from_slice(b);
295                out
296            }
297            _ => continue,
298        };
299        let user_fields: Vec<(String, Value)> = named
300            .iter()
301            .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
302            .map(|(k, v)| (k.clone(), v.clone()))
303            .collect();
304        let payload = canonical_payload(&user_fields);
305        blocks.push(Block {
306            block_height: height,
307            prev_hash,
308            timestamp_ms,
309            payload,
310            signed: None,
311            hash,
312        });
313    }
314    blocks.sort_by_key(|b| b.block_height);
315    Some(blocks)
316}
317
/// Result reported by `POST /collections/:name/verify-chain`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VerifyChainOutcome {
    /// Number of blocks that were collected and handed to the verifier.
    pub checked: u64,
    /// `true` when the verifier reported the chain as consistent.
    pub ok: bool,
    /// Height reported by the verifier for an inconsistent chain; `None` when
    /// `ok` is `true`.
    pub first_bad_height: Option<u64>,
}
325
326/// Walk the chain end-to-end. Returns `(outcome, _)`.  Mismatches are detected
327/// via [`verify_chain`].  Callers persist the integrity flag on `ok == false`.
328pub fn verify_chain_outcome(store: &UnifiedStore, collection: &str) -> Option<VerifyChainOutcome> {
329    let blocks = collect_blocks(store, collection)?;
330    let checked = blocks.len() as u64;
331    match verify_chain(&blocks) {
332        VerifyReport::Ok => Some(VerifyChainOutcome {
333            checked,
334            ok: true,
335            first_bad_height: None,
336        }),
337        VerifyReport::Inconsistent { block_height, .. } => Some(VerifyChainOutcome {
338            checked,
339            ok: false,
340            first_bad_height: Some(block_height),
341        }),
342    }
343}
344
/// Config key under which a collection's integrity flag is stored:
/// `red.collection.{collection}.integrity`.
fn integrity_key(collection: &str) -> String {
    ["red.collection.", collection, ".integrity"].concat()
}
348
/// Integrity tag persisted when verification found the chain inconsistent.
const INTEGRITY_BROKEN: &str = "broken";
/// Integrity tag persisted when the chain is considered intact.
const INTEGRITY_OK: &str = "ok";
351
352/// Persist a chain's integrity flag in `red_config`. Append-only — readers use
353/// [`is_integrity_broken_persisted`] which picks the latest matching row by id.
354pub fn persist_integrity_flag(store: &UnifiedStore, collection: &str, broken: bool) {
355    let tag = if broken {
356        INTEGRITY_BROKEN
357    } else {
358        INTEGRITY_OK
359    };
360    store.set_config_tree(
361        &integrity_key(collection),
362        &crate::serde_json::Value::String(tag.to_string()),
363    );
364}
365
366/// Scan `red_config` for the latest persisted integrity flag for `collection`.
367/// `None` means no record (treated as ok).
368pub fn is_integrity_broken_persisted(store: &UnifiedStore, collection: &str) -> Option<bool> {
369    let manager = store.get_collection("red_config")?;
370    let key = integrity_key(collection);
371    let mut latest: Option<(u64, String)> = None;
372    for entity in manager.query_all(|_| true) {
373        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
374            continue;
375        };
376        let Some(named) = &row.named else { continue };
377        let k_match =
378            matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == key.as_str());
379        if !k_match {
380            continue;
381        }
382        let Some(Value::Text(v)) = named.get("value") else {
383            continue;
384        };
385        let id = entity.id.raw();
386        if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
387            latest = Some((id, v.as_ref().to_string()));
388        }
389    }
390    latest.map(|(_, tag)| tag == INTEGRITY_BROKEN)
391}
392
393/// Convenience: produce the genesis row's full field list. Genesis carries
394/// an empty user payload — extra metadata is recorded in subsequent blocks.
395pub fn genesis_fields(timestamp_ms: u64) -> Vec<(String, Value)> {
396    make_block_reserved_fields(GENESIS_PREV_HASH, 0, timestamp_ms, &[]).0
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp shared by the tests that build blocks.
    const TS: u64 = 1_700_000_000_000;

    /// Looks up a column in a generated field list, panicking if it is absent.
    fn field<'a>(fields: &'a [(String, Value)], column: &str) -> &'a Value {
        fields
            .iter()
            .find(|(name, _)| name == column)
            .map(|(_, value)| value)
            .expect("column must be present")
    }

    #[test]
    fn reserved_columns_complete() {
        let expected = [COL_BLOCK_HEIGHT, COL_PREV_HASH, COL_TIMESTAMP, COL_HASH];
        assert_eq!(RESERVED_COLUMNS.len(), expected.len());
        for column in expected {
            assert!(RESERVED_COLUMNS.contains(&column));
        }
    }

    #[test]
    fn empty_tip_advances_to_genesis_height() {
        assert_eq!(ChainTip::empty().next(), (GENESIS_PREV_HASH, 0));
    }

    #[test]
    fn tip_with_height_advances_by_one() {
        let hash = [0xAB; 32];
        let tip = ChainTip {
            height: Some(7),
            hash,
        };
        assert_eq!(tip.next(), (hash, 8));
    }

    #[test]
    fn genesis_fields_carry_zero_prev_hash() {
        let fields = genesis_fields(TS);
        match field(&fields, COL_PREV_HASH) {
            Value::Blob(bytes) => assert_eq!(&bytes[..], &[0u8; 32]),
            _ => panic!("prev_hash must be Blob"),
        }
        assert_eq!(
            field(&fields, COL_BLOCK_HEIGHT),
            &Value::UnsignedInteger(0)
        );
    }

    #[test]
    fn canonical_payload_is_order_independent() {
        let forward = vec![
            ("user".to_string(), Value::text("alice")),
            ("amount".to_string(), Value::Integer(100)),
        ];
        let reversed: Vec<(String, Value)> = forward.iter().cloned().rev().collect();
        assert_eq!(canonical_payload(&forward), canonical_payload(&reversed));
    }

    #[test]
    fn canonical_payload_skips_reserved_columns() {
        let encoded = canonical_payload(&[
            ("user".to_string(), Value::text("alice")),
            (COL_BLOCK_HEIGHT.to_string(), Value::UnsignedInteger(42)),
            (COL_HASH.to_string(), Value::Blob(vec![0xFF; 32])),
        ]);
        let text = String::from_utf8(encoded).unwrap();
        assert_eq!(text, "user=alice;");
    }

    #[test]
    fn block_hash_matches_recompute() {
        let payload = b"user=alice;";
        let (fields, hash) = make_block_reserved_fields(GENESIS_PREV_HASH, 0, TS, payload);
        assert_eq!(
            hash,
            compute_block_hash(&GENESIS_PREV_HASH, 0, TS, payload, None)
        );
        match field(&fields, COL_HASH) {
            Value::Blob(bytes) => assert_eq!(&bytes[..], &hash[..]),
            _ => panic!("hash must be Blob"),
        }
    }
}