//! arbiter-lifecycle 0.0.46
//!
//! Agent lifecycle HTTP API for Arbiter
//! Documentation
use arbiter_audit::AuditSink;
use arbiter_identity::{AnyRegistry, InMemoryRegistry};
use arbiter_metrics::ArbiterMetrics;
use arbiter_policy::PolicyConfig;
use arbiter_session::{AnySessionStore, SessionStore};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use uuid::Uuid;

use crate::token::TokenConfig;

/// Per-client sliding-window rate limiter for admin API endpoints.
///
/// Tracks request timestamps per source key (IP address or API key),
/// preventing a single attacker from exhausting the rate limit budget
/// for all legitimate operators.
///
/// Entries for clients whose timestamps have all expired are purged at most
/// once per window, so memory is bounded by the keys seen recently rather
/// than by every key ever seen.
pub struct AdminRateLimiter {
    /// Per-client windows: key -> timestamps within window, oldest first.
    clients: Mutex<HashMap<String, VecDeque<Instant>>>,
    /// Maximum requests allowed per window per client.
    max_requests: u64,
    /// Sliding window duration.
    window_duration: Duration,
    /// When idle client entries were last purged.
    ///
    /// Only ever locked while `clients` is already held, so the lock order
    /// is always `clients` -> `last_sweep`.
    last_sweep: Mutex<Instant>,
}

impl AdminRateLimiter {
    /// Create a new rate limiter with the given capacity and window duration.
    ///
    /// `max_requests` is the number of requests each client key may make
    /// within any `window_duration`-long span; `0` rejects every request.
    pub fn new(max_requests: u64, window_duration: Duration) -> Self {
        Self {
            clients: Mutex::new(HashMap::new()),
            max_requests,
            window_duration,
            last_sweep: Mutex::new(Instant::now()),
        }
    }

    /// Check whether a request from the given client key should be allowed.
    ///
    /// Returns `true` if the request is within the rate limit, `false` if it
    /// should be rejected. Only allowed requests are recorded against the
    /// client's budget.
    pub fn check_rate_limit(&self, client_key: &str) -> bool {
        self.check_rate_limit_with(client_key, Instant::now)
    }

    /// Backward-compatible global check (uses "__global__" as the key).
    pub fn check_rate_limit_global(&self) -> bool {
        self.check_rate_limit("__global__")
    }

    /// Core of [`check_rate_limit`](Self::check_rate_limit) with an
    /// injectable clock.
    ///
    /// `clock` is invoked only after the `clients` lock is held. Sampling the
    /// time under the lock guarantees timestamps are appended in
    /// non-decreasing order, which front-of-queue eviction relies on; a
    /// thread that waited on the lock can never observe a "now" older than
    /// timestamps already stored.
    fn check_rate_limit_with(&self, client_key: &str, clock: impl FnOnce() -> Instant) -> bool {
        // A poisoned lock only means another thread panicked mid-update; the
        // map is still structurally valid, so keep serving rather than panic.
        let mut clients = self.clients.lock().unwrap_or_else(|e| e.into_inner());
        let now = clock();

        self.purge_idle_clients(&mut clients, now);

        let window = clients.entry(client_key.to_string()).or_default();

        // Evict timestamps outside the sliding window. A timestamp exactly
        // `window_duration` old still counts against the budget.
        while let Some(&oldest) = window.front() {
            if now.saturating_duration_since(oldest) > self.window_duration {
                window.pop_front();
            } else {
                break;
            }
        }

        if (window.len() as u64) >= self.max_requests {
            false
        } else {
            window.push_back(now);
            true
        }
    }

    /// Drop clients with no timestamp left inside the window.
    ///
    /// Runs at most once per `window_duration`, so the O(clients) scan is
    /// amortised over the requests that created those entries. Removing an
    /// idle client never changes a decision: its next request would have
    /// started from an empty window anyway.
    fn purge_idle_clients(&self, clients: &mut HashMap<String, VecDeque<Instant>>, now: Instant) {
        let mut last_sweep = self.last_sweep.lock().unwrap_or_else(|e| e.into_inner());
        if now.saturating_duration_since(*last_sweep) < self.window_duration {
            return;
        }
        *last_sweep = now;

        let window_duration = self.window_duration;
        // Timestamps are ordered oldest-first, so a client is idle exactly
        // when its newest timestamp has expired (or it has none at all).
        clients.retain(|_, window| {
            window
                .back()
                .map_or(false, |&newest| now.saturating_duration_since(newest) <= window_duration)
        });
    }
}

