Skip to main content

reddb_server/runtime/
signed_writes_kind.rs

1//! Issue #522 — runtime wiring for `CREATE COLLECTION ... SIGNED_BY (...)`
2//! collections.
3//!
4//! The pure logic — registry + verify_insert + error taxonomy — lives in
5//! [`crate::storage::signed_writes`]. This module is the thin adapter
6//! that:
7//!
8//! 1. Persists the per-collection signer registry on the existing
9//!    `red_config` config tree under
10//!    `red.collection.{name}.signed_writes.*` so it survives restarts.
11//! 2. Loads the registry on demand for the INSERT-time verification
12//!    path and the `ALTER COLLECTION ... ADD|REVOKE SIGNER` executor.
13//! 3. Builds the canonical bytes the client must have signed by
14//!    reusing the engine's existing canonical-payload encoding
15//!    ([`super::blockchain_kind::canonical_payload`]) with the two
16//!    signed-writes reserved columns stripped — same encoding the
17//!    blockchain hash binds, so no new on-the-wire spec is introduced.
18
19use crate::storage::schema::Value;
20use crate::storage::signed_writes::{
21    verify_insert, InsertSignatureFields, SignedWriteError, SignerHistoryAction,
22    SignerHistoryEntry, SignerRegistry, RESERVED_SIGNATURE_COL, RESERVED_SIGNER_PUBKEY_COL,
23    SIGNATURE_LEN, SIGNER_PUBKEY_LEN,
24};
25use crate::storage::unified::UnifiedStore;
26
27use std::time::{SystemTime, UNIX_EPOCH};
28
/// Key suffix of the enabled marker. `install` writes `true` under
/// `red.collection.{name}.signed_writes.enabled` once the registry
/// payload for the collection has been persisted.
const ENABLED_SUFFIX: &str = "signed_writes.enabled";

/// Key suffix of the allowed-signer list: one text value holding a JSON
/// array of lowercase-hex Ed25519 public keys. Keeping the whole list in
/// a single value means one config lookup instead of a tree scan.
const ALLOWED_SUFFIX: &str = "signed_writes.allowed_json";

