reddb-io-server 1.2.5

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
//! Issue #522 — Signed Writes end-to-end: signer registry + insert
//! signature verification.
//!
//! Exercises the runtime contract for `CREATE COLLECTION ... SIGNED_BY (...)`
//! collections:
//!
//! * Six INSERT outcomes — accept, MissingSignatureFields,
//!   UnknownSigner, InvalidSignature, RevokedSigner (after ALTER
//!   COLLECTION REVOKE SIGNER), and round-trip of past rows after a
//!   revoke (still readable + re-verifiable).
//! * Registry mutations recorded in `signer_history` with the right
//!   actor.
//! * `WHERE signer_pubkey = ?` query returns only the matching row.
//! * Stand-alone Ed25519 re-verification proves the engine did not
//!   corrupt the canonical payload between sign-time and read-time.
//! * Concurrent inserts from two distinct signers both land.

use ed25519_dalek::{Signer, SigningKey, Verifier, VerifyingKey};
use reddb_server::runtime::blockchain_kind::canonical_payload;
use reddb_server::storage::schema::Value;
use reddb_server::storage::signed_writes::{
    reverify_row, SIGNATURE_LEN, SIGNER_PUBKEY_LEN,
};
use reddb_server::{RedDBError, RedDBOptions, RedDBRuntime, RuntimeQueryResult};
use std::sync::Arc;

fn rt() -> RedDBRuntime {
    RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
}

fn hex_encode(bytes: &[u8]) -> String {
    let mut s = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        s.push_str(&format!("{b:02x}"));
    }
    s
}

fn signing_key(seed: u8) -> SigningKey {
    SigningKey::from_bytes(&[seed; 32])
}

fn pubkey_of(sk: &SigningKey) -> [u8; SIGNER_PUBKEY_LEN] {
    sk.verifying_key().to_bytes()
}

/// Build the canonical payload the same way the engine does for a row's
/// `(name, value)` user fields. The test holds the contract end-to-end:
/// if this diverges from the engine's canonical encoding, INSERT will
/// reject the test's signature with `InvalidSignature`.
fn canonical_for(fields: &[(&str, &str)]) -> Vec<u8> {
    let owned: Vec<(String, Value)> = fields
        .iter()
        .map(|(k, v)| (k.to_string(), Value::text(v.to_string())))
        .collect();
    canonical_payload(&owned)
}

fn sign_payload(sk: &SigningKey, payload: &[u8]) -> [u8; SIGNATURE_LEN] {
    sk.sign(payload).to_bytes()
}

fn create_signed_collection(rt: &RedDBRuntime, name: &str, signers: &[&SigningKey]) {
    let hex_list = signers
        .iter()
        .map(|sk| format!("'{}'", hex_encode(&pubkey_of(sk))))
        .collect::<Vec<_>>()
        .join(", ");
    // KIND blockchain is the existing `KIND` that maps to a row-shaped
    // collection in the storage layer; the brief specifies signed
    // writes work "on any existing collection kind", and combining
    // SIGNED_BY with a row-store is the demonstrable end-to-end shape
    // for this slice. (The deeper chain+signed-writes composition is
    // owned by issue #526; this slice's runtime hook is order-
    // independent against chain mode — chain reserved columns are
    // filtered out of the canonical payload before signature checks.)
    let sql = format!("CREATE COLLECTION {name} KIND blockchain SIGNED_BY ({hex_list})");
    rt.execute_query(&sql).expect("create signed collection");
}

fn signed_insert_sql(
    table: &str,
    name: &str,
    pubkey: &[u8; SIGNER_PUBKEY_LEN],
    signature: &[u8],
) -> String {
    format!(
        "INSERT INTO {table} (name, signer_pubkey, signature) VALUES ('{name}', '{}', '{}')",
        hex_encode(pubkey),
        hex_encode(signature),
    )
}

/// Helper: locate the inserted row by `name = needle`, skipping the
/// auto-genesis row on KIND blockchain collections.
fn record_with_name<'a>(
    res: &'a RuntimeQueryResult,
    needle: &str,
) -> &'a reddb_server::storage::query::unified::UnifiedRecord {
    res.result
        .records
        .iter()
        .find(|r| {
            matches!(
                r.get("name"),
                Some(Value::Text(s)) if s.as_ref() == needle
            )
        })
        .unwrap_or_else(|| panic!("no record with name='{needle}'"))
}