/// Shared application state for the lifecycle API.
///
/// The type is `Clone`; shared components are held behind `Arc` so that
/// clones refer to the same underlying instances.
#[derive(Clone)]
pub struct AppState {
    /// Identity registry backend. The constructors in this file install an
    /// in-memory registry.
    pub registry: Arc<AnyRegistry>,
    /// Admin API key. A key shorter than `MIN_ADMIN_API_KEY_LEN` is accepted,
    /// but `AppState::new` logs an error about it.
    pub admin_api_key: String,
    /// Token configuration (`TokenConfig::default()` unless supplied through
    /// `AppState::with_token_config`).
    pub token_config: TokenConfig,
    /// Session store backend. The constructors in this file install an
    /// in-memory store.
    pub session_store: AnySessionStore,
    /// Sender half of a `watch` channel carrying the current policy
    /// configuration (`None` until one is published).
    /// NOTE(review): the constructors discard the initial receiver, so
    /// readers presumably call `subscribe()` on this sender — confirm.
    pub policy_config: Arc<watch::Sender<Arc<Option<PolicyConfig>>>>,
    /// Audit sink for querying per-session stats on close.
    pub audit_sink: Option<Arc<AuditSink>>,
    /// Path to the policy TOML file (for hot-reload).
    pub policy_file_path: Option<String>,
    /// Percentage threshold for session budget/time warnings.
    pub warning_threshold_pct: f64,
    /// Default rate-limit window duration in seconds for new sessions.
    pub default_rate_limit_window_secs: u64,
    /// Shared metrics for gauge updates (active_sessions, registered_agents).
    pub metrics: Arc<ArbiterMetrics>,
    /// Maximum concurrent active sessions per agent (None = no limit).
    /// P0: Per-agent session cap to prevent session multiplication attacks.
    pub max_concurrent_sessions_per_agent: Option<u64>,
    /// Rate limiter for admin API endpoints.
    /// Shared via Arc so the Clone-based axum state sharing works correctly.
    pub admin_rate_limiter: Arc<AdminRateLimiter>,
}

/// Default admin API rate limit: 60 requests per minute, applied per
/// client key by `AdminRateLimiter`.
const DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE: u64 = 60;

/// Minimum admin API key length, in bytes. Keys shorter than this are
/// likely trivially brute-forceable or are default/placeholder values.
pub const MIN_ADMIN_API_KEY_LEN: usize = 16;

impl AppState {
    /// Create a new application state with the given admin API key.
    pub fn new(admin_api_key: String) -> Self {
        if admin_api_key.len() < MIN_ADMIN_API_KEY_LEN {
            tracing::error!(
                length = admin_api_key.len(),
                minimum = MIN_ADMIN_API_KEY_LEN,
                "admin API key is shorter than minimum recommended length; \
                 the gateway will start but authentication is weakened"
            );
        }
        Self {
            registry: Arc::new(AnyRegistry::InMemory(InMemoryRegistry::new())),
            admin_api_key,
            token_config: TokenConfig::default(),
            session_store: AnySessionStore::InMemory(SessionStore::new()),
            policy_config: Arc::new(watch::channel(Arc::new(None)).0),
            audit_sink: None,
            policy_file_path: None,
            warning_threshold_pct: 20.0,
            default_rate_limit_window_secs: 60,
            metrics: Arc::new(ArbiterMetrics::new().expect("metrics registry init")),
            max_concurrent_sessions_per_agent: Some(10),
            admin_rate_limiter: Arc::new(AdminRateLimiter::new(
                DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE,
                Duration::from_secs(60),
            )),
        }
    }

    /// Create a new application state with custom token config.
    pub fn with_token_config(admin_api_key: String, token_config: TokenConfig) -> Self {
        Self {
            registry: Arc::new(AnyRegistry::InMemory(InMemoryRegistry::new())),
            admin_api_key,
            token_config,
            session_store: AnySessionStore::InMemory(SessionStore::new()),
            policy_config: Arc::new(watch::channel(Arc::new(None)).0),
            audit_sink: None,
            policy_file_path: None,
            warning_threshold_pct: 20.0,
            default_rate_limit_window_secs: 60,
            metrics: Arc::new(ArbiterMetrics::new().expect("metrics registry init")),
            max_concurrent_sessions_per_agent: Some(10),
            admin_rate_limiter: Arc::new(AdminRateLimiter::new(
                DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE,
                Duration::from_secs(60),
            )),
        }
    }

    /// Create a rate limiter with a custom max requests per minute.
    pub fn with_admin_rate_limit(mut self, max_requests_per_minute: u64) -> Self {
        self.admin_rate_limiter = Arc::new(AdminRateLimiter::new(
            max_requests_per_minute,
            Duration::from_secs(60),
        ));
        self
    }

    /// Log an admin API operation with structured fields for audit trail.
    ///
    /// All admin operations are now logged at info level with
    /// structured tracing fields for observability and forensic analysis.
    pub fn admin_audit_log(&self, operation: &str, agent_id: Option<Uuid>, detail: &str) {
        match agent_id {
            Some(id) => {
                tracing::info!(
                    operation = operation,
                    agent_id = %id,
                    detail = detail,
                    "ADMIN_AUDIT: admin API operation"
                );
            }
            None => {
                tracing::info!(
                    operation = operation,
                    detail = detail,
                    "ADMIN_AUDIT: admin API operation"
                );
            }
        }
    }
}