1use std::borrow::Cow;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use crate::auth::UserId;
20use crate::crypto::uuid::Uuid;
21use crate::storage::schema::types::Value;
22use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
23use crate::storage::UnifiedStore;
24use crate::utils::now_unix_millis;
25
/// Name of the store collection that holds control-event rows.
///
/// [`RuntimeLedger::new`] makes sure this collection exists and
/// `RuntimeLedger::emit` inserts one row into it per event.
pub const CONTROL_EVENTS_COLLECTION: &str = "red.control_events";
28
/// Kind of control event recorded in the ledger.
///
/// Every variant has a dotted text identifier (see [`EventKind::as_str`]);
/// that identifier is what `RuntimeLedger::emit` writes to the `kind` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventKind {
    // `policy.*`
    PolicyCreate,
    PolicyUpdate,
    PolicyDelete,
    PolicyAttach,
    PolicyDetach,
    // `config.*`
    ConfigWrite,
    ConfigDelete,
    // `user.*`
    UserCreate,
    UserUpdate,
    UserDelete,
    UserDisable,
    // `apikey.*`
    ApiKeyCreate,
    ApiKeyRevoke,
    // `vault.*`
    VaultMetadataRead,
    VaultRead,
    VaultUnseal,
    VaultRotate,
    VaultPurge,
    // Schema / tenancy / row-level-security governance
    SchemaDdl,
    TenantGovernance,
    RlsGovernance,
    // Backup, restore and replication operations
    BackupRun,
    RestoreRun,
    FailoverPromotion,
    ReplicationSafety,
    // Evidence export and break-glass
    EvidenceExport,
    PolicyBreakGlass,
}

impl EventKind {
    /// Returns the dotted identifier for this kind, e.g. `"policy.create"`.
    ///
    /// The match is exhaustive on purpose: adding a variant without giving it
    /// an identifier is a compile error.
    pub fn as_str(self) -> &'static str {
        match self {
            // policy.*
            EventKind::PolicyCreate => "policy.create",
            EventKind::PolicyUpdate => "policy.update",
            EventKind::PolicyDelete => "policy.delete",
            EventKind::PolicyAttach => "policy.attach",
            EventKind::PolicyDetach => "policy.detach",
            EventKind::PolicyBreakGlass => "policy.break_glass",

            // config.*
            EventKind::ConfigWrite => "config.write",
            EventKind::ConfigDelete => "config.delete",

            // user.*
            EventKind::UserCreate => "user.create",
            EventKind::UserUpdate => "user.update",
            EventKind::UserDelete => "user.delete",
            EventKind::UserDisable => "user.disable",

            // apikey.*
            EventKind::ApiKeyCreate => "apikey.create",
            EventKind::ApiKeyRevoke => "apikey.revoke",

            // vault.*
            EventKind::VaultMetadataRead => "vault.metadata_read",
            EventKind::VaultRead => "vault.read",
            EventKind::VaultUnseal => "vault.unseal",
            EventKind::VaultRotate => "vault.rotate",
            EventKind::VaultPurge => "vault.purge",

            // governance
            EventKind::SchemaDdl => "schema.ddl",
            EventKind::TenantGovernance => "tenant.governance",
            EventKind::RlsGovernance => "rls.governance",

            // operations
            EventKind::BackupRun => "backup.run",
            EventKind::RestoreRun => "restore.run",
            EventKind::FailoverPromotion => "failover.promotion",
            EventKind::ReplicationSafety => "replication.safety",

            // evidence
            EventKind::EvidenceExport => "evidence.export",
        }
    }
}
102
/// Result of the action a control event describes.
///
/// `RuntimeLedger::emit` stores the text form (see [`Outcome::as_str`]) in
/// the `outcome` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Outcome {
    /// Serialised as `"allowed"`.
    Allowed,
    /// Serialised as `"denied"`.
    Denied,
    /// Serialised as `"error"`.
    Error,
}

