reddb-io-server 1.2.4

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
//! Foundation for `KIND blockchain` collections (issue #523).
//!
//! Persists kind in `red_config`, exposes the reserved-column set used by
//! every block row, scans the collection to derive the current chain tip,
//! and wraps the hash helpers from [`crate::storage::blockchain`].
//!
//! This iteration does NOT validate user-supplied prev_hash/height on INSERT
//! (the engine computes them) and does NOT enforce conflict-retry semantics
//! — those land alongside chain-tip RPC and `verify_chain` in a later slice.

use std::time::{SystemTime, UNIX_EPOCH};

use crate::storage::blockchain::{compute_block_hash, verify_chain, Block, GENESIS_PREV_HASH, VerifyReport};
use crate::storage::schema::Value;
use crate::storage::unified::UnifiedStore;

/// Value stored under `red.collection.{name}.kind` for blockchain collections.
pub const CHAIN_KIND_TAG: &str = "chain";

pub const COL_BLOCK_HEIGHT: &str = "block_height";
pub const COL_PREV_HASH: &str = "prev_hash";
pub const COL_TIMESTAMP: &str = "timestamp";
pub const COL_HASH: &str = "hash";

/// Reserved column names auto-filled by the engine; user INSERTs that supply
/// them are silently overwritten so the chain remains engine-controlled.
pub const RESERVED_COLUMNS: &[&str] = &[COL_BLOCK_HEIGHT, COL_PREV_HASH, COL_TIMESTAMP, COL_HASH];

/// Chain tip derived by scanning the collection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChainTip {
    /// `block_height` of the highest block. `None` if the collection has no
    /// rows yet (pre-genesis; callers should use [`GENESIS_PREV_HASH`] for
    /// `prev_hash` and `0` for `block_height`).
    pub height: Option<u64>,
    pub hash: [u8; 32],
}

impl ChainTip {
    pub fn empty() -> Self {
        Self {
            height: None,
            hash: GENESIS_PREV_HASH,
        }
    }

    /// Returns `(prev_hash, next_height)` to use when appending a new block.
    pub fn next(&self) -> ([u8; 32], u64) {
        let next_height = self.height.map(|h| h + 1).unwrap_or(0);
        (self.hash, next_height)
    }
}

fn kind_key(collection: &str) -> String {
    format!("red.collection.{collection}.kind")
}

/// Persist the `chain` kind marker. Append-only — only call once at creation.
pub fn mark_as_chain(store: &UnifiedStore, collection: &str) {
    store.set_config_tree(
        &kind_key(collection),
        &crate::serde_json::Value::String(CHAIN_KIND_TAG.to_string()),
    );
}

/// True if `mark_as_chain` was ever called for this collection.
pub fn is_chain(store: &UnifiedStore, collection: &str) -> bool {
    match store.get_config(&kind_key(collection)) {
        Some(Value::Text(s)) => s.as_ref() == CHAIN_KIND_TAG,
        _ => false,
    }
}

/// Full chain tip including timestamp. Returned by `chain_tip_full` and
/// `GET /collections/:name/chain-tip`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChainTipFull {
    pub height: u64,
    pub hash: [u8; 32],
    pub timestamp_ms: u64,
}

/// Scan-based tip with timestamp. `None` when the collection has no rows
/// (pre-genesis). Used by the chain-tip endpoint and the chain-INSERT
/// validation path (#524).
pub fn chain_tip_full(store: &UnifiedStore, collection: &str) -> Option<ChainTipFull> {
    let manager = store.get_collection(collection)?;
    let mut best: Option<ChainTipFull> = None;
    for entity in manager.query_all(|_| true) {
        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
            continue;
        };
        let Some(named) = &row.named else { continue };
        let height = match named.get(COL_BLOCK_HEIGHT) {
            Some(Value::UnsignedInteger(v)) => *v,
            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
            _ => continue,
        };
        let hash = match named.get(COL_HASH) {
            Some(Value::Blob(b)) if b.len() == 32 => {
                let mut out = [0u8; 32];
                out.copy_from_slice(b);
                out
            }
            _ => continue,
        };
        let timestamp_ms = match named.get(COL_TIMESTAMP) {
            Some(Value::UnsignedInteger(v)) => *v,
            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
            _ => 0,
        };
        match &best {
            None => {
                best = Some(ChainTipFull {
                    height,
                    hash,
                    timestamp_ms,
                });
            }
            Some(cur) if height > cur.height => {
                best = Some(ChainTipFull {
                    height,
                    hash,
                    timestamp_ms,
                });
            }
            _ => {}
        }
    }
    best
}