#[test]
fn signed_collection_accepts_valid_signature() {
    let rt = rt();
    let sk = signing_key(1);
    create_signed_collection(&rt, "sc", &[&sk]);
    let pk = pubkey_of(&sk);
    let payload = canonical_for(&[("name", "alice")]);
    let sig = sign_payload(&sk, &payload);
    rt.execute_query(&signed_insert_sql("sc", "alice", &pk, &sig))
        .expect("valid signed insert");

    let res = rt.execute_query("SELECT * FROM sc").expect("select");
    let row = record_with_name(&res, "alice");
    // Reserved columns round-trip with the value type the caller
    // supplied — Text/hex on the SQL path.
    let stored_pk = match row.get("signer_pubkey") {
        Some(Value::Text(s)) => s.to_string(),
        other => panic!("expected Text signer_pubkey, got {other:?}"),
    };
    let stored_sig = match row.get("signature") {
        Some(Value::Text(s)) => s.to_string(),
        other => panic!("expected Text signature, got {other:?}"),
    };
    assert_eq!(stored_pk, hex_encode(&pk));
    assert_eq!(stored_sig, hex_encode(&sig));
}

#[test]
fn signed_collection_rejects_missing_signature_fields() {
    let rt = rt();
    let sk = signing_key(1);
    create_signed_collection(&rt, "sc", &[&sk]);
    let err = rt
        .execute_query("INSERT INTO sc (name) VALUES ('alice')")
        .expect_err("missing reserved columns must reject");
    let crate::RedDBError::InvalidOperation(msg) = &err else {
        panic!("expected InvalidOperation, got {err:?}");
    };
    assert!(
        msg.starts_with("SignedWriteError:MissingSignatureFields"),
        "got {msg}"
    );
}

#[test]
fn signed_collection_rejects_unknown_signer() {
    let rt = rt();
    let allowed = signing_key(1);
    let stranger = signing_key(99);
    create_signed_collection(&rt, "sc", &[&allowed]);
    let pk = pubkey_of(&stranger);
    let payload = canonical_for(&[("name", "alice")]);
    let sig = sign_payload(&stranger, &payload);
    let err = rt
        .execute_query(&signed_insert_sql("sc", "alice", &pk, &sig))
        .expect_err("unknown signer must reject");
    let RedDBError::InvalidOperation(msg) = &err else {
        panic!("expected InvalidOperation, got {err:?}");
    };
    assert!(
        msg.starts_with("SignedWriteError:UnknownSigner"),
        "got {msg}"
    );
}

#[test]
fn signed_collection_rejects_tampered_payload_as_invalid_signature() {
    let rt = rt();
    let sk = signing_key(1);
    create_signed_collection(&rt, "sc", &[&sk]);
    let pk = pubkey_of(&sk);
    // Sign one payload, send another.
    let signed_payload = canonical_for(&[("name", "alice")]);
    let sig = sign_payload(&sk, &signed_payload);
    let err = rt
        .execute_query(&signed_insert_sql("sc", "mallory", &pk, &sig))
        .expect_err("tampered payload must reject");
    let RedDBError::InvalidOperation(msg) = &err else {
        panic!("expected InvalidOperation, got {err:?}");
    };
    assert!(
        msg.starts_with("SignedWriteError:InvalidSignature"),
        "got {msg}"
    );
}

