Skip to main content

reddb_server/runtime/
control_events.rs

1//! Control Event Ledger — skeleton (issue #652).
2//!
3//! Cross-cutting types + the [`ControlEventLedger`] trait that the
4//! policy / config / user-lifecycle producer slices (issues 665/666/
5//! 667) will call into. Ships ONE implementor — [`RuntimeLedger`] —
6//! which writes one row per `emit()` to the `red.control_events`
7//! collection via the unified entity API.
8//!
9//! This module deliberately does NOT wire `emit()` into any producer
10//! call site (`AuthStore::*`, `ConfigRegistry::*`, etc.); that is the
11//! scope of 652b/c/d. It also does not decide what counts as
12//! sensitive — producers call [`Sensitivity::hashed`] /
13//! [`Sensitivity::redacted`] at their own emit sites.
14
15use std::borrow::Cow;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use crate::auth::UserId;
20use crate::crypto::uuid::Uuid;
21use crate::storage::schema::types::Value;
22use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
23use crate::storage::UnifiedStore;
24use crate::utils::now_unix_millis;
25
/// Name of the managed collection that receives one row per emitted
/// control event (see [`RuntimeLedger`]).
pub const CONTROL_EVENTS_COLLECTION: &str = "red.control_events";
28
29// ---------------------------------------------------------------------------
30// EventKind
31// ---------------------------------------------------------------------------
32
/// Every control-plane mutation the ledger can record.
///
/// The persisted spelling of each variant — returned by
/// [`EventKind::as_str`] — is what lands in the `kind` column of
/// `red.control_events`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventKind {
    // Policy lifecycle.
    PolicyCreate,
    PolicyUpdate,
    PolicyDelete,
    PolicyAttach,
    PolicyDetach,
    // Runtime configuration.
    ConfigWrite,
    ConfigDelete,
    // Users and API keys.
    UserCreate,
    UserUpdate,
    UserDelete,
    UserDisable,
    ApiKeyCreate,
    ApiKeyRevoke,
    // Vault.
    VaultMetadataRead,
    VaultRead,
    VaultUnseal,
    VaultRotate,
    VaultPurge,
    // Schema / tenancy / row-level security governance.
    SchemaDdl,
    TenantGovernance,
    RlsGovernance,
    // Operations.
    BackupRun,
    RestoreRun,
    FailoverPromotion,
    ReplicationSafety,
    EvidenceExport,
    /// Emitted at startup when the `REDDB_POLICY_BREAK_GLASS` env var
    /// triggers the [`crate::auth::self_lock_guard`] recovery path —
    /// see issue #713.
    PolicyBreakGlass,
}

impl EventKind {
    /// Stable dotted identifier (`"<area>.<verb>"`) written to the
    /// `kind` column, e.g. `"policy.create"`.
    pub fn as_str(self) -> &'static str {
        match self {
            EventKind::PolicyCreate => "policy.create",
            EventKind::PolicyUpdate => "policy.update",
            EventKind::PolicyDelete => "policy.delete",
            EventKind::PolicyAttach => "policy.attach",
            EventKind::PolicyDetach => "policy.detach",
            EventKind::ConfigWrite => "config.write",
            EventKind::ConfigDelete => "config.delete",
            EventKind::UserCreate => "user.create",
            EventKind::UserUpdate => "user.update",
            EventKind::UserDelete => "user.delete",
            EventKind::UserDisable => "user.disable",
            EventKind::ApiKeyCreate => "apikey.create",
            EventKind::ApiKeyRevoke => "apikey.revoke",
            EventKind::VaultMetadataRead => "vault.metadata_read",
            EventKind::VaultRead => "vault.read",
            EventKind::VaultUnseal => "vault.unseal",
            EventKind::VaultRotate => "vault.rotate",
            EventKind::VaultPurge => "vault.purge",
            EventKind::SchemaDdl => "schema.ddl",
            EventKind::TenantGovernance => "tenant.governance",
            EventKind::RlsGovernance => "rls.governance",
            EventKind::BackupRun => "backup.run",
            EventKind::RestoreRun => "restore.run",
            EventKind::FailoverPromotion => "failover.promotion",
            EventKind::ReplicationSafety => "replication.safety",
            EventKind::EvidenceExport => "evidence.export",
            EventKind::PolicyBreakGlass => "policy.break_glass",
        }
    }
}
102
103// ---------------------------------------------------------------------------
104// Outcome
105// ---------------------------------------------------------------------------
106
/// Result of an attempted control-plane mutation, persisted in the
/// `outcome` column of `red.control_events`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Outcome {
    /// Persisted as `"allowed"`.
    Allowed,
    /// Persisted as `"denied"`.
    Denied,
    /// Persisted as `"error"`.
    Error,
}