/// Scan the collection for the highest `block_height` and return its row's
/// `hash`. O(n) — replaced by a cached tip in a later iteration.
pub fn chain_tip(store: &UnifiedStore, collection: &str) -> ChainTip {
    let Some(manager) = store.get_collection(collection) else {
        return ChainTip::empty();
    };
    let mut best: Option<(u64, [u8; 32])> = None;
    for entity in manager.query_all(|_| true) {
        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
            continue;
        };
        let Some(named) = &row.named else {
            continue;
        };
        let height = match named.get(COL_BLOCK_HEIGHT) {
            Some(Value::UnsignedInteger(v)) => *v,
            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
            _ => continue,
        };
        let hash = match named.get(COL_HASH) {
            Some(Value::Blob(b)) if b.len() == 32 => {
                let mut out = [0u8; 32];
                out.copy_from_slice(b);
                out
            }
            _ => continue,
        };
        match best {
            None => best = Some((height, hash)),
            Some((h, _)) if height > h => best = Some((height, hash)),
            _ => {}
        }
    }
    match best {
        Some((height, hash)) => ChainTip {
            height: Some(height),
            hash,
        },
        None => ChainTip::empty(),
    }
}

fn hex32(bytes: &[u8; 32]) -> String {
    let mut s = String::with_capacity(64);
    for b in bytes {
        s.push_str(&format!("{b:02x}"));
    }
    s
}

/// Build the `BlockchainConflict:<json>` error payload mapped to HTTP 409
/// (#524). The JSON body carries the current tip so the caller can retry
/// with the right `prev_hash` / `block_height`.
pub fn chain_conflict_error(
    tip_height: u64,
    tip_hash: [u8; 32],
    tip_timestamp_ms: u64,
    server_now_ms: u64,
    reason: &str,
) -> crate::api::RedDBError {
    let body = format!(
        "{{\"block_height\":{},\"hash\":\"{}\",\"timestamp\":{},\"server_time\":{},\"reason\":\"{}\"}}",
        tip_height,
        hex32(&tip_hash),
        tip_timestamp_ms,
        server_now_ms,
        reason.replace('"', "'")
    );
    crate::api::RedDBError::InvalidOperation(format!("BlockchainConflict:{body}"))
}

pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}

/// Canonicalize user-supplied fields for inclusion in the block hash. Sorted
/// by key, `<key>=<plain_text>;` joined — stable across reorderings so the
/// recomputed hash matches regardless of column order at INSERT-time.
pub fn canonical_payload(fields: &[(String, Value)]) -> Vec<u8> {
    let mut pairs: Vec<(&str, String)> = fields
        .iter()
        .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
        .map(|(k, v)| (k.as_str(), v.plain_text()))
        .collect();
    pairs.sort_by(|a, b| a.0.cmp(b.0));
    let mut out = Vec::new();
    for (k, v) in pairs {
        out.extend_from_slice(k.as_bytes());
        out.push(b'=');
        out.extend_from_slice(v.as_bytes());
        out.push(b';');
    }
    out
}

/// Build the reserved-column key/value pairs for a new block. Caller appends
/// these to the row's `fields` AFTER stripping any user-supplied reserved
/// columns. The returned `hash` is also returned so callers can advance the
/// tip without recomputing.
pub fn make_block_reserved_fields(
    prev_hash: [u8; 32],
    height: u64,
    timestamp_ms: u64,
    payload_canonical: &[u8],
) -> (Vec<(String, Value)>, [u8; 32]) {
    let hash = compute_block_hash(&prev_hash, height, timestamp_ms, payload_canonical, None);
    let fields = vec![
        (
            COL_BLOCK_HEIGHT.to_string(),
            Value::UnsignedInteger(height),
        ),
        (COL_PREV_HASH.to_string(), Value::Blob(prev_hash.to_vec())),
        (
            COL_TIMESTAMP.to_string(),
            Value::UnsignedInteger(timestamp_ms),
        ),
        (COL_HASH.to_string(), Value::Blob(hash.to_vec())),
    ];
    (fields, hash)
}

