reddb_server/runtime/
query_audit.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::crypto::uuid::Uuid;
5use crate::storage::schema::types::Value;
6use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
7use crate::storage::UnifiedStore;
8use crate::utils::now_unix_millis;
9
/// Name of the collection that query-audit rows are created in and inserted into.
pub const QUERY_AUDIT_COLLECTION: &str = "red.query_audit";
11
/// A filter selecting which query events get recorded in the audit collection.
///
/// Every field is optional. A `None` field places no constraint on the event;
/// all `Some` fields must match for the rule to match. The default value
/// (all `None`) therefore matches every event.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueryAuditRule {
    /// Required actor; compared exactly against the event's actor.
    pub actor: Option<String>,
    /// Required tenant; compared exactly against the event's tenant.
    pub tenant: Option<String>,
    /// Collection that must appear (exact match) among the event's touched collections.
    pub collection: Option<String>,
    /// Required statement kind; compared ASCII case-insensitively.
    pub action: Option<String>,
}
19
20impl QueryAuditRule {
21 pub fn new() -> Self {
22 Self::default()
23 }
24
25 pub fn actor(mut self, actor: impl Into<String>) -> Self {
26 self.actor = Some(actor.into());
27 self
28 }
29
30 pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
31 self.tenant = Some(tenant.into());
32 self
33 }
34
35 pub fn collection(mut self, collection: impl Into<String>) -> Self {
36 self.collection = Some(collection.into());
37 self
38 }
39
40 pub fn action(mut self, action: impl Into<String>) -> Self {
41 self.action = Some(action.into());
42 self
43 }
44
45 fn matches(&self, event: &QueryAuditEvent) -> bool {
46 self.actor
47 .as_deref()
48 .is_none_or(|actor| event.actor.as_deref() == Some(actor))
49 && self
50 .tenant
51 .as_deref()
52 .is_none_or(|tenant| event.tenant.as_deref() == Some(tenant))
53 && self
54 .action
55 .as_deref()
56 .is_none_or(|action| event.statement_kind.eq_ignore_ascii_case(action))
57 && self.collection.as_deref().is_none_or(|collection| {
58 event
59 .touched_collections
60 .iter()
61 .any(|touched| touched == collection)
62 })
63 }
64}
65
/// Configuration for the query audit stream.
///
/// The default value is disabled with no rules.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueryAuditConfig {
    /// Master switch; when `false` no audit rows are written.
    pub enabled: bool,
    /// Rules an event is checked against; an event is recorded when at least one matches.
    pub rules: Vec<QueryAuditRule>,
}
71
72impl QueryAuditConfig {
73 pub fn enabled_with_rules(rules: Vec<QueryAuditRule>) -> Self {
74 Self {
75 enabled: true,
76 rules,
77 }
78 }
79
80 pub fn regulated() -> Self {
81 Self {
82 enabled: true,
83 rules: Vec::new(),
84 }
85 }
86}
87
/// Description of one executed statement, handed to the audit stream for
/// rule matching and (on a match) persistence as an audit row.
#[derive(Debug, Clone)]
pub struct QueryAuditEvent {
    /// Actor that issued the statement, if known; stored as null when absent.
    pub actor: Option<String>,
    /// Tenant the statement ran under, if known; stored as null when absent.
    pub tenant: Option<String>,
    /// Kind of statement; matched against a rule's `action` ignoring ASCII case.
    pub statement_kind: &'static str,
    /// Collections the statement touched; persisted as a comma-joined string.
    pub touched_collections: Vec<String>,
    /// Statement duration; milliseconds per the field name.
    pub duration_ms: u64,
    /// Row count associated with the statement.
    pub row_count: u64,
    /// Optional request identifier; stored as null when absent.
    pub request_id: Option<String>,
    /// Optional hash of the query; stored as null when absent.
    pub query_hash: Option<String>,
}
99
/// Writes matching query events into the audit collection of a shared store.
pub struct QueryAuditStream {
    /// Store that holds the audit collection and receives the audit rows.
    store: Arc<UnifiedStore>,
    /// Current configuration, behind a read/write lock so it can be changed through `&self`.
    config: parking_lot::RwLock<QueryAuditConfig>,
}
104
105impl QueryAuditStream {
106 pub fn new(store: Arc<UnifiedStore>, config: QueryAuditConfig) -> Self {
107 if config.enabled {
108 let _ = store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
109 }
110 Self {
111 store,
112 config: parking_lot::RwLock::new(config),
113 }
114 }
115
116 pub fn enable_infrastructure(&self) {
117 self.config.write().enabled = true;
118 let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
119 }
120
121 pub fn is_enabled(&self) -> bool {
122 self.config.read().enabled
123 }
124
125 pub fn has_rules(&self) -> bool {
126 let cfg = self.config.read();
127 cfg.enabled && !cfg.rules.is_empty()
128 }
129
130 pub fn rules(&self) -> Vec<QueryAuditRule> {
131 self.config.read().rules.clone()
132 }
133
134 pub fn add_rule(&self, rule: QueryAuditRule) {
135 let mut cfg = self.config.write();
136 cfg.enabled = true;
137 cfg.rules.push(rule);
138 let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
139 }
140
141 pub fn emit(&self, event: QueryAuditEvent) {
142 let cfg = self.config.read();
143 if !cfg.enabled || !cfg.rules.iter().any(|rule| rule.matches(&event)) {
144 return;
145 }
146 drop(cfg);
147
148 let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
149 let ts_ms = now_unix_millis();
150 let ts_ns = (ts_ms as i128)
151 .saturating_mul(1_000_000)
152 .min(i64::MAX as i128) as i64;
153 let id = Uuid::new_v7().to_string();
154
155 let mut named = HashMap::with_capacity(11);
156 named.insert("id".into(), Value::text(id));
157 named.insert("ts".into(), Value::Integer(ts_ns));
158 named.insert(
159 "actor".into(),
160 event.actor.map(Value::text).unwrap_or(Value::Null),
161 );
162 named.insert(
163 "tenant".into(),
164 event.tenant.map(Value::text).unwrap_or(Value::Null),
165 );
166 named.insert(
167 "statement_kind".into(),
168 Value::text(event.statement_kind.to_string()),
169 );
170 named.insert(
171 "touched_collections".into(),
172 Value::text(event.touched_collections.join(",")),
173 );
174 named.insert(
175 "duration_ms".into(),
176 Value::UnsignedInteger(event.duration_ms),
177 );
178 named.insert("row_count".into(), Value::UnsignedInteger(event.row_count));
179 named.insert(
180 "request_id".into(),
181 event.request_id.map(Value::text).unwrap_or(Value::Null),
182 );
183 named.insert(
184 "query_hash".into(),
185 event.query_hash.map(Value::text).unwrap_or(Value::Null),
186 );
187
188 let entity = UnifiedEntity::new(
189 EntityId::new(0),
190 EntityKind::TableRow {
191 table: Arc::from(QUERY_AUDIT_COLLECTION),
192 row_id: 0,
193 },
194 EntityData::Row(RowData {
195 columns: Vec::new(),
196 named: Some(named),
197 schema: None,
198 }),
199 );
200 let _ = self.store.insert_auto(QUERY_AUDIT_COLLECTION, entity);
201 }
202}