impl Outcome {
    /// Lowercase label written to the `outcome` column.
    pub fn as_str(self) -> &'static str {
        match self {
            Outcome::Allowed => "allowed",
            Outcome::Denied => "denied",
            Outcome::Error => "error",
        }
    }
}
123
124// ---------------------------------------------------------------------------
125// ActorRef / Context
126// ---------------------------------------------------------------------------
127
128/// Who attempted the mutation. Borrowed so producer call-sites don't
129/// allocate at every emit; the ledger copies into the persisted row.
130#[derive(Debug)]
131pub enum ActorRef<'a> {
132    User(&'a UserId),
133    /// A static system label (e.g. `"bootstrap"`, `"wal_replay"`).
134    /// Static-lifetime to keep the enum cheap and to disambiguate from
135    /// user identifiers — never a runtime tenant/user string.
136    System(&'static str),
137    Anonymous,
138}
139
140impl<'a> ActorRef<'a> {
141    pub fn user_id(&self) -> Option<&UserId> {
142        match self {
143            Self::User(u) => Some(u),
144            _ => None,
145        }
146    }
147
148    pub fn kind_str(&self) -> &'static str {
149        match self {
150            Self::User(_) => "user",
151            Self::System(_) => "system",
152            Self::Anonymous => "anonymous",
153        }
154    }
155}
156
157/// Request-scoped context attached to every emit. Producer call-sites
158/// fill what they have; missing fields land as `Null` in the row.
159pub struct ControlEventCtx<'a> {
160    pub actor: ActorRef<'a>,
161    pub scope: Option<Cow<'a, str>>,
162    pub request_id: Option<Cow<'a, str>>,
163    pub trace_id: Option<Cow<'a, str>>,
164}
165
166// ---------------------------------------------------------------------------
167// ControlEvent
168// ---------------------------------------------------------------------------
169
170pub struct ControlEvent {
171    pub kind: EventKind,
172    pub outcome: Outcome,
173    pub action: Cow<'static, str>,
174    pub resource: Option<String>,
175    pub reason: Option<String>,
176    pub matched_policy_id: Option<String>,
177    pub fields: HashMap<String, Sensitivity>,
178}
179
180// ---------------------------------------------------------------------------
181// Sensitivity
182// ---------------------------------------------------------------------------
183
184/// How a payload value is rendered when it lands in `fields_json`.
185/// Producer slices choose per-field; the skeleton does not decide what
186/// counts as sensitive.
187#[derive(Debug, Clone, PartialEq, Eq)]
188pub enum Sensitivity {
189    /// Value persisted as-is. Producer guarantees it isn't sensitive.
190    Raw(String),
191    /// Fingerprint instead of the value. `algo` is the hash name
192    /// (always `"blake3"` for the skeleton helper) and `hex` is the
193    /// lowercase hex digest.
194    Hashed { algo: &'static str, hex: String },
195    /// Placeholder: "a value existed at this field but we are not
196    /// logging it." Distinguishable from absence (no key at all).
197    Redacted,
198}
199
200impl Sensitivity {
201    pub fn raw<S: Into<String>>(s: S) -> Self {
202        Self::Raw(s.into())
203    }
204
205    pub fn hashed(value: &[u8]) -> Self {
206        let hex = blake3::hash(value).to_hex().to_string();
207        Self::Hashed {
208            algo: "blake3",
209            hex,
210        }
211    }
212
213    pub fn redacted() -> Self {
214        Self::Redacted
215    }
216}
217
218// ---------------------------------------------------------------------------
219// Ledger trait
220// ---------------------------------------------------------------------------
221
/// Identifier assigned to a persisted control event. Returned by
/// `ControlEventLedger::emit` so a producer can chain a follow-up audit
/// entry back to the original.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventId(pub String);

/// Failure reported by a control-event ledger.
#[derive(Debug)]
pub enum ControlEventError {
    /// The backing store rejected the write; carries its error text.
    Persistence(String),
}

impl std::fmt::Display for ControlEventError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ControlEventError::Persistence(detail) => {
                write!(f, "control-event persistence failed: {}", detail)
            }
        }
    }
}