/// Key suffix of the signer audit trail: one text value holding a JSON
/// array of [`SignerHistoryEntry`] records. The trail is append-only —
/// a revocation adds a `Revoke` entry and leaves the earlier `Add`
/// entry in place.
const HISTORY_SUFFIX: &str = "signed_writes.history_json";
43
/// Build the full `red_config` key for a collection-scoped setting:
/// `red.collection.` + `name` + `.` + `suffix`.
fn key(name: &str, suffix: &str) -> String {
    const ROOT: &str = "red.collection.";
    let mut full = String::with_capacity(ROOT.len() + name.len() + 1 + suffix.len());
    full.push_str(ROOT);
    full.push_str(name);
    full.push('.');
    full.push_str(suffix);
    full
}
47
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn now_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
54
/// Encode `bytes` as lowercase hexadecimal, two characters per byte.
///
/// Digits are looked up in a table and pushed straight into the output,
/// so no temporary `String` is allocated per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
62
63fn hex_decode_32(s: &str) -> Option<[u8; SIGNER_PUBKEY_LEN]> {
64    if s.len() != SIGNER_PUBKEY_LEN * 2 {
65        return None;
66    }
67    let mut out = [0u8; SIGNER_PUBKEY_LEN];
68    for i in 0..SIGNER_PUBKEY_LEN {
69        out[i] = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
70    }
71    Some(out)
72}
73
74fn action_str(a: SignerHistoryAction) -> &'static str {
75    match a {
76        SignerHistoryAction::Add => "add",
77        SignerHistoryAction::Revoke => "revoke",
78    }
79}
80
81fn action_from_str(s: &str) -> Option<SignerHistoryAction> {
82    match s {
83        "add" => Some(SignerHistoryAction::Add),
84        "revoke" => Some(SignerHistoryAction::Revoke),
85        _ => None,
86    }
87}
88
89fn entry_to_json(e: &SignerHistoryEntry) -> crate::serde_json::Value {
90    let mut obj = crate::serde_json::Map::new();
91    obj.insert(
92        "action".to_string(),
93        crate::serde_json::Value::String(action_str(e.action).to_string()),
94    );
95    obj.insert(
96        "pubkey".to_string(),
97        crate::serde_json::Value::String(hex_encode(&e.pubkey)),
98    );
99    obj.insert(
100        "actor".to_string(),
101        crate::serde_json::Value::String(e.actor.clone()),
102    );
103    obj.insert(
104        "ts_unix_ms".to_string(),
105        crate::serde_json::Value::Number(e.ts_unix_ms as f64),
106    );
107    crate::serde_json::Value::Object(obj)
108}
109
110fn entry_from_json(v: &crate::serde_json::Value) -> Option<SignerHistoryEntry> {
111    let obj = v.as_object()?;
112    let action = action_from_str(obj.get("action")?.as_str()?)?;
113    let pubkey = hex_decode_32(obj.get("pubkey")?.as_str()?)?;
114    let actor = obj.get("actor")?.as_str()?.to_string();
115    let ts_unix_ms = obj.get("ts_unix_ms")?.as_u64()? as u128;
116    Some(SignerHistoryEntry {
117        action,
118        pubkey,
119        actor,
120        ts_unix_ms,
121    })
122}
123
124/// Returns true if `CREATE COLLECTION ... SIGNED_BY (...)` was issued
125/// (or `ALTER COLLECTION ... ADD SIGNER` has been used to enable the
126/// registry) and there is at least a marker in `red_config`.
127pub fn is_signed(store: &UnifiedStore, collection: &str) -> bool {
128    matches!(
129        store.get_config(&key(collection, ENABLED_SUFFIX)),
130        Some(Value::Boolean(true)) | Some(Value::Text(_))
131    )
132}
133
134/// Persist the registry-bearing marker plus the initial allowed-signer
135/// list. Idempotent: re-calling with the same list is a no-op if a
136/// registry is already installed.
137pub fn install(
138    store: &UnifiedStore,
139    collection: &str,
140    initial: &[[u8; SIGNER_PUBKEY_LEN]],
141    actor: &str,
142) {
143    if is_signed(store, collection) {
144        return;
145    }
146    let reg = SignerRegistry::from_initial(initial, actor.to_string(), now_ms());
147    write_registry(store, collection, &reg);
148    // Mark enabled last so a partial install never leaves the marker
149    // without payload.
150    store.set_config_tree(
151        &key(collection, ENABLED_SUFFIX),
152        &crate::serde_json::Value::Bool(true),
153    );
154}
155
156/// Serialise registry state into the config tree. Overwrites any prior
157/// value at the same key — the store treats `red_config` as
158/// insert-only, but the read path returns the most recent matching row.
159fn write_registry(store: &UnifiedStore, collection: &str, reg: &SignerRegistry) {
160    let allowed: Vec<crate::serde_json::Value> = reg
161        .allowed()
162        .map(|pk| crate::serde_json::Value::String(hex_encode(pk)))
163        .collect();
164    let history: Vec<crate::serde_json::Value> = reg.history().iter().map(entry_to_json).collect();
165    store.set_config_tree(
166        &key(collection, ALLOWED_SUFFIX),
167        &crate::serde_json::Value::String(crate::serde_json::Value::Array(allowed).to_string()),
168    );
169    store.set_config_tree(
170        &key(collection, HISTORY_SUFFIX),
171        &crate::serde_json::Value::String(crate::serde_json::Value::Array(history).to_string()),
172    );
173}
174
175/// Read the *latest* value stored under a `red_config` key.
176///
177/// `UnifiedStore::get_config` returns the *first* matching row, which
178/// for append-only configs means the oldest write wins. Registry
179/// mutations need the newest write, so we scan and keep the last
180/// match.
181fn read_latest_config(store: &UnifiedStore, full_key: &str) -> Option<Value> {
182    let manager = store.get_collection("red_config")?;
183    // `red_config` is append-only: every set rewrites by appending a new
184    // row. The growing-segment iterator backs entities with a HashMap so
185    // iteration order is non-deterministic — sort by the engine-assigned
186    // monotonic `EntityId` descending and take the first match to get
187    // the most recent write.
188    let mut all = manager.query_all(|_| true);
189    all.sort_by_key(|b| std::cmp::Reverse(b.id.raw()));
190    for entity in all {
191        let crate::storage::unified::EntityData::Row(row) = &entity.data else {
192            continue;
193        };
194        let Some(named) = &row.named else { continue };
195        let matches = matches!(
196            named.get("key"),
197            Some(Value::Text(s)) if s.as_ref() == full_key
198        );
199        if matches {
200            return named.get("value").cloned();
201        }
202    }
203    None
204}
205
206fn read_registry(store: &UnifiedStore, collection: &str) -> SignerRegistry {
207    let allowed_json = match read_latest_config(store, &key(collection, ALLOWED_SUFFIX)) {
208        Some(Value::Text(s)) => s.to_string(),
209        _ => "[]".to_string(),
210    };
211    let history_json = match read_latest_config(store, &key(collection, HISTORY_SUFFIX)) {
212        Some(Value::Text(s)) => s.to_string(),
213        _ => "[]".to_string(),
214    };
215    let parsed_allowed: Vec<[u8; SIGNER_PUBKEY_LEN]> =
216        match crate::utils::json::parse_json(&allowed_json) {
217            Ok(v) => match crate::serde_json::Value::from(v) {
218                crate::serde_json::Value::Array(arr) => arr
219                    .iter()
220                    .filter_map(|v| v.as_str().and_then(hex_decode_32))
221                    .collect(),
222                _ => Vec::new(),
223            },
224            Err(_) => Vec::new(),
225        };
226    let parsed_history: Vec<SignerHistoryEntry> =
227        match crate::utils::json::parse_json(&history_json) {
228            Ok(v) => match crate::serde_json::Value::from(v) {
229                crate::serde_json::Value::Array(arr) => {
230                    arr.iter().filter_map(entry_from_json).collect()
231                }
232                _ => Vec::new(),
233            },
234            Err(_) => Vec::new(),
235        };
236    SignerRegistry::from_persisted_parts(parsed_allowed, parsed_history)
237}
238
239/// Load the current registry. Cheap enough for the INSERT hot path:
240/// two `red_config` reads + two JSON parses, no scan of the parent
241/// collection.
242pub fn registry(store: &UnifiedStore, collection: &str) -> SignerRegistry {
243    read_registry(store, collection)
244}
245
246/// Apply `ALTER COLLECTION ... ADD SIGNER 'hex'` on a signed-writes
247/// collection. Returns `true` if the registry actually changed.
248pub fn add_signer(
249    store: &UnifiedStore,
250    collection: &str,
251    pubkey: [u8; SIGNER_PUBKEY_LEN],
252    actor: &str,
253) -> bool {
254    let mut reg = read_registry(store, collection);
255    let changed = reg.add_signer(pubkey, actor.to_string(), now_ms());
256    if changed {
257        write_registry(store, collection, &reg);
258    }
259    changed
260}
261
262/// Apply `ALTER COLLECTION ... REVOKE SIGNER 'hex'` on a signed-writes
263/// collection. Returns `true` if the key was previously allowed.
264pub fn revoke_signer(
265    store: &UnifiedStore,
266    collection: &str,
267    pubkey: &[u8; SIGNER_PUBKEY_LEN],
268    actor: &str,
269) -> bool {
270    let mut reg = read_registry(store, collection);
271    let changed = reg.revoke_signer(pubkey, actor.to_string(), now_ms());
272    if changed {
273        write_registry(store, collection, &reg);
274    }
275    changed
276}
277
278/// Reserved column set automatically present on every signed-writes
279/// collection. Filtered out of the canonical-payload bytes the client
280/// signs.
281pub const RESERVED_COLUMNS: &[&str] = &[RESERVED_SIGNER_PUBKEY_COL, RESERVED_SIGNATURE_COL];
282
283/// Pulled-apart signer / signature reserved columns. Carries:
284///
285/// * The user's original `Value` (for round-trip storage so SELECT and
286///   `WHERE signer_pubkey = '<hex>'` predicates compare against the
287///   same encoding the caller supplied — Text-typed hex on the JSON /
288///   SQL path, Blob on the binary protobuf path).
289/// * The decoded raw bytes used to verify the Ed25519 signature.
290pub struct SignerColumn {
291    pub raw_value: Value,
292    pub bytes: Vec<u8>,
293}
294
295/// Pull the `signer_pubkey` and `signature` values out of the row's
296/// fields. Returns the parsed reserved columns + the residual field
297/// list (fields stripped of the two reserved columns) — the residual
298/// goes into the canonical payload.
299pub fn split_signature_fields(
300    fields: Vec<(String, Value)>,
301) -> (
302    Option<SignerColumn>,
303    Option<SignerColumn>,
304    Vec<(String, Value)>,
305) {
306    let mut pubkey: Option<SignerColumn> = None;
307    let mut signature: Option<SignerColumn> = None;
308    let mut residual: Vec<(String, Value)> = Vec::with_capacity(fields.len());
309    for (k, v) in fields {
310        if k == RESERVED_SIGNER_PUBKEY_COL {
311            let bytes = match &v {
312                Value::Blob(b) => Some(b.clone()),
313                // Accept hex-encoded pubkey from JSON / SQL callers
314                // that can't easily express literal blobs.
315                Value::Text(s) => decode_hex(s.as_ref()),
316                _ => None,
317            };
318            if let Some(bytes) = bytes {
319                pubkey = Some(SignerColumn {
320                    raw_value: v,
321                    bytes,
322                });
323            }
324            continue;
325        }
326        if k == RESERVED_SIGNATURE_COL {
327            let bytes = match &v {
328                Value::Blob(b) => Some(b.clone()),
329                Value::Text(s) => decode_hex(s.as_ref()),
330                _ => None,
331            };
332            if let Some(bytes) = bytes {
333                signature = Some(SignerColumn {
334                    raw_value: v,
335                    bytes,
336                });
337            }
338            continue;
339        }
340        residual.push((k, v));
341    }
342    (pubkey, signature, residual)
343}
344
/// Decode a hexadecimal string of any even length into bytes.
///
/// Returns `None` when the input has an odd number of bytes or contains
/// anything other than `0-9`, `a-f` or `A-F`. The empty string decodes
/// to an empty vector.
///
/// The input comes from caller-supplied row values, so it is walked as
/// bytes rather than sliced as `&str`: a multi-byte UTF-8 character can
/// never split a slice boundary and panic. Each digit is validated on
/// its own, so sign-prefixed pairs such as `"+f"` — which
/// `u8::from_str_radix` would accept — are rejected.
fn decode_hex(s: &str) -> Option<Vec<u8>> {
    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        // Odd length: one trailing digit with no partner.
        return None;
    }
    pairs
        .map(|pair| {
            let hi = char::from(pair[0]).to_digit(16)?;
            let lo = char::from(pair[1]).to_digit(16)?;
            // Both nibbles are < 16, so the combined value always fits in a u8.
            u8::try_from((hi << 4) | lo).ok()
        })
        .collect()
}
355
356/// Top-level INSERT-time check used by the engine. Computes the
357/// canonical payload from the (already reserved-column-stripped)
358/// residual fields and dispatches to [`verify_insert`].
359pub fn verify_row(
360    registry: &SignerRegistry,
361    signer_pubkey: Option<&[u8]>,
362    signature: Option<&[u8]>,
363    canonical_payload: &[u8],
364) -> Result<(), SignedWriteError> {
365    verify_insert(
366        registry,
367        &InsertSignatureFields {
368            signer_pubkey,
369            signature,
370        },
371        canonical_payload,
372    )
373}
374
375/// Map a [`SignedWriteError`] onto a [`RedDBError`] whose marker prefix
376/// is picked up by the transport-layer status mapper.
377///
378/// | Variant                  | Prefix                                     | HTTP |
379/// |--------------------------|--------------------------------------------|------|
380/// | `MissingSignatureFields` | `SignedWriteError:MissingSignatureFields:` | 400  |
381/// | `MalformedSignerPubkey`  | `SignedWriteError:MalformedSignerPubkey`   | 400  |
382/// | `MalformedSignature`     | `SignedWriteError:MalformedSignature`      | 400  |
383/// | `UnknownSigner`          | `SignedWriteError:UnknownSigner`           | 401  |
384/// | `RevokedSigner`          | `SignedWriteError:RevokedSigner`           | 401  |
385/// | `InvalidSignature`       | `SignedWriteError:InvalidSignature`        | 401  |
386pub fn map_error(err: SignedWriteError) -> crate::api::RedDBError {
387    let body = match &err {
388        SignedWriteError::MissingSignatureFields { fields } => {
389            format!(
390                "SignedWriteError:MissingSignatureFields:{}",
391                fields.join(",")
392            )
393        }
394        SignedWriteError::UnknownSigner { pubkey } => {
395            format!("SignedWriteError:UnknownSigner:{}", hex_encode(pubkey))
396        }
397        SignedWriteError::RevokedSigner { pubkey } => {
398            format!("SignedWriteError:RevokedSigner:{}", hex_encode(pubkey))
399        }
400        SignedWriteError::InvalidSignature => "SignedWriteError:InvalidSignature".to_string(),
401        SignedWriteError::MalformedSignerPubkey => {
402            "SignedWriteError:MalformedSignerPubkey".to_string()
403        }
404        SignedWriteError::MalformedSignature => "SignedWriteError:MalformedSignature".to_string(),
405    };
406    crate::api::RedDBError::InvalidOperation(body)
407}
408
409/// Length sanity: a signature blob must be exactly 64 bytes. Surfaced
410/// to the caller so it can return `MalformedSignature` before computing
411/// the canonical payload.
412pub const SIGNATURE_BYTES: usize = SIGNATURE_LEN;
413
#[cfg(test)]
mod tests {
    use super::*;
    use ed25519_dalek::{Signer, SigningKey};

