Skip to main content

reddb_server/runtime/
query_audit.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::crypto::uuid::Uuid;
5use crate::storage::schema::types::Value;
6use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
7use crate::storage::UnifiedStore;
8use crate::utils::now_unix_millis;
9
/// Name of the store collection that query-audit records are inserted into.
pub const QUERY_AUDIT_COLLECTION: &str = "red.query_audit";
11
/// A filter describing which query events should be audited.
///
/// Every field is an optional constraint: `None` places no restriction on the
/// corresponding event attribute, so the default rule matches every event.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueryAuditRule {
    /// When set, the event's actor must be present and equal to this value.
    pub actor: Option<String>,
    /// When set, the event's tenant must be present and equal to this value.
    pub tenant: Option<String>,
    /// When set, one of the event's touched collections must equal this value.
    pub collection: Option<String>,
    /// When set, the event's statement kind must equal this value, ignoring
    /// ASCII case.
    pub action: Option<String>,
}
19
20impl QueryAuditRule {
21    pub fn new() -> Self {
22        Self::default()
23    }
24
25    pub fn actor(mut self, actor: impl Into<String>) -> Self {
26        self.actor = Some(actor.into());
27        self
28    }
29
30    pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
31        self.tenant = Some(tenant.into());
32        self
33    }
34
35    pub fn collection(mut self, collection: impl Into<String>) -> Self {
36        self.collection = Some(collection.into());
37        self
38    }
39
40    pub fn action(mut self, action: impl Into<String>) -> Self {
41        self.action = Some(action.into());
42        self
43    }
44
45    fn matches(&self, event: &QueryAuditEvent) -> bool {
46        self.actor
47            .as_deref()
48            .is_none_or(|actor| event.actor.as_deref() == Some(actor))
49            && self
50                .tenant
51                .as_deref()
52                .is_none_or(|tenant| event.tenant.as_deref() == Some(tenant))
53            && self
54                .action
55                .as_deref()
56                .is_none_or(|action| event.statement_kind.eq_ignore_ascii_case(action))
57            && self.collection.as_deref().is_none_or(|collection| {
58                event
59                    .touched_collections
60                    .iter()
61                    .any(|touched| touched == collection)
62            })
63    }
64}
65
/// Configuration for query auditing: an on/off switch plus the rule list.
///
/// The derived `Default` is disabled with no rules.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueryAuditConfig {
    /// Whether auditing is switched on.
    pub enabled: bool,
    /// Rules that select which events are audited.
    pub rules: Vec<QueryAuditRule>,
}
71
72impl QueryAuditConfig {
73    pub fn enabled_with_rules(rules: Vec<QueryAuditRule>) -> Self {
74        Self {
75            enabled: true,
76            rules,
77        }
78    }
79
80    pub fn regulated() -> Self {
81        Self {
82            enabled: true,
83            rules: Vec::new(),
84        }
85    }
86}
87
/// A single query occurrence offered to the audit stream.
///
/// The fields are matched against `QueryAuditRule`s and, when a rule matches,
/// written as columns of an audit record.
#[derive(Debug, Clone)]
pub struct QueryAuditEvent {
    /// Actor attributed to the query; recorded as null when absent.
    pub actor: Option<String>,
    /// Tenant attributed to the query; recorded as null when absent.
    pub tenant: Option<String>,
    /// Statement kind label; rules compare their `action` against it ignoring
    /// ASCII case.
    pub statement_kind: &'static str,
    /// Collections the query touched; recorded as one comma-joined string.
    pub touched_collections: Vec<String>,
    /// Value recorded in the `duration_ms` column (presumably milliseconds,
    /// going by the name — confirm with the caller).
    pub duration_ms: u64,
    /// Value recorded in the `row_count` column.
    pub row_count: u64,
    /// Optional request identifier; recorded as null when absent.
    pub request_id: Option<String>,
    /// Optional query hash; recorded as null when absent.
    pub query_hash: Option<String>,
}
99
/// Writes matching query events as rows into [`QUERY_AUDIT_COLLECTION`].
pub struct QueryAuditStream {
    /// Store that holds the audit collection.
    store: Arc<UnifiedStore>,
    /// Current audit configuration, behind a read/write lock so it can be
    /// changed through `&self`.
    config: parking_lot::RwLock<QueryAuditConfig>,
}
104
105impl QueryAuditStream {
106    pub fn new(store: Arc<UnifiedStore>, config: QueryAuditConfig) -> Self {
107        if config.enabled {
108            let _ = store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
109        }
110        Self {
111            store,
112            config: parking_lot::RwLock::new(config),
113        }
114    }
115
116    pub fn enable_infrastructure(&self) {
117        self.config.write().enabled = true;
118        let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
119    }
120
121    pub fn is_enabled(&self) -> bool {
122        self.config.read().enabled
123    }
124
125    pub fn has_rules(&self) -> bool {
126        let cfg = self.config.read();
127        cfg.enabled && !cfg.rules.is_empty()
128    }
129
130    pub fn rules(&self) -> Vec<QueryAuditRule> {
131        self.config.read().rules.clone()
132    }
133
134    pub fn add_rule(&self, rule: QueryAuditRule) {
135        let mut cfg = self.config.write();
136        cfg.enabled = true;
137        cfg.rules.push(rule);
138        let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
139    }
140
141    pub fn emit(&self, event: QueryAuditEvent) {
142        let cfg = self.config.read();
143        if !cfg.enabled || !cfg.rules.iter().any(|rule| rule.matches(&event)) {
144            return;
145        }
146        drop(cfg);
147
148        let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
149        let ts_ms = now_unix_millis();
150        let ts_ns = (ts_ms as i128)
151            .saturating_mul(1_000_000)
152            .min(i64::MAX as i128) as i64;
153        let id = Uuid::new_v7().to_string();
154
155        let mut named = HashMap::with_capacity(11);
156        named.insert("id".into(), Value::text(id));
157        named.insert("ts".into(), Value::Integer(ts_ns));
158        named.insert(
159            "actor".into(),
160            event.actor.map(Value::text).unwrap_or(Value::Null),
161        );
162        named.insert(
163            "tenant".into(),
164            event.tenant.map(Value::text).unwrap_or(Value::Null),
165        );
166        named.insert(
167            "statement_kind".into(),
168            Value::text(event.statement_kind.to_string()),
169        );
170        named.insert(
171            "touched_collections".into(),
172            Value::text(event.touched_collections.join(",")),
173        );
174        named.insert(
175            "duration_ms".into(),
176            Value::UnsignedInteger(event.duration_ms),
177        );
178        named.insert("row_count".into(), Value::UnsignedInteger(event.row_count));
179        named.insert(
180            "request_id".into(),
181            event.request_id.map(Value::text).unwrap_or(Value::Null),
182        );
183        named.insert(
184            "query_hash".into(),
185            event.query_hash.map(Value::text).unwrap_or(Value::Null),
186        );
187
188        let entity = UnifiedEntity::new(
189            EntityId::new(0),
190            EntityKind::TableRow {
191                table: Arc::from(QUERY_AUDIT_COLLECTION),
192                row_id: 0,
193            },
194            EntityData::Row(RowData {
195                columns: Vec::new(),
196                named: Some(named),
197                schema: None,
198            }),
199        );
200        let _ = self.store.insert_auto(QUERY_AUDIT_COLLECTION, entity);
201    }
202}