impl std::error::Error for ControlEventError {}
241
242/// Persistence sink for control events.
243///
244/// `emit` is synchronous. Callers running under
245/// [`ControlEventConfig::compliance_mode`] MUST treat `Err` as
246/// "abort the originating mutation" (fail closed). Callers in the
247/// default mode may log-and-continue, but the trait never swallows
248/// the failure on their behalf — that policy is per-caller.
249pub trait ControlEventLedger: Send + Sync {
250    fn emit(
251        &self,
252        ctx: &ControlEventCtx<'_>,
253        event: ControlEvent,
254    ) -> Result<EventId, ControlEventError>;
255}
256
257// ---------------------------------------------------------------------------
258// ControlEventConfig
259// ---------------------------------------------------------------------------
260
/// Ledger behaviour switch. Carried as `RedDBOptions::control_events` and
/// populated at boot from `REDDB_COMPLIANCE_MODE`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ControlEventConfig {
    /// When `true`, producer slices MUST abort their originating mutation
    /// if `emit` fails (fail closed). Defaults to `false`.
    pub compliance_mode: bool,
}

impl ControlEventConfig {
    /// Whether an originating mutation may only complete once its control
    /// event has been persisted. Currently this is exactly
    /// `compliance_mode`.
    pub fn require_persistence(&self) -> bool {
        let ControlEventConfig { compliance_mode } = *self;
        compliance_mode
    }
}
278
279// ---------------------------------------------------------------------------
280// RuntimeLedger — the skeleton's single implementor
281// ---------------------------------------------------------------------------
282
283/// Writes one row per `emit()` to `red.control_events` via the
284/// unified entity API. The collection is created on construction if
285/// it doesn't already exist (idempotent across re-opens).
286pub struct RuntimeLedger {
287    store: Arc<UnifiedStore>,
288}
289
290impl RuntimeLedger {
291    pub fn new(store: Arc<UnifiedStore>) -> Self {
292        let _ = store.get_or_create_collection(CONTROL_EVENTS_COLLECTION);
293        Self { store }
294    }
295}
296
297impl ControlEventLedger for RuntimeLedger {
298    fn emit(
299        &self,
300        ctx: &ControlEventCtx<'_>,
301        event: ControlEvent,
302    ) -> Result<EventId, ControlEventError> {
303        let ts_ms = now_unix_millis();
304        // ns since epoch: ms * 1e6, clamped to i64 range. The actual
305        // call site doesn't have nanosecond precision today; producer
306        // slices that need it can carry it via `fields`.
307        let ts_ns = (ts_ms as i128)
308            .saturating_mul(1_000_000)
309            .min(i64::MAX as i128) as i64;
310        // UUIDv7 is time-sortable in its first 48 bits, satisfying the
311        // brief's "ULID, sortable by time" requirement without adding
312        // a ulid dep.
313        let id = Uuid::new_v7().to_string();
314
315        let mut named: HashMap<String, Value> = HashMap::with_capacity(14);
316        named.insert("id".into(), Value::text(id.clone()));
317        named.insert("ts".into(), Value::Integer(ts_ns));
318        named.insert("kind".into(), Value::text(event.kind.as_str()));
319        named.insert("outcome".into(), Value::text(event.outcome.as_str()));
320        named.insert("actor_kind".into(), Value::text(ctx.actor.kind_str()));
321        named.insert(
322            "actor_user_id".into(),
323            ctx.actor
324                .user_id()
325                .map(|u| Value::text(u.to_string()))
326                .unwrap_or(Value::Null),
327        );
328        named.insert(
329            "scope".into(),
330            ctx.scope
331                .as_ref()
332                .map(|s| Value::text(s.to_string()))
333                .unwrap_or(Value::Null),
334        );
335        named.insert("action".into(), Value::text(event.action.to_string()));
336        named.insert(
337            "resource".into(),
338            event.resource.map(Value::text).unwrap_or(Value::Null),
339        );
340        named.insert(
341            "reason".into(),
342            event.reason.map(Value::text).unwrap_or(Value::Null),
343        );
344        named.insert(
345            "matched_policy_id".into(),
346            event
347                .matched_policy_id
348                .map(Value::text)
349                .unwrap_or(Value::Null),
350        );
351        named.insert(
352            "request_id".into(),
353            ctx.request_id
354                .as_ref()
355                .map(|s| Value::text(s.to_string()))
356                .unwrap_or(Value::Null),
357        );
358        named.insert(
359            "trace_id".into(),
360            ctx.trace_id
361                .as_ref()
362                .map(|s| Value::text(s.to_string()))
363                .unwrap_or(Value::Null),
364        );
365        named.insert(
366            "fields_json".into(),
367            Value::text(serialise_fields(&event.fields)),
368        );
369
370        let entity = UnifiedEntity::new(
371            EntityId::new(0),
372            EntityKind::TableRow {
373                table: Arc::from(CONTROL_EVENTS_COLLECTION),
374                row_id: 0,
375            },
376            EntityData::Row(RowData {
377                columns: Vec::new(),
378                named: Some(named),
379                schema: None,
380            }),
381        );
382        self.store
383            .insert_auto(CONTROL_EVENTS_COLLECTION, entity)
384            .map_err(|e| ControlEventError::Persistence(e.to_string()))?;
385        Ok(EventId(id))
386    }
387}
388
389/// Deterministic JSON serialiser for the `fields` map. Keys are sorted
390/// so two events with the same logical payload hash-compare equal at
391/// the row level (helpful for tests and dedup). Hand-rolled to avoid
392/// pulling serde into the ledger hot path.
393fn serialise_fields(fields: &HashMap<String, Sensitivity>) -> String {
394    let mut keys: Vec<&String> = fields.keys().collect();
395    keys.sort();
396    let mut out = String::from("{");
397    for (i, k) in keys.iter().enumerate() {
398        if i > 0 {
399            out.push(',');
400        }
401        out.push('"');
402        json_escape_into(k, &mut out);
403        out.push_str("\":");
404        match &fields[*k] {
405            Sensitivity::Raw(s) => {
406                out.push_str(r#"{"kind":"raw","value":""#);
407                json_escape_into(s, &mut out);
408                out.push_str(r#""}"#);
409            }
410            Sensitivity::Hashed { algo, hex } => {
411                out.push_str(r#"{"kind":"hashed","algo":""#);
412                out.push_str(algo);
413                out.push_str(r#"","hex":""#);
414                out.push_str(hex);
415                out.push_str(r#""}"#);
416            }
417            Sensitivity::Redacted => {
418                out.push_str(r#"{"kind":"redacted"}"#);
419            }
420        }
421    }
422    out.push('}');
423    out
424}
425
/// Append `s` to `out` as the body of a JSON string literal (no
/// surrounding quotes).
///
/// Quote, backslash, `\n`, `\r` and `\t` use their short escapes. Any
/// other control character below U+0020 becomes `\u00XX` with lowercase
/// hex. All remaining characters are copied unchanged.
fn json_escape_into(s: &str, out: &mut String) {
    for ch in s.chars() {
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        if let Some(seq) = short_escape {
            out.push_str(seq);
        } else if u32::from(ch) < 0x20 {
            out.push_str(&format!("\\u{:04x}", u32::from(ch)));
        } else {
            out.push(ch);
        }
    }
}
441
#[cfg(test)]
mod tests {
    use super::*;

    fn anon_ctx<'a>() -> ControlEventCtx<'a> {
        ControlEventCtx {
            actor: ActorRef::Anonymous,
            scope: None,
            request_id: None,
            trace_id: None,
        }
    }

    fn sample_event() -> ControlEvent {
        ControlEvent {
            kind: EventKind::PolicyCreate,
            outcome: Outcome::Allowed,
            action: Cow::Borrowed("policy.write"),
            resource: Some("policy:test".into()),
            reason: None,
            matched_policy_id: Some("p-abc".into()),
            fields: HashMap::new(),
        }
    }

    #[test]
    fn collection_is_created_on_first_open_and_reopen_is_idempotent() {
        let store = Arc::new(UnifiedStore::new());
        let _l1 = RuntimeLedger::new(store.clone());
        assert!(store.get_collection(CONTROL_EVENTS_COLLECTION).is_some());
        // A second ledger over the same store must not explode —
        // `create_collection` would error on duplicate, but
        // `RuntimeLedger::new` goes through `get_or_create_collection`.
        let _l2 = RuntimeLedger::new(store.clone());
        assert!(store.get_collection(CONTROL_EVENTS_COLLECTION).is_some());
    }

    #[test]
    fn runtime_ledger_emit_persists_row_with_every_schema_column() {
        let store = Arc::new(UnifiedStore::new());
        let ledger = RuntimeLedger::new(store.clone());
        let id = ledger.emit(&anon_ctx(), sample_event()).expect("emit ok");
        assert!(!id.0.is_empty(), "row id must be populated");

        let manager = store
            .get_collection(CONTROL_EVENTS_COLLECTION)
            .expect("table exists");
        let rows = manager.query_all(|_| true);
        assert_eq!(rows.len(), 1);

        match &rows[0].data {
            EntityData::Row(row) => {
                let named = row.named.as_ref().expect("named columns present");
                for col in [
                    "id",
                    "ts",
                    "kind",
                    "outcome",
                    "actor_kind",
                    "actor_user_id",
                    "scope",
                    "action",
                    "resource",
                    "reason",
                    "matched_policy_id",
                    "request_id",
                    "trace_id",
                    "fields_json",
                ] {
                    assert!(named.contains_key(col), "missing schema column {col}");
                }
                assert_eq!(named["kind"], Value::text("policy.create"));
                assert_eq!(named["outcome"], Value::text("allowed"));
                assert_eq!(named["actor_kind"], Value::text("anonymous"));
                assert_eq!(named["actor_user_id"], Value::Null);
                assert_eq!(named["scope"], Value::Null);
                assert_eq!(named["resource"], Value::text("policy:test"));
                assert_eq!(named["matched_policy_id"], Value::text("p-abc"));
            }
            other => panic!("expected Row, got {other:?}"),
        }
    }

    #[test]
    fn emit_with_user_actor_records_kind_and_label() {
        let store = Arc::new(UnifiedStore::new());
        let ledger = RuntimeLedger::new(store.clone());
        let user = UserId::scoped("acme", "alice");
        let ctx = ControlEventCtx {
            actor: ActorRef::User(&user),
            scope: Some(Cow::Borrowed("acme")),
            request_id: Some(Cow::Borrowed("req-42")),
            trace_id: None,
        };
        ledger.emit(&ctx, sample_event()).unwrap();
        let manager = store.get_collection(CONTROL_EVENTS_COLLECTION).unwrap();
        let rows = manager.query_all(|_| true);
        let named = match &rows[0].data {
            EntityData::Row(r) => r.named.as_ref().unwrap(),
            _ => panic!(),
        };
        assert_eq!(named["actor_kind"], Value::text("user"));
        assert_eq!(named["actor_user_id"], Value::text("acme/alice"));
        assert_eq!(named["scope"], Value::text("acme"));
        assert_eq!(named["request_id"], Value::text("req-42"));
        assert_eq!(named["trace_id"], Value::Null);
    }

    #[test]
    fn emit_with_system_actor_records_kind_without_user_id() {
        let store = Arc::new(UnifiedStore::new());
        let ledger = RuntimeLedger::new(store.clone());
        let ctx = ControlEventCtx {
            actor: ActorRef::System("bootstrap"),
            scope: None,
            request_id: None,
            trace_id: Some(Cow::Borrowed("trace-1")),
        };
        ledger.emit(&ctx, sample_event()).unwrap();
        let manager = store.get_collection(CONTROL_EVENTS_COLLECTION).unwrap();
        let rows = manager.query_all(|_| true);
        let named = match &rows[0].data {
            EntityData::Row(r) => r.named.as_ref().unwrap(),
            _ => panic!(),
        };
        assert_eq!(named["actor_kind"], Value::text("system"));
        assert_eq!(named["actor_user_id"], Value::Null);
        assert_eq!(named["trace_id"], Value::text("trace-1"));
        assert_eq!(named["fields_json"], Value::text("{}"));
    }

    #[test]
    fn sensitivity_hashed_is_stable_blake3() {
        let a = Sensitivity::hashed(b"hunter2");
        let b = Sensitivity::hashed(b"hunter2");
        assert_eq!(a, b);
        match a {
            Sensitivity::Hashed { algo, hex } => {
                assert_eq!(algo, "blake3");
                assert_eq!(hex.len(), 64);
                // Pin the digest so producer-slice tests (652b/c/d)
                // can rely on it across runs.
                assert_eq!(hex, blake3::hash(b"hunter2").to_hex().to_string(),);
            }
            other => panic!("expected Hashed, got {other:?}"),
        }
    }

    #[test]
    fn sensitivity_redacted_serialises_without_value() {
        let mut fields = HashMap::new();
        fields.insert("password".to_string(), Sensitivity::redacted());
        let s = serialise_fields(&fields);
        assert_eq!(s, r#"{"password":{"kind":"redacted"}}"#);
    }

    #[test]
    fn sensitivity_hashed_serialises_algo_and_hex() {
        let mut fields = HashMap::new();
        fields.insert("token".to_string(), Sensitivity::hashed(b"hunter2"));
        let hex = blake3::hash(b"hunter2").to_hex().to_string();
        assert_eq!(
            serialise_fields(&fields),
            format!(r#"{{"token":{{"kind":"hashed","algo":"blake3","hex":"{hex}"}}}}"#)
        );
    }

    #[test]
    fn serialise_fields_sorts_keys_and_escapes_raw_values() {
        let mut fields = HashMap::new();
        fields.insert("zeta".to_string(), Sensitivity::raw("a\"b\\c\n"));
        fields.insert("alpha".to_string(), Sensitivity::raw("\u{1}"));
        assert_eq!(
            serialise_fields(&fields),
            r#"{"alpha":{"kind":"raw","value":"\u0001"},"zeta":{"kind":"raw","value":"a\"b\\c\n"}}"#
        );
    }

    #[test]
    fn serialise_fields_of_empty_map_is_empty_object() {
        assert_eq!(serialise_fields(&HashMap::new()), "{}");
    }

    // ---- ComplianceMode caller-decision shape ---------------------------
    //
    // The trait never swallows persistence failures; the per-caller
    // policy (`abort the originating mutation` vs `log and continue`)
    // is encoded by the caller against [`ControlEventConfig::require_persistence`].
    // The tests below pin that contract via a deliberately failing fake.

    struct FailingLedger;
    impl ControlEventLedger for FailingLedger {
        fn emit(
            &self,
            _: &ControlEventCtx<'_>,
            _: ControlEvent,
        ) -> Result<EventId, ControlEventError> {
            Err(ControlEventError::Persistence("simulated".into()))
        }
    }

    fn caller_decides(
        cfg: ControlEventConfig,
        ledger: &dyn ControlEventLedger,
    ) -> Result<(), &'static str> {
        match ledger.emit(&anon_ctx(), sample_event()) {
            Ok(_) => Ok(()),
            Err(_) if cfg.require_persistence() => Err("aborted"),
            Err(_) => Ok(()),
        }
    }

    #[test]
    fn compliance_mode_makes_callers_fail_closed_on_persistence_failure() {
        let cfg = ControlEventConfig {
            compliance_mode: true,
        };
        assert!(cfg.require_persistence());
        assert_eq!(caller_decides(cfg, &FailingLedger), Err("aborted"));
    }

    #[test]
    fn default_mode_lets_callers_continue_on_persistence_failure() {
        let cfg = ControlEventConfig::default();
        assert!(!cfg.require_persistence());
        // The trait still surfaces the error...
        assert!(FailingLedger.emit(&anon_ctx(), sample_event()).is_err());
        // ...but the caller is free to swallow it.
        assert_eq!(caller_decides(cfg, &FailingLedger), Ok(()));
    }
}