    /// Collection name shared by every test.
    const COLLECTION: &str = "sc";
    /// Actor recorded for the initial install.
    const CREATOR: &str = "@system/create";

    /// Deterministic signing key whose 32 seed bytes all equal `seed`.
    fn signing_key(seed: u8) -> SigningKey {
        SigningKey::from_bytes(&[seed; 32])
    }

    /// Raw public-key bytes belonging to `sk`.
    fn pubkey_of(sk: &SigningKey) -> [u8; SIGNER_PUBKEY_LEN] {
        sk.verifying_key().to_bytes()
    }

    /// Fresh, empty store.
    fn make_store() -> UnifiedStore {
        UnifiedStore::new()
    }

    /// Fresh store with a registry for `COLLECTION` already installed.
    fn store_with(signers: &[[u8; SIGNER_PUBKEY_LEN]]) -> UnifiedStore {
        let store = make_store();
        install(&store, COLLECTION, signers, CREATOR);
        store
    }

    #[test]
    fn install_and_read_roundtrip_preserves_registry() {
        let first = pubkey_of(&signing_key(1));
        let second = pubkey_of(&signing_key(2));
        let store = store_with(&[first, second]);

        assert!(is_signed(&store, COLLECTION));
        let loaded = registry(&store, COLLECTION);
        assert_eq!(loaded.allowed_len(), 2);
        assert!(loaded.is_allowed(&first));
        assert!(loaded.is_allowed(&second));
        assert_eq!(loaded.history().len(), 2);
    }