impl Outcome {
    /// Returns the lowercase identifier for this outcome.
    pub fn as_str(self) -> &'static str {
        match self {
            Outcome::Allowed => "allowed",
            Outcome::Denied => "denied",
            Outcome::Error => "error",
        }
    }
}
123
124#[derive(Debug)]
131pub enum ActorRef<'a> {
132 User(&'a UserId),
133 System(&'static str),
137 Anonymous,
138}
139
140impl<'a> ActorRef<'a> {
141 pub fn user_id(&self) -> Option<&UserId> {
142 match self {
143 Self::User(u) => Some(u),
144 _ => None,
145 }
146 }
147
148 pub fn kind_str(&self) -> &'static str {
149 match self {
150 Self::User(_) => "user",
151 Self::System(_) => "system",
152 Self::Anonymous => "anonymous",
153 }
154 }
155}
156
157pub struct ControlEventCtx<'a> {
160 pub actor: ActorRef<'a>,
161 pub scope: Option<Cow<'a, str>>,
162 pub request_id: Option<Cow<'a, str>>,
163 pub trace_id: Option<Cow<'a, str>>,
164}
165
166pub struct ControlEvent {
171 pub kind: EventKind,
172 pub outcome: Outcome,
173 pub action: Cow<'static, str>,
174 pub resource: Option<String>,
175 pub reason: Option<String>,
176 pub matched_policy_id: Option<String>,
177 pub fields: HashMap<String, Sensitivity>,
178}
179
/// How the value of an event field may be stored in the ledger.
///
/// `serialise_fields` renders each variant as a JSON object tagged with a
/// `kind` of `"raw"`, `"hashed"` or `"redacted"`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Sensitivity {
    /// The value is stored verbatim.
    Raw(String),
    /// Only a digest of the value is stored.
    Hashed {
        /// Name of the hash algorithm that produced `hex`.
        algo: &'static str,
        /// Hex-encoded digest.
        hex: String,
    },
    /// Nothing about the value is stored; only the field name survives.
    Redacted,
}
199
200impl Sensitivity {
201 pub fn raw<S: Into<String>>(s: S) -> Self {
202 Self::Raw(s.into())
203 }
204
205 pub fn hashed(value: &[u8]) -> Self {
206 let hex = blake3::hash(value).to_hex().to_string();
207 Self::Hashed {
208 algo: "blake3",
209 hex,
210 }
211 }
212
213 pub fn redacted() -> Self {
214 Self::Redacted
215 }
216}
217
/// Identifier of an emitted control event.
///
/// `RuntimeLedger::emit` fills it with the string form of a freshly generated
/// UUID (`Uuid::new_v7`), the same value it stores in the row's `id` column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventId(pub String);
226
/// Failure reported by a [`ControlEventLedger`].
#[derive(Debug)]
pub enum ControlEventError {
    /// The event could not be persisted; carries the underlying error message.
    Persistence(String),
}

impl std::fmt::Display for ControlEventError {
    /// Renders `control-event persistence failed: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ControlEventError::Persistence(detail) => {
                f.write_str("control-event persistence failed: ")?;
                f.write_str(detail)
            }
        }
    }
}

impl std::error::Error for ControlEventError {}
241
242pub trait ControlEventLedger: Send + Sync {
250 fn emit(
251 &self,
252 ctx: &ControlEventCtx<'_>,
253 event: ControlEvent,
254 ) -> Result<EventId, ControlEventError>;
255}
256
/// Settings that govern how callers react to ledger failures.
///
/// The default has `compliance_mode` switched off.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ControlEventConfig {
    /// When `true`, failing to persist a control event must be treated as
    /// fatal by the caller.
    pub compliance_mode: bool,
}