/// Issue #525 — walk a chain collection in `block_height` order and rebuild the
/// `Block` list expected by [`verify_chain`]. Genesis is identified by
/// `block_height == 0`.  Returns `None` when the collection is absent or not a
/// chain.  Reserved-column extraction and canonical payload encoding mirror
/// what the engine writes at INSERT time — this is the alignment of the two
/// encoders referenced in #524.
pub fn collect_blocks(store: &UnifiedStore, collection: &str) -> Option<Vec<Block>> {
    if !is_chain(store, collection) {
        return None;
    }
    let manager = store.get_collection(collection)?;
    let mut blocks: Vec<Block> = Vec::new();
    for entity in manager.query_all(|_| true) {
        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
            continue;
        };
        let Some(named) = &row.named else { continue };
        let height = match named.get(COL_BLOCK_HEIGHT) {
            Some(Value::UnsignedInteger(v)) => *v,
            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
            _ => continue,
        };
        let prev_hash = match named.get(COL_PREV_HASH) {
            Some(Value::Blob(b)) if b.len() == 32 => {
                let mut out = [0u8; 32];
                out.copy_from_slice(b);
                out
            }
            _ => continue,
        };
        let timestamp_ms = match named.get(COL_TIMESTAMP) {
            Some(Value::UnsignedInteger(v)) => *v,
            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
            _ => continue,
        };
        let hash = match named.get(COL_HASH) {
            Some(Value::Blob(b)) if b.len() == 32 => {
                let mut out = [0u8; 32];
                out.copy_from_slice(b);
                out
            }
            _ => continue,
        };
        let user_fields: Vec<(String, Value)> = named
            .iter()
            .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        let payload = canonical_payload(&user_fields);
        blocks.push(Block {
            block_height: height,
            prev_hash,
            timestamp_ms,
            payload,
            signed: None,
            hash,
        });
    }
    blocks.sort_by_key(|b| b.block_height);
    Some(blocks)
}

/// Outcome of `POST /collections/:name/verify-chain`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyChainOutcome {
    pub checked: u64,
    pub ok: bool,
    pub first_bad_height: Option<u64>,
}

/// Walk the chain end-to-end. Returns `(outcome, _)`.  Mismatches are detected
/// via [`verify_chain`].  Callers persist the integrity flag on `ok == false`.
pub fn verify_chain_outcome(store: &UnifiedStore, collection: &str) -> Option<VerifyChainOutcome> {
    let blocks = collect_blocks(store, collection)?;
    let checked = blocks.len() as u64;
    match verify_chain(&blocks) {
        VerifyReport::Ok => Some(VerifyChainOutcome {
            checked,
            ok: true,
            first_bad_height: None,
        }),
        VerifyReport::Inconsistent { block_height, .. } => Some(VerifyChainOutcome {
            checked,
            ok: false,
            first_bad_height: Some(block_height),
        }),
    }
}

fn integrity_key(collection: &str) -> String {
    format!("red.collection.{collection}.integrity")
}

const INTEGRITY_BROKEN: &str = "broken";
const INTEGRITY_OK: &str = "ok";

/// Persist a chain's integrity flag in `red_config`. Append-only — readers use
/// [`is_integrity_broken_persisted`] which picks the latest matching row by id.
pub fn persist_integrity_flag(store: &UnifiedStore, collection: &str, broken: bool) {
    let tag = if broken { INTEGRITY_BROKEN } else { INTEGRITY_OK };
    store.set_config_tree(
        &integrity_key(collection),
        &crate::serde_json::Value::String(tag.to_string()),
    );
}