    #[test]
    fn add_signer_persists_and_records_history() {
        let original = pubkey_of(&signing_key(1));
        let added = pubkey_of(&signing_key(2));
        let store = store_with(&[original]);

        assert!(add_signer(&store, COLLECTION, added, "admin:alice"));
        // Adding the same key again is idempotent and reports no change.
        assert!(!add_signer(&store, COLLECTION, added, "admin:alice"));

        let loaded = registry(&store, COLLECTION);
        assert!(loaded.is_allowed(&added));
        assert_eq!(loaded.history().len(), 2);
        let newest = loaded.history().last().unwrap();
        assert_eq!(newest.action, SignerHistoryAction::Add);
        assert_eq!(newest.actor, "admin:alice");
    }

    #[test]
    fn revoke_signer_blocks_future_inserts_but_history_preserved() {
        let signer = pubkey_of(&signing_key(7));
        let store = store_with(&[signer]);

        assert!(revoke_signer(&store, COLLECTION, &signer, "admin:bob"));

        let loaded = registry(&store, COLLECTION);
        assert!(!loaded.is_allowed(&signer));
        assert!(loaded.ever_added(&signer));
        let newest = loaded.history().last().unwrap();
        assert_eq!(newest.action, SignerHistoryAction::Revoke);
        assert_eq!(newest.actor, "admin:bob");
    }