#[test]
fn alter_collection_add_revoke_signer_round_trip_with_history() {
    let rt = rt();
    let sk1 = signing_key(1);
    let sk2 = signing_key(2);
    create_signed_collection(&rt, "sc", &[&sk1]);

    // sk2 is not yet allowed — its insert fails.
    let payload = canonical_for(&[("name", "from-sk2")]);
    let pk2 = pubkey_of(&sk2);
    let sig2 = sign_payload(&sk2, &payload);
    let err = rt
        .execute_query(&signed_insert_sql("sc", "from-sk2", &pk2, &sig2))
        .expect_err("sk2 not yet allowed");
    assert!(matches!(
        err,
        RedDBError::InvalidOperation(ref m) if m.starts_with("SignedWriteError:UnknownSigner")
    ));

    // Add sk2 via ALTER COLLECTION.
    rt.execute_query(&format!(
        "ALTER COLLECTION sc ADD SIGNER '{}'",
        hex_encode(&pk2)
    ))
    .expect("add signer");

    // sk2 can now insert.
    rt.execute_query(&signed_insert_sql("sc", "from-sk2", &pk2, &sig2))
        .expect("after ADD SIGNER, sk2 accepted");

    // Revoke sk2.
    rt.execute_query(&format!(
        "ALTER COLLECTION sc REVOKE SIGNER '{}'",
        hex_encode(&pk2)
    ))
    .expect("revoke signer");

    // Past row (signed by sk2) is still readable. WHERE-filter by hex
    // string against the Text-typed signer_pubkey column.
    let res = rt
        .execute_query(&format!(
            "SELECT * FROM sc WHERE signer_pubkey = '{}'",
            hex_encode(&pk2)
        ))
        .expect("query by pubkey");
    assert_eq!(res.result.records.len(), 1, "past row remains readable");

    // The past row still re-verifies under a standalone Ed25519 verifier
    // — proves the engine did not corrupt the canonical payload between
    // sign-time and read-time.
    let stored_pk_hex = match res.result.records[0].get("signer_pubkey") {
        Some(Value::Text(s)) => s.to_string(),
        other => panic!("expected Text signer_pubkey, got {other:?}"),
    };
    let stored_sig_hex = match res.result.records[0].get("signature") {
        Some(Value::Text(s)) => s.to_string(),
        other => panic!("expected Text signature, got {other:?}"),
    };
    let stored_pk = decode_hex(&stored_pk_hex).unwrap();
    let stored_sig = decode_hex(&stored_sig_hex).unwrap();
    let mut pk_arr = [0u8; SIGNER_PUBKEY_LEN];
    pk_arr.copy_from_slice(&stored_pk);
    let mut sig_arr = [0u8; SIGNATURE_LEN];
    sig_arr.copy_from_slice(&stored_sig);
    reverify_row(&pk_arr, &sig_arr, &payload)
        .expect("standalone Ed25519 verifier accepts the stored row");

    // … but NEW inserts from sk2 now hit RevokedSigner.
    let payload2 = canonical_for(&[("name", "after-revoke")]);
    let sig2b = sign_payload(&sk2, &payload2);
    let err = rt
        .execute_query(&signed_insert_sql("sc", "after-revoke", &pk2, &sig2b))
        .expect_err("revoked signer rejected on new insert");
    let RedDBError::InvalidOperation(msg) = &err else {
        panic!("expected InvalidOperation, got {err:?}");
    };
    assert!(
        msg.starts_with("SignedWriteError:RevokedSigner"),
        "got {msg}"
    );

    // sk1 still works.
    let payload1 = canonical_for(&[("name", "from-sk1")]);
    let sig1 = sign_payload(&sk1, &payload1);
    rt.execute_query(&signed_insert_sql(
        "sc",
        "from-sk1",
        &pubkey_of(&sk1),
        &sig1,
    ))
    .expect("sk1 unaffected by sk2 revoke");
}

#[test]
fn where_signer_pubkey_filter_returns_only_matching_rows() {
    let rt = rt();
    let sk_a = signing_key(1);
    let sk_b = signing_key(2);
    create_signed_collection(&rt, "sc", &[&sk_a, &sk_b]);

    let pk_a = pubkey_of(&sk_a);
    let pk_b = pubkey_of(&sk_b);
    let payload_a = canonical_for(&[("name", "from-a")]);
    let sig_a = sign_payload(&sk_a, &payload_a);
    let payload_b = canonical_for(&[("name", "from-b")]);
    let sig_b = sign_payload(&sk_b, &payload_b);
    rt.execute_query(&signed_insert_sql("sc", "from-a", &pk_a, &sig_a))
        .expect("insert a");
    rt.execute_query(&signed_insert_sql("sc", "from-b", &pk_b, &sig_b))
        .expect("insert b");

    let res = rt
        .execute_query(&format!(
            "SELECT * FROM sc WHERE signer_pubkey = '{}'",
            hex_encode(&pk_a)
        ))
        .expect("filter");
    assert_eq!(res.result.records.len(), 1);
    assert_eq!(
        res.result.records[0].get("name"),
        Some(&Value::text("from-a".to_string()))
    );
}