/// Scan `red_config` for the latest persisted integrity flag for `collection`.
/// `None` means no record (treated as ok).
pub fn is_integrity_broken_persisted(store: &UnifiedStore, collection: &str) -> Option<bool> {
    let manager = store.get_collection("red_config")?;
    let key = integrity_key(collection);
    let mut latest: Option<(u64, String)> = None;
    for entity in manager.query_all(|_| true) {
        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
            continue;
        };
        let Some(named) = &row.named else { continue };
        let k_match = matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == key.as_str());
        if !k_match {
            continue;
        }
        let Some(Value::Text(v)) = named.get("value") else {
            continue;
        };
        let id = entity.id.raw();
        if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
            latest = Some((id, v.as_ref().to_string()));
        }
    }
    latest.map(|(_, tag)| tag == INTEGRITY_BROKEN)
}

/// Convenience: produce the genesis row's full field list. Genesis carries
/// an empty user payload — extra metadata is recorded in subsequent blocks.
pub fn genesis_fields(timestamp_ms: u64) -> Vec<(String, Value)> {
    make_block_reserved_fields(GENESIS_PREV_HASH, 0, timestamp_ms, &[]).0
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn reserved_columns_complete() {
        assert_eq!(RESERVED_COLUMNS.len(), 4);
        assert!(RESERVED_COLUMNS.contains(&COL_BLOCK_HEIGHT));
        assert!(RESERVED_COLUMNS.contains(&COL_PREV_HASH));
        assert!(RESERVED_COLUMNS.contains(&COL_TIMESTAMP));
        assert!(RESERVED_COLUMNS.contains(&COL_HASH));
    }

    #[test]
    fn empty_tip_advances_to_genesis_height() {
        let tip = ChainTip::empty();
        let (prev, height) = tip.next();
        assert_eq!(prev, GENESIS_PREV_HASH);
        assert_eq!(height, 0);
    }

    #[test]
    fn tip_with_height_advances_by_one() {
        let tip = ChainTip {
            height: Some(7),
            hash: [0xAB; 32],
        };
        let (prev, height) = tip.next();
        assert_eq!(prev, [0xAB; 32]);
        assert_eq!(height, 8);
    }

    #[test]
    fn genesis_fields_carry_zero_prev_hash() {
        let fields = genesis_fields(1_700_000_000_000);
        let prev = fields.iter().find(|(k, _)| k == COL_PREV_HASH).unwrap();
        match &prev.1 {
            Value::Blob(b) => assert_eq!(&b[..], &[0u8; 32]),
            _ => panic!("prev_hash must be Blob"),
        }
        let height = fields.iter().find(|(k, _)| k == COL_BLOCK_HEIGHT).unwrap();
        assert_eq!(height.1, Value::UnsignedInteger(0));
    }

    #[test]
    fn canonical_payload_is_order_independent() {
        let a = vec![
            ("user".to_string(), Value::text("alice")),
            ("amount".to_string(), Value::Integer(100)),
        ];
        let b = vec![
            ("amount".to_string(), Value::Integer(100)),
            ("user".to_string(), Value::text("alice")),
        ];
        assert_eq!(canonical_payload(&a), canonical_payload(&b));
    }

    #[test]
    fn canonical_payload_skips_reserved_columns() {
        let fields = vec![
            ("user".to_string(), Value::text("alice")),
            (
                COL_BLOCK_HEIGHT.to_string(),
                Value::UnsignedInteger(42),
            ),
            (COL_HASH.to_string(), Value::Blob(vec![0xFF; 32])),
        ];
        let bytes = canonical_payload(&fields);
        let s = String::from_utf8(bytes).unwrap();
        assert_eq!(s, "user=alice;");
    }

    #[test]
    fn block_hash_matches_recompute() {
        let (fields, hash) = make_block_reserved_fields(
            GENESIS_PREV_HASH,
            0,
            1_700_000_000_000,
            b"user=alice;",
        );
        let recomputed =
            compute_block_hash(&GENESIS_PREV_HASH, 0, 1_700_000_000_000, b"user=alice;", None);
        assert_eq!(hash, recomputed);
        let stored = fields.iter().find(|(k, _)| k == COL_HASH).unwrap();
        match &stored.1 {
            Value::Blob(b) => assert_eq!(&b[..], &hash[..]),
            _ => panic!("hash must be Blob"),
        }
    }
}