    #[test]
    fn split_signature_fields_extracts_blob_columns() {
        let row = vec![
            ("name".to_string(), Value::text("alice".to_string())),
            (
                RESERVED_SIGNER_PUBKEY_COL.to_string(),
                Value::Blob(vec![0x11; 32]),
            ),
            (
                RESERVED_SIGNATURE_COL.to_string(),
                Value::Blob(vec![0x22; 64]),
            ),
        ];

        let (pubkey, signature, residual) = split_signature_fields(row);

        let pubkey = pubkey.unwrap();
        assert_eq!(pubkey.bytes.len(), 32);
        assert!(matches!(pubkey.raw_value, Value::Blob(_)));
        let signature = signature.unwrap();
        assert_eq!(signature.bytes.len(), 64);
        assert!(matches!(signature.raw_value, Value::Blob(_)));
        assert_eq!(residual.len(), 1);
        assert_eq!(residual[0].0, "name");
    }

    #[test]
    fn split_signature_fields_accepts_hex_text() {
        let row = vec![
            (
                RESERVED_SIGNER_PUBKEY_COL.to_string(),
                Value::text("11".repeat(32)),
            ),
            (
                RESERVED_SIGNATURE_COL.to_string(),
                Value::text("22".repeat(64)),
            ),
        ];

        let (pubkey, signature, residual) = split_signature_fields(row);

        let pubkey = pubkey.unwrap();
        assert_eq!(pubkey.bytes, vec![0x11; 32]);
        assert!(matches!(pubkey.raw_value, Value::Text(_)));
        let signature = signature.unwrap();
        assert_eq!(signature.bytes, vec![0x22; 64]);
        assert!(matches!(signature.raw_value, Value::Text(_)));
        assert!(residual.is_empty());
    }

