reddb-io-server 1.9.1

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use crate::crypto::uuid::Uuid;
use crate::storage::schema::types::Value;
use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
use crate::storage::UnifiedStore;
use crate::utils::now_unix_millis;

/// Name of the collection that [`QueryAuditStream`] creates and inserts audit rows into.
pub const QUERY_AUDIT_COLLECTION: &str = "red.query_audit";

/// Filter selecting which [`QueryAuditEvent`]s get recorded.
///
/// Every field is optional. A `None` field places no constraint on the event,
/// so the default rule (all fields `None`) matches every event.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueryAuditRule {
    /// Required actor; compared for exact equality with the event's actor.
    pub actor: Option<String>,
    /// Required tenant; compared for exact equality with the event's tenant.
    pub tenant: Option<String>,
    /// Collection name that must appear (exact match) among the event's touched collections.
    pub collection: Option<String>,
    /// Required statement kind; compared ASCII case-insensitively with the event's
    /// `statement_kind`.
    pub action: Option<String>,
}

impl QueryAuditRule {
    /// Creates a rule with no constraints, i.e. one that matches every event.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns this rule with the actor constraint set to `actor`.
    pub fn actor(self, actor: impl Into<String>) -> Self {
        Self {
            actor: Some(actor.into()),
            ..self
        }
    }

    /// Returns this rule with the tenant constraint set to `tenant`.
    pub fn tenant(self, tenant: impl Into<String>) -> Self {
        Self {
            tenant: Some(tenant.into()),
            ..self
        }
    }

    /// Returns this rule with the collection constraint set to `collection`.
    pub fn collection(self, collection: impl Into<String>) -> Self {
        Self {
            collection: Some(collection.into()),
            ..self
        }
    }

    /// Returns this rule with the action (statement kind) constraint set to `action`.
    pub fn action(self, action: impl Into<String>) -> Self {
        Self {
            action: Some(action.into()),
            ..self
        }
    }

    /// Reports whether `event` satisfies every constraint that is set on this rule.
    ///
    /// Unset constraints are skipped. Actor and tenant must equal the event's
    /// values exactly (an event with no actor/tenant never satisfies a set
    /// constraint), the action is compared ASCII case-insensitively with the
    /// statement kind, and the collection must be one of the touched collections.
    fn matches(&self, event: &QueryAuditEvent) -> bool {
        if let Some(wanted_actor) = self.actor.as_deref() {
            if event.actor.as_deref() != Some(wanted_actor) {
                return false;
            }
        }

        if let Some(wanted_tenant) = self.tenant.as_deref() {
            if event.tenant.as_deref() != Some(wanted_tenant) {
                return false;
            }
        }

        if let Some(wanted_action) = self.action.as_deref() {
            if !event.statement_kind.eq_ignore_ascii_case(wanted_action) {
                return false;
            }
        }

        match self.collection.as_deref() {
            None => true,
            Some(wanted_collection) => event
                .touched_collections
                .iter()
                .any(|name| name == wanted_collection),
        }
    }
}

/// Configuration held by a [`QueryAuditStream`].
///
/// The derived default is disabled with no rules.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct QueryAuditConfig {
    /// Master switch; while `false`, `QueryAuditStream::emit` records nothing.
    pub enabled: bool,
    /// Rules consulted by `QueryAuditStream::emit`; an event is recorded only
    /// if at least one rule matches it.
    pub rules: Vec<QueryAuditRule>,
}

impl QueryAuditConfig {
    /// Builds an enabled configuration that carries the supplied `rules`.
    pub fn enabled_with_rules(rules: Vec<QueryAuditRule>) -> Self {
        Self {
            rules,
            enabled: true,
        }
    }

    /// Builds an enabled configuration that starts out with an empty rule list.
    pub fn regulated() -> Self {
        Self::enabled_with_rules(Vec::new())
    }
}

/// One executed statement, as handed to `QueryAuditStream::emit`.
///
/// Each field is written to a column of the same name when the event is recorded.
#[derive(Debug, Clone)]
pub struct QueryAuditEvent {
    /// Actor to compare against a rule's `actor`; stored as text, or null when absent.
    pub actor: Option<String>,
    /// Tenant to compare against a rule's `tenant`; stored as text, or null when absent.
    pub tenant: Option<String>,
    /// Statement kind; compared ASCII case-insensitively against a rule's `action`.
    pub statement_kind: &'static str,
    /// Collections compared against a rule's `collection`; stored as one
    /// comma-joined text value.
    pub touched_collections: Vec<String>,
    /// Stored as an unsigned integer in the `duration_ms` column.
    pub duration_ms: u64,
    /// Stored as an unsigned integer in the `row_count` column.
    pub row_count: u64,
    /// Stored as text, or null when absent.
    pub request_id: Option<String>,
    /// Stored as text, or null when absent.
    pub query_hash: Option<String>,
}