impl ControlEventConfig {
    /// Returns `true` when a persistence failure must abort the action being
    /// audited; currently this is exactly `compliance_mode`.
    pub fn require_persistence(&self) -> bool {
        self.compliance_mode
    }
}
278
279pub struct RuntimeLedger {
287 store: Arc<UnifiedStore>,
288}
289
290impl RuntimeLedger {
291 pub fn new(store: Arc<UnifiedStore>) -> Self {
292 let _ = store.get_or_create_collection(CONTROL_EVENTS_COLLECTION);
293 Self { store }
294 }
295}
296
297impl ControlEventLedger for RuntimeLedger {
298 fn emit(
299 &self,
300 ctx: &ControlEventCtx<'_>,
301 event: ControlEvent,
302 ) -> Result<EventId, ControlEventError> {
303 let ts_ms = now_unix_millis();
304 let ts_ns = (ts_ms as i128)
308 .saturating_mul(1_000_000)
309 .min(i64::MAX as i128) as i64;
310 let id = Uuid::new_v7().to_string();
314
315 let mut named: HashMap<String, Value> = HashMap::with_capacity(14);
316 named.insert("id".into(), Value::text(id.clone()));
317 named.insert("ts".into(), Value::Integer(ts_ns));
318 named.insert("kind".into(), Value::text(event.kind.as_str()));
319 named.insert("outcome".into(), Value::text(event.outcome.as_str()));
320 named.insert("actor_kind".into(), Value::text(ctx.actor.kind_str()));
321 named.insert(
322 "actor_user_id".into(),
323 ctx.actor
324 .user_id()
325 .map(|u| Value::text(u.to_string()))
326 .unwrap_or(Value::Null),
327 );
328 named.insert(
329 "scope".into(),
330 ctx.scope
331 .as_ref()
332 .map(|s| Value::text(s.to_string()))
333 .unwrap_or(Value::Null),
334 );
335 named.insert("action".into(), Value::text(event.action.to_string()));
336 named.insert(
337 "resource".into(),
338 event.resource.map(Value::text).unwrap_or(Value::Null),
339 );
340 named.insert(
341 "reason".into(),
342 event.reason.map(Value::text).unwrap_or(Value::Null),
343 );
344 named.insert(
345 "matched_policy_id".into(),
346 event
347 .matched_policy_id
348 .map(Value::text)
349 .unwrap_or(Value::Null),
350 );
351 named.insert(
352 "request_id".into(),
353 ctx.request_id
354 .as_ref()
355 .map(|s| Value::text(s.to_string()))
356 .unwrap_or(Value::Null),
357 );
358 named.insert(
359 "trace_id".into(),
360 ctx.trace_id
361 .as_ref()
362 .map(|s| Value::text(s.to_string()))
363 .unwrap_or(Value::Null),
364 );
365 named.insert(
366 "fields_json".into(),
367 Value::text(serialise_fields(&event.fields)),
368 );
369
370 let entity = UnifiedEntity::new(
371 EntityId::new(0),
372 EntityKind::TableRow {
373 table: Arc::from(CONTROL_EVENTS_COLLECTION),
374 row_id: 0,
375 },
376 EntityData::Row(RowData {
377 columns: Vec::new(),
378 named: Some(named),
379 schema: None,
380 }),
381 );
382 self.store
383 .insert_auto(CONTROL_EVENTS_COLLECTION, entity)
384 .map_err(|e| ControlEventError::Persistence(e.to_string()))?;
385 Ok(EventId(id))
386 }
387}
388
389fn serialise_fields(fields: &HashMap<String, Sensitivity>) -> String {
394 let mut keys: Vec<&String> = fields.keys().collect();
395 keys.sort();
396 let mut out = String::from("{");
397 for (i, k) in keys.iter().enumerate() {
398 if i > 0 {
399 out.push(',');
400 }
401 out.push('"');
402 json_escape_into(k, &mut out);
403 out.push_str("\":");
404 match &fields[*k] {
405 Sensitivity::Raw(s) => {
406 out.push_str(r#"{"kind":"raw","value":""#);
407 json_escape_into(s, &mut out);
408 out.push_str(r#""}"#);
409 }
410 Sensitivity::Hashed { algo, hex } => {
411 out.push_str(r#"{"kind":"hashed","algo":""#);
412 out.push_str(algo);
413 out.push_str(r#"","hex":""#);
414 out.push_str(hex);
415 out.push_str(r#""}"#);
416 }
417 Sensitivity::Redacted => {
418 out.push_str(r#"{"kind":"redacted"}"#);
419 }
420 }
421 }
422 out.push('}');
423 out
424}
425
/// Appends `s` to `out`, escaped for use inside a JSON string literal.
///
/// The surrounding quotes are not written. `"` and `\` get a backslash;
/// newline, carriage return and tab use their short escapes; every other
/// character below U+0020 becomes a `\u00XX` escape. All remaining characters
/// are copied through unchanged.
fn json_escape_into(s: &str, out: &mut String) {
    for ch in s.chars() {
        // Characters that have a dedicated two-character escape.
        let short_escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };

        if let Some(escape) = short_escape {
            out.push_str(escape);
        } else if u32::from(ch) < 0x20 {
            // Remaining control characters have no short form.
            out.push_str(&format!("\\u{:04x}", u32::from(ch)));
        } else {
            out.push(ch);
        }
    }
}
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Context with an anonymous actor and no correlation values.
    fn anon_ctx<'a>() -> ControlEventCtx<'a> {
        ControlEventCtx {
            actor: ActorRef::Anonymous,
            scope: None,
            request_id: None,
            trace_id: None,
        }
    }

    /// An allowed `policy.create` event with a resource and a matched policy.
    fn sample_event() -> ControlEvent {
        ControlEvent {
            kind: EventKind::PolicyCreate,
            outcome: Outcome::Allowed,
            action: Cow::Borrowed("policy.write"),
            resource: Some("policy:test".into()),
            reason: None,
            matched_policy_id: Some("p-abc".into()),
            fields: HashMap::new(),
        }
    }

    #[test]
    fn collection_is_created_on_first_open_and_reopen_is_idempotent() {
        let store = Arc::new(UnifiedStore::new());
        let _l1 = RuntimeLedger::new(store.clone());
        assert!(store.get_collection(CONTROL_EVENTS_COLLECTION).is_some());
        // A second ledger over the same store must not disturb the collection.
        let _l2 = RuntimeLedger::new(store.clone());
        assert!(store.get_collection(CONTROL_EVENTS_COLLECTION).is_some());
    }

    #[test]
    fn runtime_ledger_emit_persists_row_with_every_schema_column() {
        let store = Arc::new(UnifiedStore::new());
        let ledger = RuntimeLedger::new(store.clone());
        let id = ledger.emit(&anon_ctx(), sample_event()).expect("emit ok");
        assert!(!id.0.is_empty(), "row id must be populated");

        let manager = store
            .get_collection(CONTROL_EVENTS_COLLECTION)
            .expect("table exists");
        let rows = manager.query_all(|_| true);
        assert_eq!(rows.len(), 1);

        match &rows[0].data {
            EntityData::Row(row) => {
                let named = row.named.as_ref().expect("named columns present");
                for col in [
                    "id",
                    "ts",
                    "kind",
                    "outcome",
                    "actor_kind",
                    "actor_user_id",
                    "scope",
                    "action",
                    "resource",
                    "reason",
                    "matched_policy_id",
                    "request_id",
                    "trace_id",
                    "fields_json",
                ] {
                    assert!(named.contains_key(col), "missing schema column {col}");
                }
                assert_eq!(named["kind"], Value::text("policy.create"));
                assert_eq!(named["outcome"], Value::text("allowed"));
                assert_eq!(named["actor_kind"], Value::text("anonymous"));
                assert_eq!(named["actor_user_id"], Value::Null);
                assert_eq!(named["scope"], Value::Null);
                assert_eq!(named["resource"], Value::text("policy:test"));
                assert_eq!(named["matched_policy_id"], Value::text("p-abc"));
            }
            other => panic!("expected Row, got {other:?}"),
        }
    }

    #[test]
    fn emit_with_user_actor_records_kind_and_label() {
        let store = Arc::new(UnifiedStore::new());
        let ledger = RuntimeLedger::new(store.clone());
        let user = UserId::scoped("acme", "alice");
        let ctx = ControlEventCtx {
            actor: ActorRef::User(&user),
            scope: Some(Cow::Borrowed("acme")),
            request_id: Some(Cow::Borrowed("req-42")),
            trace_id: None,
        };
        ledger.emit(&ctx, sample_event()).unwrap();
        let manager = store.get_collection(CONTROL_EVENTS_COLLECTION).unwrap();
        let rows = manager.query_all(|_| true);
        let named = match &rows[0].data {
            EntityData::Row(r) => r.named.as_ref().unwrap(),
            other => panic!("expected Row, got {other:?}"),
        };
        assert_eq!(named["actor_kind"], Value::text("user"));
        assert_eq!(named["actor_user_id"], Value::text("acme/alice"));
        assert_eq!(named["scope"], Value::text("acme"));
        assert_eq!(named["request_id"], Value::text("req-42"));
        assert_eq!(named["trace_id"], Value::Null);
    }

    #[test]
    fn emit_with_system_actor_records_kind_without_user_id() {
        let store = Arc::new(UnifiedStore::new());
        let ledger = RuntimeLedger::new(store.clone());
        let ctx = ControlEventCtx {
            actor: ActorRef::System("scheduler"),
            scope: None,
            request_id: None,
            trace_id: Some(Cow::Borrowed("trace-7")),
        };
        ledger.emit(&ctx, sample_event()).unwrap();
        let manager = store.get_collection(CONTROL_EVENTS_COLLECTION).unwrap();
        let rows = manager.query_all(|_| true);
        let named = match &rows[0].data {
            EntityData::Row(r) => r.named.as_ref().unwrap(),
            other => panic!("expected Row, got {other:?}"),
        };
        assert_eq!(named["actor_kind"], Value::text("system"));
        assert_eq!(named["actor_user_id"], Value::Null);
        assert_eq!(named["trace_id"], Value::text("trace-7"));
    }

    #[test]
    fn sensitivity_hashed_is_stable_blake3() {
        let a = Sensitivity::hashed(b"hunter2");
        let b = Sensitivity::hashed(b"hunter2");
        assert_eq!(a, b);
        match a {
            Sensitivity::Hashed { algo, hex } => {
                assert_eq!(algo, "blake3");
                assert_eq!(hex.len(), 64);
                assert_eq!(hex, blake3::hash(b"hunter2").to_hex().to_string());
            }
            other => panic!("expected Hashed, got {other:?}"),
        }
    }

    #[test]
    fn sensitivity_redacted_serialises_without_value() {
        let mut fields = HashMap::new();
        fields.insert("password".to_string(), Sensitivity::redacted());
        let s = serialise_fields(&fields);
        assert_eq!(s, r#"{"password":{"kind":"redacted"}}"#);
    }

    #[test]
    fn serialise_fields_orders_keys_and_escapes_raw_values() {
        let mut fields = HashMap::new();
        fields.insert("b".to_string(), Sensitivity::raw("line\n\"quoted\""));
        fields.insert("a".to_string(), Sensitivity::redacted());
        assert_eq!(
            serialise_fields(&fields),
            r#"{"a":{"kind":"redacted"},"b":{"kind":"raw","value":"line\n\"quoted\""}}"#
        );
        assert_eq!(serialise_fields(&HashMap::new()), "{}");
    }

    #[test]
    fn serialise_fields_renders_hashed_entries() {
        let mut fields = HashMap::new();
        fields.insert(
            "token".to_string(),
            Sensitivity::Hashed {
                algo: "blake3",
                hex: "00ff".to_string(),
            },
        );
        assert_eq!(
            serialise_fields(&fields),
            r#"{"token":{"kind":"hashed","algo":"blake3","hex":"00ff"}}"#
        );
    }

    /// Ledger whose `emit` always fails, to exercise the caller-side policy.
    struct FailingLedger;

    impl ControlEventLedger for FailingLedger {
        fn emit(
            &self,
            _: &ControlEventCtx<'_>,
            _: ControlEvent,
        ) -> Result<EventId, ControlEventError> {
            Err(ControlEventError::Persistence("simulated".into()))
        }
    }

    /// Models a caller: abort on a ledger failure only when the configuration
    /// requires persistence.
    fn caller_decides(
        cfg: ControlEventConfig,
        ledger: &dyn ControlEventLedger,
    ) -> Result<(), &'static str> {
        match ledger.emit(&anon_ctx(), sample_event()) {
            Ok(_) => Ok(()),
            Err(_) if cfg.require_persistence() => Err("aborted"),
            Err(_) => Ok(()),
        }
    }

    #[test]
    fn compliance_mode_makes_callers_fail_closed_on_persistence_failure() {
        let cfg = ControlEventConfig {
            compliance_mode: true,
        };
        assert!(cfg.require_persistence());
        assert_eq!(caller_decides(cfg, &FailingLedger), Err("aborted"));
    }

    #[test]
    fn default_mode_lets_callers_continue_on_persistence_failure() {
        let cfg = ControlEventConfig::default();
        assert!(!cfg.require_persistence());
        // The ledger still reports the failure; only the caller's reaction differs.
        assert!(FailingLedger.emit(&anon_ctx(), sample_event()).is_err());
        assert_eq!(caller_decides(cfg, &FailingLedger), Ok(()));
    }
}