Skip to main content

reddb_server/runtime/
signed_writes_kind.rs

1//! Issue #522 — runtime wiring for `CREATE COLLECTION ... SIGNED_BY (...)`
2//! collections.
3//!
4//! The pure logic — registry + verify_insert + error taxonomy — lives in
5//! [`crate::storage::signed_writes`]. This module is the thin adapter
6//! that:
7//!
8//! 1. Persists the per-collection signer registry on the existing
9//!    `red_config` config tree under
10//!    `red.collection.{name}.signed_writes.*` so it survives restarts.
11//! 2. Loads the registry on demand for the INSERT-time verification
12//!    path and the `ALTER COLLECTION ... ADD|REVOKE SIGNER` executor.
13//! 3. Builds the canonical bytes the client must have signed by
14//!    reusing the engine's existing canonical-payload encoding
15//!    ([`super::blockchain_kind::canonical_payload`]) with the two
16//!    signed-writes reserved columns stripped — same encoding the
17//!    blockchain hash binds, so no new on-the-wire spec is introduced.
18
19use crate::storage::schema::Value;
20use crate::storage::signed_writes::{
21    verify_insert, InsertSignatureFields, SignedWriteError, SignerHistoryAction,
22    SignerHistoryEntry, SignerRegistry, RESERVED_SIGNATURE_COL, RESERVED_SIGNER_PUBKEY_COL,
23    SIGNATURE_LEN, SIGNER_PUBKEY_LEN,
24};
25use crate::storage::unified::UnifiedStore;
26
27use std::time::{SystemTime, UNIX_EPOCH};
28
29/// Marker stored at `red.collection.{name}.signed_writes.enabled = true`
30/// when the collection was created with a non-empty `SIGNED_BY` list.
31const ENABLED_SUFFIX: &str = "signed_writes.enabled";
32
33/// Single JSON-encoded array of currently-allowed Ed25519 pubkeys
34/// (lowercase hex). Stored as one text value to keep the read path one
35/// `get_config` hop instead of a tree scan.
36const ALLOWED_SUFFIX: &str = "signed_writes.allowed_json";
37
38/// Single JSON-encoded array of [`SignerHistoryEntry`] records.
39/// Append-only; revoke pushes a `Revoke` entry rather than rewriting the
40/// `Add` row, so the audit trail is preserved across registry
41/// mutations.
42const HISTORY_SUFFIX: &str = "signed_writes.history_json";
43
44fn key(name: &str, suffix: &str) -> String {
45    format!("red.collection.{name}.{suffix}")
46}
47
48fn now_ms() -> u128 {
49    SystemTime::now()
50        .duration_since(UNIX_EPOCH)
51        .map(|d| d.as_millis())
52        .unwrap_or(0)
53}
54
55fn hex_encode(bytes: &[u8]) -> String {
56    let mut s = String::with_capacity(bytes.len() * 2);
57    for b in bytes {
58        s.push_str(&format!("{b:02x}"));
59    }
60    s
61}
62
63fn hex_decode_32(s: &str) -> Option<[u8; SIGNER_PUBKEY_LEN]> {
64    if s.len() != SIGNER_PUBKEY_LEN * 2 {
65        return None;
66    }
67    let mut out = [0u8; SIGNER_PUBKEY_LEN];
68    for i in 0..SIGNER_PUBKEY_LEN {
69        out[i] = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
70    }
71    Some(out)
72}
73
74fn action_str(a: SignerHistoryAction) -> &'static str {
75    match a {
76        SignerHistoryAction::Add => "add",
77        SignerHistoryAction::Revoke => "revoke",
78    }
79}
80
81fn action_from_str(s: &str) -> Option<SignerHistoryAction> {
82    match s {
83        "add" => Some(SignerHistoryAction::Add),
84        "revoke" => Some(SignerHistoryAction::Revoke),
85        _ => None,
86    }
87}
88
89fn entry_to_json(e: &SignerHistoryEntry) -> crate::serde_json::Value {
90    let mut obj = crate::serde_json::Map::new();
91    obj.insert(
92        "action".to_string(),
93        crate::serde_json::Value::String(action_str(e.action).to_string()),
94    );
95    obj.insert(
96        "pubkey".to_string(),
97        crate::serde_json::Value::String(hex_encode(&e.pubkey)),
98    );
99    obj.insert(
100        "actor".to_string(),
101        crate::serde_json::Value::String(e.actor.clone()),
102    );
103    obj.insert(
104        "ts_unix_ms".to_string(),
105        crate::serde_json::Value::Number(e.ts_unix_ms as f64),
106    );
107    crate::serde_json::Value::Object(obj)
108}
109
110fn entry_from_json(v: &crate::serde_json::Value) -> Option<SignerHistoryEntry> {
111    let obj = v.as_object()?;
112    let action = action_from_str(obj.get("action")?.as_str()?)?;
113    let pubkey = hex_decode_32(obj.get("pubkey")?.as_str()?)?;
114    let actor = obj.get("actor")?.as_str()?.to_string();
115    let ts_unix_ms = obj.get("ts_unix_ms")?.as_u64()? as u128;
116    Some(SignerHistoryEntry {
117        action,
118        pubkey,
119        actor,
120        ts_unix_ms,
121    })
122}
123
124/// Returns true if `CREATE COLLECTION ... SIGNED_BY (...)` was issued
125/// (or `ALTER COLLECTION ... ADD SIGNER` has been used to enable the
126/// registry) and there is at least a marker in `red_config`.
127pub fn is_signed(store: &UnifiedStore, collection: &str) -> bool {
128    matches!(
129        store.get_config(&key(collection, ENABLED_SUFFIX)),
130        Some(Value::Boolean(true)) | Some(Value::Text(_))
131    )
132}
133
134/// Persist the registry-bearing marker plus the initial allowed-signer
135/// list. Idempotent: re-calling with the same list is a no-op if a
136/// registry is already installed.
137pub fn install(
138    store: &UnifiedStore,
139    collection: &str,
140    initial: &[[u8; SIGNER_PUBKEY_LEN]],
141    actor: &str,
142) {
143    if is_signed(store, collection) {
144        return;
145    }
146    let reg = SignerRegistry::from_initial(initial, actor.to_string(), now_ms());
147    write_registry(store, collection, &reg);
148    // Mark enabled last so a partial install never leaves the marker
149    // without payload.
150    store.set_config_tree(
151        &key(collection, ENABLED_SUFFIX),
152        &crate::serde_json::Value::Bool(true),
153    );
154}
155
156/// Serialise registry state into the config tree. Overwrites any prior
157/// value at the same key — the store treats `red_config` as
158/// insert-only, but the read path returns the most recent matching row.
159fn write_registry(store: &UnifiedStore, collection: &str, reg: &SignerRegistry) {
160    let allowed: Vec<crate::serde_json::Value> = reg
161        .allowed()
162        .map(|pk| crate::serde_json::Value::String(hex_encode(pk)))
163        .collect();
164    let history: Vec<crate::serde_json::Value> =
165        reg.history().iter().map(entry_to_json).collect();
166    store.set_config_tree(
167        &key(collection, ALLOWED_SUFFIX),
168        &crate::serde_json::Value::String(crate::serde_json::Value::Array(allowed).to_string()),
169    );
170    store.set_config_tree(
171        &key(collection, HISTORY_SUFFIX),
172        &crate::serde_json::Value::String(crate::serde_json::Value::Array(history).to_string()),
173    );
174}
175
176/// Read the *latest* value stored under a `red_config` key.
177///
178/// `UnifiedStore::get_config` returns the *first* matching row, which
179/// for append-only configs means the oldest write wins. Registry
180/// mutations need the newest write, so we scan and keep the last
181/// match.
182fn read_latest_config(store: &UnifiedStore, full_key: &str) -> Option<Value> {
183    let manager = store.get_collection("red_config")?;
184    // `red_config` is append-only: every set rewrites by appending a new
185    // row. The growing-segment iterator backs entities with a HashMap so
186    // iteration order is non-deterministic — sort by the engine-assigned
187    // monotonic `EntityId` descending and take the first match to get
188    // the most recent write.
189    let mut all = manager.query_all(|_| true);
190    all.sort_by(|a, b| b.id.raw().cmp(&a.id.raw()));
191    for entity in all {
192        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
193            continue;
194        };
195        let Some(named) = &row.named else { continue };
196        let matches = matches!(
197            named.get("key"),
198            Some(Value::Text(s)) if s.as_ref() == full_key
199        );
200        if matches {
201            return named.get("value").cloned();
202        }
203    }
204    None
205}
206
207fn read_registry(store: &UnifiedStore, collection: &str) -> SignerRegistry {
208    let allowed_json = match read_latest_config(store, &key(collection, ALLOWED_SUFFIX)) {
209        Some(Value::Text(s)) => s.to_string(),
210        _ => "[]".to_string(),
211    };
212    let history_json = match read_latest_config(store, &key(collection, HISTORY_SUFFIX)) {
213        Some(Value::Text(s)) => s.to_string(),
214        _ => "[]".to_string(),
215    };
216    let parsed_allowed: Vec<[u8; SIGNER_PUBKEY_LEN]> = match crate::utils::json::parse_json(
217        &allowed_json,
218    ) {
219        Ok(v) => match crate::serde_json::Value::from(v) {
220            crate::serde_json::Value::Array(arr) => arr
221                .iter()
222                .filter_map(|v| v.as_str().and_then(hex_decode_32))
223                .collect(),
224            _ => Vec::new(),
225        },
226        Err(_) => Vec::new(),
227    };
228    let parsed_history: Vec<SignerHistoryEntry> = match crate::utils::json::parse_json(
229        &history_json,
230    ) {
231        Ok(v) => match crate::serde_json::Value::from(v) {
232            crate::serde_json::Value::Array(arr) => {
233                arr.iter().filter_map(entry_from_json).collect()
234            }
235            _ => Vec::new(),
236        },
237        Err(_) => Vec::new(),
238    };
239    SignerRegistry::from_persisted_parts(parsed_allowed, parsed_history)
240}
241
242/// Load the current registry. Cheap enough for the INSERT hot path:
243/// two `red_config` reads + two JSON parses, no scan of the parent
244/// collection.
245pub fn registry(store: &UnifiedStore, collection: &str) -> SignerRegistry {
246    read_registry(store, collection)
247}
248
249/// Apply `ALTER COLLECTION ... ADD SIGNER 'hex'` on a signed-writes
250/// collection. Returns `true` if the registry actually changed.
251pub fn add_signer(
252    store: &UnifiedStore,
253    collection: &str,
254    pubkey: [u8; SIGNER_PUBKEY_LEN],
255    actor: &str,
256) -> bool {
257    let mut reg = read_registry(store, collection);
258    let changed = reg.add_signer(pubkey, actor.to_string(), now_ms());
259    if changed {
260        write_registry(store, collection, &reg);
261    }
262    changed
263}
264
265/// Apply `ALTER COLLECTION ... REVOKE SIGNER 'hex'` on a signed-writes
266/// collection. Returns `true` if the key was previously allowed.
267pub fn revoke_signer(
268    store: &UnifiedStore,
269    collection: &str,
270    pubkey: &[u8; SIGNER_PUBKEY_LEN],
271    actor: &str,
272) -> bool {
273    let mut reg = read_registry(store, collection);
274    let changed = reg.revoke_signer(pubkey, actor.to_string(), now_ms());
275    if changed {
276        write_registry(store, collection, &reg);
277    }
278    changed
279}
280
281/// Reserved column set automatically present on every signed-writes
282/// collection. Filtered out of the canonical-payload bytes the client
283/// signs.
284pub const RESERVED_COLUMNS: &[&str] = &[RESERVED_SIGNER_PUBKEY_COL, RESERVED_SIGNATURE_COL];
285
286/// Pulled-apart signer / signature reserved columns. Carries:
287///
288/// * The user's original `Value` (for round-trip storage so SELECT and
289///   `WHERE signer_pubkey = '<hex>'` predicates compare against the
290///   same encoding the caller supplied — Text-typed hex on the JSON /
291///   SQL path, Blob on the binary protobuf path).
292/// * The decoded raw bytes used to verify the Ed25519 signature.
293pub struct SignerColumn {
294    pub raw_value: Value,
295    pub bytes: Vec<u8>,
296}
297
298/// Pull the `signer_pubkey` and `signature` values out of the row's
299/// fields. Returns the parsed reserved columns + the residual field
300/// list (fields stripped of the two reserved columns) — the residual
301/// goes into the canonical payload.
302pub fn split_signature_fields(
303    fields: Vec<(String, Value)>,
304) -> (Option<SignerColumn>, Option<SignerColumn>, Vec<(String, Value)>) {
305    let mut pubkey: Option<SignerColumn> = None;
306    let mut signature: Option<SignerColumn> = None;
307    let mut residual: Vec<(String, Value)> = Vec::with_capacity(fields.len());
308    for (k, v) in fields {
309        if k == RESERVED_SIGNER_PUBKEY_COL {
310            let bytes = match &v {
311                Value::Blob(b) => Some(b.clone()),
312                // Accept hex-encoded pubkey from JSON / SQL callers
313                // that can't easily express literal blobs.
314                Value::Text(s) => decode_hex(s.as_ref()),
315                _ => None,
316            };
317            if let Some(bytes) = bytes {
318                pubkey = Some(SignerColumn { raw_value: v, bytes });
319            }
320            continue;
321        }
322        if k == RESERVED_SIGNATURE_COL {
323            let bytes = match &v {
324                Value::Blob(b) => Some(b.clone()),
325                Value::Text(s) => decode_hex(s.as_ref()),
326                _ => None,
327            };
328            if let Some(bytes) = bytes {
329                signature = Some(SignerColumn { raw_value: v, bytes });
330            }
331            continue;
332        }
333        residual.push((k, v));
334    }
335    (pubkey, signature, residual)
336}
337
338fn decode_hex(s: &str) -> Option<Vec<u8>> {
339    if !s.len().is_multiple_of(2) {
340        return None;
341    }
342    let mut out = Vec::with_capacity(s.len() / 2);
343    for i in (0..s.len()).step_by(2) {
344        out.push(u8::from_str_radix(&s[i..i + 2], 16).ok()?);
345    }
346    Some(out)
347}
348
349/// Top-level INSERT-time check used by the engine. Computes the
350/// canonical payload from the (already reserved-column-stripped)
351/// residual fields and dispatches to [`verify_insert`].
352pub fn verify_row(
353    registry: &SignerRegistry,
354    signer_pubkey: Option<&[u8]>,
355    signature: Option<&[u8]>,
356    canonical_payload: &[u8],
357) -> Result<(), SignedWriteError> {
358    verify_insert(
359        registry,
360        &InsertSignatureFields {
361            signer_pubkey,
362            signature,
363        },
364        canonical_payload,
365    )
366}
367
368/// Map a [`SignedWriteError`] onto a [`RedDBError`] whose marker prefix
369/// is picked up by the transport-layer status mapper.
370///
371/// | Variant                  | Prefix                                     | HTTP |
372/// |--------------------------|--------------------------------------------|------|
373/// | `MissingSignatureFields` | `SignedWriteError:MissingSignatureFields:` | 400  |
374/// | `MalformedSignerPubkey`  | `SignedWriteError:MalformedSignerPubkey`   | 400  |
375/// | `MalformedSignature`     | `SignedWriteError:MalformedSignature`      | 400  |
376/// | `UnknownSigner`          | `SignedWriteError:UnknownSigner`           | 401  |
377/// | `RevokedSigner`          | `SignedWriteError:RevokedSigner`           | 401  |
378/// | `InvalidSignature`       | `SignedWriteError:InvalidSignature`        | 401  |
379pub fn map_error(err: SignedWriteError) -> crate::api::RedDBError {
380    let body = match &err {
381        SignedWriteError::MissingSignatureFields { fields } => {
382            format!("SignedWriteError:MissingSignatureFields:{}", fields.join(","))
383        }
384        SignedWriteError::UnknownSigner { pubkey } => {
385            format!("SignedWriteError:UnknownSigner:{}", hex_encode(pubkey))
386        }
387        SignedWriteError::RevokedSigner { pubkey } => {
388            format!("SignedWriteError:RevokedSigner:{}", hex_encode(pubkey))
389        }
390        SignedWriteError::InvalidSignature => "SignedWriteError:InvalidSignature".to_string(),
391        SignedWriteError::MalformedSignerPubkey => {
392            "SignedWriteError:MalformedSignerPubkey".to_string()
393        }
394        SignedWriteError::MalformedSignature => "SignedWriteError:MalformedSignature".to_string(),
395    };
396    crate::api::RedDBError::InvalidOperation(body)
397}
398
399/// Length sanity: a signature blob must be exactly 64 bytes. Surfaced
400/// to the caller so it can return `MalformedSignature` before computing
401/// the canonical payload.
402pub const SIGNATURE_BYTES: usize = SIGNATURE_LEN;
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407    use ed25519_dalek::{Signer, SigningKey};
408
409    fn signing_key(seed: u8) -> SigningKey {
410        SigningKey::from_bytes(&[seed; 32])
411    }
412
413    fn pubkey_of(sk: &SigningKey) -> [u8; SIGNER_PUBKEY_LEN] {
414        sk.verifying_key().to_bytes()
415    }
416
417    fn make_store() -> UnifiedStore {
418        UnifiedStore::new()
419    }
420
421    #[test]
422    fn install_and_read_roundtrip_preserves_registry() {
423        let store = make_store();
424        let pk1 = pubkey_of(&signing_key(1));
425        let pk2 = pubkey_of(&signing_key(2));
426        install(&store, "sc", &[pk1, pk2], "@system/create");
427        assert!(is_signed(&store, "sc"));
428        let reg = registry(&store, "sc");
429        assert_eq!(reg.allowed_len(), 2);
430        assert!(reg.is_allowed(&pk1));
431        assert!(reg.is_allowed(&pk2));
432        assert_eq!(reg.history().len(), 2);
433    }
434
435    #[test]
436    fn add_signer_persists_and_records_history() {
437        let store = make_store();
438        let pk1 = pubkey_of(&signing_key(1));
439        install(&store, "sc", &[pk1], "@system/create");
440        let pk2 = pubkey_of(&signing_key(2));
441        assert!(add_signer(&store, "sc", pk2, "admin:alice"));
442        // Idempotent re-add returns false.
443        assert!(!add_signer(&store, "sc", pk2, "admin:alice"));
444        let reg = registry(&store, "sc");
445        assert!(reg.is_allowed(&pk2));
446        assert_eq!(reg.history().len(), 2);
447        let last = reg.history().last().unwrap();
448        assert_eq!(last.action, SignerHistoryAction::Add);
449        assert_eq!(last.actor, "admin:alice");
450    }
451
452    #[test]
453    fn revoke_signer_blocks_future_inserts_but_history_preserved() {
454        let store = make_store();
455        let sk = signing_key(7);
456        let pk = pubkey_of(&sk);
457        install(&store, "sc", &[pk], "@system/create");
458        assert!(revoke_signer(&store, "sc", &pk, "admin:bob"));
459        let reg = registry(&store, "sc");
460        assert!(!reg.is_allowed(&pk));
461        assert!(reg.ever_added(&pk));
462        let last = reg.history().last().unwrap();
463        assert_eq!(last.action, SignerHistoryAction::Revoke);
464        assert_eq!(last.actor, "admin:bob");
465    }
466
467    #[test]
468    fn split_signature_fields_extracts_blob_columns() {
469        let fields = vec![
470            ("name".to_string(), Value::text("alice".to_string())),
471            (RESERVED_SIGNER_PUBKEY_COL.to_string(), Value::Blob(vec![0x11; 32])),
472            (RESERVED_SIGNATURE_COL.to_string(), Value::Blob(vec![0x22; 64])),
473        ];
474        let (pk, sig, residual) = split_signature_fields(fields);
475        assert_eq!(pk.as_ref().unwrap().bytes.len(), 32);
476        assert!(matches!(pk.unwrap().raw_value, Value::Blob(_)));
477        assert_eq!(sig.as_ref().unwrap().bytes.len(), 64);
478        assert!(matches!(sig.unwrap().raw_value, Value::Blob(_)));
479        assert_eq!(residual.len(), 1);
480        assert_eq!(residual[0].0, "name");
481    }
482
483    #[test]
484    fn split_signature_fields_accepts_hex_text() {
485        let pk_hex = "11".repeat(32);
486        let sig_hex = "22".repeat(64);
487        let fields = vec![
488            (RESERVED_SIGNER_PUBKEY_COL.to_string(), Value::text(pk_hex)),
489            (RESERVED_SIGNATURE_COL.to_string(), Value::text(sig_hex)),
490        ];
491        let (pk, sig, residual) = split_signature_fields(fields);
492        assert_eq!(pk.as_ref().unwrap().bytes, vec![0x11; 32]);
493        assert!(matches!(pk.unwrap().raw_value, Value::Text(_)));
494        assert_eq!(sig.as_ref().unwrap().bytes, vec![0x22; 64]);
495        assert!(matches!(sig.unwrap().raw_value, Value::Text(_)));
496        assert!(residual.is_empty());
497    }
498
499    #[test]
500    fn map_error_carries_variant_prefix() {
501        let pk = [0u8; SIGNER_PUBKEY_LEN];
502        match map_error(SignedWriteError::UnknownSigner { pubkey: pk }) {
503            crate::api::RedDBError::InvalidOperation(s) => {
504                assert!(s.starts_with("SignedWriteError:UnknownSigner"));
505            }
506            other => panic!("unexpected mapping: {other:?}"),
507        }
508        match map_error(SignedWriteError::InvalidSignature) {
509            crate::api::RedDBError::InvalidOperation(s) => {
510                assert_eq!(s, "SignedWriteError:InvalidSignature");
511            }
512            other => panic!("unexpected mapping: {other:?}"),
513        }
514    }
515
516    #[test]
517    fn verify_row_accepts_valid_signature_over_canonical_payload() {
518        let sk = signing_key(3);
519        let pk = pubkey_of(&sk);
520        let store = make_store();
521        install(&store, "sc", &[pk], "@system/create");
522        let payload = b"hello-world";
523        let sig = sk.sign(payload).to_bytes();
524        let reg = registry(&store, "sc");
525        verify_row(&reg, Some(&pk), Some(&sig), payload).unwrap();
526    }
527
528    #[test]
529    fn verify_row_rejects_tampered_payload() {
530        let sk = signing_key(4);
531        let pk = pubkey_of(&sk);
532        let store = make_store();
533        install(&store, "sc", &[pk], "@system/create");
534        let payload = b"hello-world";
535        let sig = sk.sign(payload).to_bytes();
536        let reg = registry(&store, "sc");
537        let err = verify_row(&reg, Some(&pk), Some(&sig), b"tampered").unwrap_err();
538        assert_eq!(err, SignedWriteError::InvalidSignature);
539    }
540}