hirn-engine 0.1.0

Engine for the hirn cognitive memory database
Documentation
use std::sync::Arc;

use hirn_core::timestamp::Timestamp;
use hirn_core::types::AgentId;
use hirn_core::{HirnError, HirnResult};
use hirn_storage::PhysicalStore;

use crate::event::MemoryEvent;
use crate::policy::{Action, AuthzRequest, PolicyEngine};

pub(crate) struct PolicyAuthorization {
    pub(crate) audit_event: MemoryEvent,
    pub(crate) denial_error: Option<HirnError>,
}

pub(crate) struct PolicyRuntime {
    storage: Arc<dyn PhysicalStore>,
    policy_engine: Option<PolicyEngine>,
}

impl PolicyRuntime {
    pub(crate) fn new(storage: Arc<dyn PhysicalStore>) -> Self {
        Self {
            storage,
            policy_engine: None,
        }
    }

    pub(crate) fn set_engine(&mut self, engine: PolicyEngine) {
        self.policy_engine = Some(engine);
    }

    pub(crate) fn engine(&self) -> Option<&PolicyEngine> {
        self.policy_engine.as_ref()
    }

    pub(crate) fn authorize(
        &self,
        agent_id: &str,
        action: Action,
        realm: &str,
        namespace: &str,
    ) -> Option<PolicyAuthorization> {
        let Some(engine) = &self.policy_engine else {
            return None;
        };

        let span = tracing::info_span!(
            "recall.authorize",
            agent_id = %agent_id,
            action = %action,
            decision = tracing::field::Empty,
            policy_ids = tracing::field::Empty,
            latency_us = tracing::field::Empty,
        );

        let _guard = span.enter();

        let request = AuthzRequest {
            agent_id: agent_id.to_string(),
            action,
            realm: realm.to_string(),
            namespace: namespace.to_string(),
        };

        let authz_start = std::time::Instant::now();
        let decision = engine.authorize(&request);
        let authz_elapsed = authz_start.elapsed();

        let decision_label = if decision.allowed { "allow" } else { "deny" };
        let latency_us = authz_elapsed.as_micros() as u64;
        span.record("decision", decision_label);
        span.record("latency_us", latency_us);
        span.record("policy_ids", &format!("{:?}", decision.policy_ids));

        metrics::counter!(crate::metrics::AUTHZ_DECISIONS_TOTAL, "decision" => decision_label)
            .increment(1);
        metrics::histogram!(crate::metrics::AUTHZ_LATENCY_SECONDS)
            .record(authz_elapsed.as_secs_f64());

        let audit_event = if decision.allowed {
            MemoryEvent::AccessGranted {
                action: action.to_string(),
                realm: realm.to_string(),
                namespace: namespace.to_string(),
                policy_ids: decision.policy_ids.clone(),
            }
        } else {
            MemoryEvent::AccessDenied {
                action: action.to_string(),
                realm: realm.to_string(),
                namespace: namespace.to_string(),
                reasons: decision.reasons.clone(),
                policy_ids: decision.policy_ids.clone(),
            }
        };

        let denial_error = if decision.allowed {
            None
        } else {
            let reasons = if decision.reasons.is_empty() {
                "no matching permit policy".to_string()
            } else {
                decision.reasons.join("; ")
            };
            Some(HirnError::AccessDenied(format!(
                "{} cannot {} on {}{}: {}",
                agent_id,
                action,
                realm,
                if namespace.is_empty() {
                    String::new()
                } else {
                    format!("/{namespace}")
                },
                reasons,
            )))
        };

        Some(PolicyAuthorization {
            audit_event,
            denial_error,
        })
    }

    pub(crate) fn is_action_allowed(
        &self,
        agent_id: &str,
        action: Action,
        realm: &str,
        namespace: &str,
    ) -> bool {
        let Some(engine) = &self.policy_engine else {
            return true;
        };

        let request = AuthzRequest {
            agent_id: agent_id.to_string(),
            action,
            realm: realm.to_string(),
            namespace: namespace.to_string(),
        };

        engine.authorize(&request).allowed
    }

    pub(crate) async fn append_audit(
        &self,
        actor: Option<AgentId>,
        action: hirn_core::audit::AuditAction,
    ) -> HirnResult<()> {
        let entry = hirn_core::audit::AuditEntry::new(actor, action);
        let batch = hirn_storage::datasets::audit::to_batch(std::slice::from_ref(&entry))
            .map_err(|e| HirnError::storage(e))?;
        self.storage
            .append(hirn_storage::datasets::audit::DATASET_NAME, batch)
            .await
            .map_err(|e| HirnError::storage(e))?;
        Ok(())
    }

    pub(crate) async fn audit_log(
        &self,
        after: Option<&Timestamp>,
        before: Option<&Timestamp>,
    ) -> HirnResult<Vec<hirn_core::audit::AuditEntry>> {
        let mut parts = Vec::new();
        if let Some(a) = after {
            parts.push(format!("timestamp_ms > {}", a.timestamp_ms()));
        }
        if let Some(b) = before {
            parts.push(format!("timestamp_ms < {}", b.timestamp_ms()));
        }
        let filter = if parts.is_empty() {
            None
        } else {
            Some(parts.join(" AND "))
        };

        let opts = hirn_storage::store::ScanOptions {
            filter,
            ..Default::default()
        };
        let batches = self
            .storage
            .scan(hirn_storage::datasets::audit::DATASET_NAME, opts)
            .await
            .map_err(|e| HirnError::storage(e))?;

        let mut result = Vec::new();
        for batch in &batches {
            let entries = hirn_storage::datasets::audit::from_batch(batch)
                .map_err(|e| HirnError::storage(e))?;
            result.extend(entries);
        }
        result.sort_by_key(|entry| entry.timestamp);
        Ok(result)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use hirn_storage::{HirnDb, HirnDbConfig};

    #[tokio::test(flavor = "multi_thread")]
    async fn no_engine_soft_check_allows() {
        let dir = tempfile::tempdir().expect("tempdir");
        let lance_path = dir.path().join("lance");
        let storage = HirnDb::open(HirnDbConfig::local(lance_path.to_str().expect("path")))
            .await
            .expect("open storage")
            .store_arc();
        let runtime = PolicyRuntime::new(storage);

        assert!(runtime.is_action_allowed("agent", Action::Recall, "realm", "namespace"));
        assert!(
            runtime
                .authorize("agent", Action::Recall, "realm", "namespace")
                .is_none()
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn open_mode_engine_authorize_produces_audit_event() {
        let dir = tempfile::tempdir().expect("tempdir");
        let lance_path = dir.path().join("lance");
        let storage = HirnDb::open(HirnDbConfig::local(lance_path.to_str().expect("path")))
            .await
            .expect("open storage")
            .store_arc();
        let mut runtime = PolicyRuntime::new(storage);
        runtime.set_engine(PolicyEngine::open_mode());

        let decision = runtime
            .authorize("agent", Action::Recall, "realm", "namespace")
            .expect("configured engine should authorize");

        assert!(decision.denial_error.is_none());
        assert!(matches!(
            decision.audit_event,
            MemoryEvent::AccessGranted { .. }
        ));
    }
}