/// Records [`QueryAuditEvent`]s that match the configured rules as rows in
/// the [`QUERY_AUDIT_COLLECTION`] collection.
pub struct QueryAuditStream {
    /// Store in which the audit collection is created and rows are inserted.
    store: Arc<UnifiedStore>,
    /// Current configuration, guarded by a `parking_lot` read-write lock so it
    /// can be changed through `&self`.
    config: parking_lot::RwLock<QueryAuditConfig>,
}

impl QueryAuditStream {
    /// Creates a stream that writes to `store` according to `config`.
    ///
    /// When `config.enabled` is set, the audit collection is requested from
    /// the store right away; the outcome of that request is ignored.
    pub fn new(store: Arc<UnifiedStore>, config: QueryAuditConfig) -> Self {
        if config.enabled {
            let _ = store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
        }
        Self {
            store,
            config: parking_lot::RwLock::new(config),
        }
    }

    /// Marks auditing as enabled and requests the audit collection from the
    /// store, without adding any rule.
    ///
    /// The write guard is a temporary of the first statement, so the lock is
    /// already released when the store is called.
    pub fn enable_infrastructure(&self) {
        self.config.write().enabled = true;
        let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
    }

    /// Returns whether auditing is currently enabled.
    pub fn is_enabled(&self) -> bool {
        self.config.read().enabled
    }

    /// Returns `true` only when auditing is enabled and at least one rule is
    /// registered.
    pub fn has_rules(&self) -> bool {
        let cfg = self.config.read();
        cfg.enabled && !cfg.rules.is_empty()
    }

    /// Returns a copy of the currently registered rules.
    pub fn rules(&self) -> Vec<QueryAuditRule> {
        self.config.read().rules.clone()
    }

    /// Registers `rule`, enables auditing, and requests the audit collection
    /// from the store.
    pub fn add_rule(&self, rule: QueryAuditRule) {
        // Keep the write guard in its own scope so the lock is released before
        // calling into the store, as `enable_infrastructure` and `emit` do.
        // The lock is not reentrant, so it must not be held across a call
        // whose internals are outside this type's control.
        {
            let mut cfg = self.config.write();
            cfg.enabled = true;
            cfg.rules.push(rule);
        }
        let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);
    }

    /// Records `event` as a row in the audit collection if auditing is enabled
    /// and at least one registered rule matches it; otherwise does nothing.
    ///
    /// The row carries a generated `id`, a `ts` timestamp in nanoseconds, and
    /// one column per event field. Failures reported by the store are ignored,
    /// so emitting never fails from the caller's point of view.
    pub fn emit(&self, event: QueryAuditEvent) {
        // Decide under the read lock, then release it before any store access.
        let cfg = self.config.read();
        let should_record = cfg.enabled && cfg.rules.iter().any(|rule| rule.matches(&event));
        drop(cfg);
        if !should_record {
            return;
        }

        let _ = self.store.get_or_create_collection(QUERY_AUDIT_COLLECTION);

        // Milliseconds -> nanoseconds, computed in i128 and clamped so the
        // final narrowing to i64 cannot wrap.
        let ts_ms = now_unix_millis();
        let ts_ns = (ts_ms as i128)
            .saturating_mul(1_000_000)
            .min(i64::MAX as i128) as i64;
        let id = Uuid::new_v7().to_string();

        // Exactly ten columns are inserted below.
        let mut named = HashMap::with_capacity(10);
        named.insert("id".into(), Value::text(id));
        named.insert("ts".into(), Value::Integer(ts_ns));
        named.insert(
            "actor".into(),
            event.actor.map(Value::text).unwrap_or(Value::Null),
        );
        named.insert(
            "tenant".into(),
            event.tenant.map(Value::text).unwrap_or(Value::Null),
        );
        named.insert(
            "statement_kind".into(),
            Value::text(event.statement_kind.to_string()),
        );
        named.insert(
            "touched_collections".into(),
            Value::text(event.touched_collections.join(",")),
        );
        named.insert(
            "duration_ms".into(),
            Value::UnsignedInteger(event.duration_ms),
        );
        named.insert("row_count".into(), Value::UnsignedInteger(event.row_count));
        named.insert(
            "request_id".into(),
            event.request_id.map(Value::text).unwrap_or(Value::Null),
        );
        named.insert(
            "query_hash".into(),
            event.query_hash.map(Value::text).unwrap_or(Value::Null),
        );

        // NOTE(review): the zero entity id and row id look like placeholders
        // that `insert_auto` is expected to replace -- confirm against the store.
        let entity = UnifiedEntity::new(
            EntityId::new(0),
            EntityKind::TableRow {
                table: Arc::from(QUERY_AUDIT_COLLECTION),
                row_id: 0,
            },
            EntityData::Row(RowData {
                columns: Vec::new(),
                named: Some(named),
                schema: None,
            }),
        );
        let _ = self.store.insert_auto(QUERY_AUDIT_COLLECTION, entity);
    }
}