    #[test]
    fn map_error_carries_variant_prefix() {
        let unknown = SignedWriteError::UnknownSigner {
            pubkey: [0u8; SIGNER_PUBKEY_LEN],
        };
        match map_error(unknown) {
            crate::api::RedDBError::InvalidOperation(message) => {
                assert!(message.starts_with("SignedWriteError:UnknownSigner"));
            }
            other => panic!("unexpected mapping: {other:?}"),
        }
        match map_error(SignedWriteError::InvalidSignature) {
            crate::api::RedDBError::InvalidOperation(message) => {
                assert_eq!(message, "SignedWriteError:InvalidSignature");
            }
            other => panic!("unexpected mapping: {other:?}"),
        }
    }

    #[test]
    fn verify_row_accepts_valid_signature_over_canonical_payload() {
        let sk = signing_key(3);
        let pk = pubkey_of(&sk);
        let store = store_with(&[pk]);
        let payload = b"hello-world";
        let sig = sk.sign(payload).to_bytes();

        let reg = registry(&store, COLLECTION);
        verify_row(&reg, Some(&pk), Some(&sig), payload).unwrap();
    }

    #[test]
    fn verify_row_rejects_tampered_payload() {
        let sk = signing_key(4);
        let pk = pubkey_of(&sk);
        let store = store_with(&[pk]);
        let payload = b"hello-world";
        let sig = sk.sign(payload).to_bytes();

        let reg = registry(&store, COLLECTION);
        // Same signature, different bytes: verification must fail.
        let err = verify_row(&reg, Some(&pk), Some(&sig), b"tampered").unwrap_err();
        assert_eq!(err, SignedWriteError::InvalidSignature);
    }
}
556}