fn decode_hex(s: &str) -> Option<Vec<u8>> {
    if !s.len().is_multiple_of(2) {
        return None;
    }
    let mut out = Vec::with_capacity(s.len() / 2);
    for i in (0..s.len()).step_by(2) {
        out.push(u8::from_str_radix(&s[i..i + 2], 16).ok()?);
    }
    Some(out)
}

#[test]
fn concurrent_inserts_from_two_distinct_signers_both_land() {
    let rt = Arc::new(rt());
    let sk_a = signing_key(1);
    let sk_b = signing_key(2);
    create_signed_collection(&rt, "sc", &[&sk_a, &sk_b]);

    let pk_a = pubkey_of(&sk_a);
    let pk_b = pubkey_of(&sk_b);
    let payload_a = canonical_for(&[("name", "alpha")]);
    let payload_b = canonical_for(&[("name", "bravo")]);
    let sig_a = sign_payload(&sk_a, &payload_a);
    let sig_b = sign_payload(&sk_b, &payload_b);
    let stmt_a = signed_insert_sql("sc", "alpha", &pk_a, &sig_a);
    let stmt_b = signed_insert_sql("sc", "bravo", &pk_b, &sig_b);

    let rt_a = Arc::clone(&rt);
    let rt_b = Arc::clone(&rt);
    let h_a = std::thread::spawn(move || rt_a.execute_query(&stmt_a));
    let h_b = std::thread::spawn(move || rt_b.execute_query(&stmt_b));
    h_a.join().expect("join a").expect("a accepted");
    h_b.join().expect("join b").expect("b accepted");

    let res = rt.execute_query("SELECT * FROM sc").expect("select");
    // Two user rows + the auto-genesis row on a KIND blockchain
    // collection.
    assert_eq!(res.result.records.len(), 3);
    // Both signed rows are present.
    let _ = record_with_name(&res, "alpha");
    let _ = record_with_name(&res, "bravo");
}

#[test]
fn standalone_verifier_round_trips_canonical_payload() {
    // Integration belt-and-braces: the engine's canonical_payload encoder
    // produces bytes a stock Ed25519 verifier accepts, end-to-end, with
    // the public key parsed back from raw bytes. Catches any future
    // regression where the canonical encoding changes without bumping
    // the signed-writes spec.
    let rt = rt();
    let sk = signing_key(1);
    create_signed_collection(&rt, "sc", &[&sk]);
    let pk = pubkey_of(&sk);
    let payload = canonical_for(&[("name", "carol")]);
    let sig = sign_payload(&sk, &payload);
    rt.execute_query(&signed_insert_sql("sc", "carol", &pk, &sig))
        .expect("insert");
    let res = rt.execute_query("SELECT * FROM sc").expect("select");
    let row = record_with_name(&res, "carol");
    let stored_pk_hex = match row.get("signer_pubkey") {
        Some(Value::Text(s)) => s.to_string(),
        other => panic!("text expected, got {other:?}"),
    };
    let stored_sig_hex = match row.get("signature") {
        Some(Value::Text(s)) => s.to_string(),
        other => panic!("text expected, got {other:?}"),
    };
    let stored_pk = decode_hex(&stored_pk_hex).unwrap();
    let stored_sig = decode_hex(&stored_sig_hex).unwrap();
    let vk = VerifyingKey::from_bytes(
        <&[u8; SIGNER_PUBKEY_LEN]>::try_from(&stored_pk[..]).unwrap(),
    )
    .expect("vk parses");
    let mut sig_arr = [0u8; SIGNATURE_LEN];
    sig_arr.copy_from_slice(&stored_sig);
    let signature = ed25519_dalek::Signature::from_bytes(&sig_arr);
    vk.verify(&payload, &signature)
        .expect("standalone ed25519_dalek verifier accepts stored row");
}