pub mod audit;
pub mod cache;
pub mod cedar;
pub mod client;
pub mod signing;
pub mod streaming;
pub mod tenant;
use anyhow::{Result, bail};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, watch};
use crate::config::EnterpriseConfig;
pub use audit::{PolicyAuditLogger, PolicyDecisionLog};
pub use cache::{OfflineMode, PolicyCache};
pub use cedar::{
Action, CedarEngine, PolicyDecision, PolicyEffect, Principal, Resource, validate_cedar_syntax,
};
pub use client::PolicyClient;
pub use signing::{PolicyBundle, TrustAnchor, verify_bundle};
const DEFAULT_POLICY: &str = r#"
// Default enterprise policy: permit all actions for authenticated users
permit(
principal is AgentKernel::User,
action,
resource is AgentKernel::Sandbox
);
"#;
pub struct PolicyEngine {
engine: Arc<RwLock<CedarEngine>>,
cache: PolicyCache,
audit: PolicyAuditLogger,
client: Option<Arc<PolicyClient>>,
trust_anchors: Vec<TrustAnchor>,
current_version: Arc<RwLock<u64>>,
org_id: Option<String>,
shutdown_tx: Option<watch::Sender<bool>>,
}
impl PolicyEngine {
pub fn new(config: &EnterpriseConfig) -> Result<Self> {
if !config.enabled {
bail!("Enterprise policy engine is not enabled");
}
let offline_mode =
OfflineMode::from_config(&config.offline_mode, config.cache_max_age_hours);
let cache = PolicyCache::default_dir(offline_mode.clone());
let audit = PolicyAuditLogger::default_path();
let trust_anchors = build_trust_anchors(&config.trust_anchors.keys);
let client = if let Some(ref server) = config.policy_server {
let api_key = config
.api_key_env
.as_ref()
.and_then(|env_name| std::env::var(env_name).ok());
Some(Arc::new(PolicyClient::new(server, api_key)?))
} else {
None
};
let initial_policies = match cache.load() {
Ok(Some(bundle)) => {
if !trust_anchors.is_empty() {
if let Err(e) = verify_bundle(&bundle, &trust_anchors, None) {
eprintln!(
"[enterprise] Cached bundle failed verification: {}. Using default.",
e
);
DEFAULT_POLICY.to_string()
} else {
bundle.policies.clone()
}
} else {
bundle.policies.clone()
}
}
Ok(None) => DEFAULT_POLICY.to_string(),
Err(e) => {
eprintln!("[enterprise] Failed to load cache: {}. Using default.", e);
DEFAULT_POLICY.to_string()
}
};
let engine = CedarEngine::new(&initial_policies)?;
let current_version = cache.cached_version()?.unwrap_or(0);
Ok(Self {
engine: Arc::new(RwLock::new(engine)),
cache,
audit,
client,
trust_anchors,
current_version: Arc::new(RwLock::new(current_version)),
org_id: config.org_id.clone(),
shutdown_tx: None,
})
}
pub async fn start(&mut self) -> Result<()> {
if let Some(ref client) = self.client {
match client.fetch_bundle().await {
Ok(bundle) => {
self.apply_bundle(bundle).await?;
}
Err(e) => {
eprintln!(
"[enterprise] Could not reach policy server: {}. Using cached/default.",
e
);
}
}
}
if let Some(ref client) = self.client {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
self.shutdown_tx = Some(shutdown_tx);
let mut bundle_rx = client.clone().poll(Duration::from_secs(300), shutdown_rx);
let engine = self.engine.clone();
let trust_anchors = self.trust_anchors.clone();
let cache_dir = self.cache.cache_dir().to_path_buf();
let offline_mode = OfflineMode::from_config("cached_with_expiry", 24);
let current_version = self.current_version.clone();
tokio::spawn(async move {
while bundle_rx.changed().await.is_ok() {
let bundle = { bundle_rx.borrow().clone() };
if let Some(bundle) = bundle {
let min_ver = *current_version.read().await;
if !trust_anchors.is_empty()
&& let Err(e) = verify_bundle(&bundle, &trust_anchors, Some(min_ver))
{
eprintln!("[enterprise] Policy bundle verification failed: {}", e);
continue;
}
{
let mut eng = engine.write().await;
if let Err(e) = eng.update_policies(&bundle.policies) {
eprintln!("[enterprise] Failed to update policies: {}", e);
continue;
}
}
let cache = PolicyCache::new(cache_dir.clone(), offline_mode.clone());
if let Err(e) = cache.store(&bundle) {
eprintln!("[enterprise] Failed to cache bundle: {}", e);
}
*current_version.write().await = bundle.version;
}
}
});
}
Ok(())
}
pub async fn evaluate(
&self,
principal: &Principal,
action: Action,
resource: &Resource,
) -> PolicyDecision {
let engine = self.engine.read().await;
let decision = engine.evaluate(principal, action, resource, None);
let log_entry = PolicyDecisionLog::new(
&principal.id,
action,
&resource.name,
decision.decision,
decision.matched_policies.clone(),
decision.evaluation_time_us,
self.org_id.clone(),
Some(decision.reason.clone()),
);
if let Err(e) = self.audit.log_decision(&log_entry) {
eprintln!("[enterprise] Failed to write audit log: {}", e);
}
decision
}
pub async fn reload(&mut self) -> Result<()> {
let client = self
.client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("No policy server configured"))?;
let bundle = client.fetch_bundle().await?;
self.apply_bundle(bundle).await
}
pub async fn version(&self) -> u64 {
*self.current_version.read().await
}
pub fn audit_logger(&self) -> &PolicyAuditLogger {
&self.audit
}
async fn apply_bundle(&self, bundle: PolicyBundle) -> Result<()> {
let min_ver = *self.current_version.read().await;
if !self.trust_anchors.is_empty() {
verify_bundle(&bundle, &self.trust_anchors, Some(min_ver))?;
}
{
let mut engine = self.engine.write().await;
engine.update_policies(&bundle.policies)?;
}
self.cache.store(&bundle)?;
*self.current_version.write().await = bundle.version;
Ok(())
}
pub fn shutdown(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(true);
}
}
}
impl Drop for PolicyEngine {
fn drop(&mut self) {
self.shutdown();
}
}
fn build_trust_anchors(key_ids: &[String]) -> Vec<TrustAnchor> {
key_ids
.iter()
.map(|key_id| TrustAnchor {
key_id: key_id.clone(),
public_key: vec![0u8; 32],
valid_from: chrono::Utc::now() - chrono::Duration::days(365),
valid_until: None,
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
fn test_config() -> EnterpriseConfig {
EnterpriseConfig {
enabled: true,
policy_server: None, org_id: Some("test-org".to_string()),
api_key_env: None,
offline_mode: "default_policy".to_string(),
cache_max_age_hours: 24,
trust_anchors: crate::config::TrustAnchorsConfig { keys: vec![] },
default_roles: vec!["developer".to_string()],
jwks_url: None,
}
}
#[test]
fn test_engine_creation() {
let config = test_config();
let engine = PolicyEngine::new(&config);
assert!(engine.is_ok());
}
#[test]
fn test_disabled_engine() {
let mut config = test_config();
config.enabled = false;
let result = PolicyEngine::new(&config);
assert!(result.is_err());
}
#[tokio::test]
async fn test_evaluate_default_policy() {
let config = test_config();
let engine = PolicyEngine::new(&config).unwrap();
let principal = Principal {
id: "alice".to_string(),
email: "alice@test.com".to_string(),
org_id: "test-org".to_string(),
roles: vec!["developer".to_string()],
mfa_verified: true,
};
let resource = Resource {
name: "test-sandbox".to_string(),
agent_type: "claude".to_string(),
runtime: "python".to_string(),
};
let decision = engine.evaluate(&principal, Action::Run, &resource).await;
assert!(decision.is_permit());
}
#[tokio::test]
async fn test_version_tracking() {
let config = test_config();
let engine = PolicyEngine::new(&config).unwrap();
let _version = engine.version().await;
}
#[test]
fn test_build_trust_anchors() {
let keys = vec!["key1".to_string(), "key2".to_string()];
let anchors = build_trust_anchors(&keys);
assert_eq!(anchors.len(), 2);
assert_eq!(anchors[0].key_id, "key1");
assert_eq!(anchors[1].key_id, "key2");
}
}