Skip to main content

arbiter_lifecycle/
state.rs

1use arbiter_audit::AuditSink;
2use arbiter_identity::{AnyRegistry, InMemoryRegistry};
3use arbiter_metrics::ArbiterMetrics;
4use arbiter_policy::PolicyConfig;
5use arbiter_session::{AnySessionStore, SessionStore};
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10use tokio::sync::watch;
11use uuid::Uuid;
12
13use crate::token::TokenConfig;
14
/// Simple sliding-window rate limiter for admin API endpoints.
///
/// Without rate limiting, an attacker who compromises the admin API key can
/// make unlimited requests, enabling rapid brute-force enumeration of
/// agents/sessions and denial-of-service against the control plane.
///
/// This implements a single global sliding window: it records the timestamp
/// of every admitted request and rejects new ones while the window is full.
/// Rejected requests are not recorded, so they do not extend the lock-out.
pub struct AdminRateLimiter {
    /// Timestamps of admitted requests still inside the window, oldest first.
    window: Mutex<VecDeque<Instant>>,
    /// Maximum requests allowed per window. `0` rejects every request.
    max_requests: u64,
    /// Sliding window duration.
    window_duration: Duration,
}

impl AdminRateLimiter {
    /// Create a new rate limiter admitting at most `max_requests` requests per
    /// `window_duration`.
    pub fn new(max_requests: u64, window_duration: Duration) -> Self {
        Self {
            window: Mutex::new(VecDeque::new()),
            max_requests,
            window_duration,
        }
    }

    /// Check whether a request should be allowed.
    ///
    /// Returns `true` (and records the request) if it is within the rate
    /// limit, `false` if it should be rejected. Entries older than the window
    /// are evicted on every call.
    pub fn check_rate_limit(&self) -> bool {
        // A poisoned lock only means another thread panicked while holding
        // the guard; the timestamp queue is still usable, so recover it
        // instead of propagating the panic into the request path.
        let mut window = self.window.lock().unwrap_or_else(|e| e.into_inner());

        // Read the clock only once the lock is held. Taking the timestamp
        // before locking lets a thread that lost the race push an older
        // instant behind a newer one, which breaks the oldest-first ordering
        // the eviction loop below depends on.
        let now = Instant::now();

        // Evict timestamps that have aged out of the sliding window. The
        // saturating form never panics, even if the platform clock misbehaves.
        while let Some(&oldest) = window.front() {
            if now.saturating_duration_since(oldest) <= self.window_duration {
                break;
            }
            window.pop_front();
        }

        // `usize` -> `u64` is lossless on every supported target.
        if (window.len() as u64) >= self.max_requests {
            return false;
        }

        window.push_back(now);
        true
    }
}
67
/// Shared application state for the lifecycle API.
///
/// The struct is `Clone`; the handles it carries behind `Arc` (registry,
/// policy sender, metrics, admin rate limiter) are shared between clones
/// rather than duplicated.
#[derive(Clone)]
pub struct AppState {
    /// Registry from `arbiter_identity`, shared behind an `Arc`.
    /// NOTE(review): presumably holds the registered agents; confirm against callers.
    pub registry: Arc<AnyRegistry>,
    /// Admin API key supplied by the caller at construction time.
    pub admin_api_key: String,
    /// Token configuration (see `crate::token::TokenConfig`).
    pub token_config: TokenConfig,
    /// Session store from `arbiter_session`.
    pub session_store: AnySessionStore,
    /// Sending half of a `tokio::sync::watch` channel carrying the current
    /// policy configuration.
    /// NOTE(review): the inner `None` presumably means "no policy loaded"; confirm.
    pub policy_config: Arc<watch::Sender<Arc<Option<PolicyConfig>>>>,
    /// Audit sink for querying per-session stats on close.
    pub audit_sink: Option<Arc<AuditSink>>,
    /// Path to the policy TOML file (for hot-reload).
    pub policy_file_path: Option<String>,
    /// Percentage threshold for session budget/time warnings.
    pub warning_threshold_pct: f64,
    /// Default rate-limit window duration in seconds for new sessions.
    pub default_rate_limit_window_secs: u64,
    /// Shared metrics for gauge updates (active_sessions, registered_agents).
    pub metrics: Arc<ArbiterMetrics>,
    /// Maximum concurrent active sessions per agent (None = no limit).
    /// P0: Per-agent session cap to prevent session multiplication attacks.
    pub max_concurrent_sessions_per_agent: Option<u64>,
    /// Rate limiter for admin API endpoints.
    /// Shared via Arc so the Clone-based axum state sharing works correctly:
    /// every clone of the state counts against the same window.
    pub admin_rate_limiter: Arc<AdminRateLimiter>,
}
93
/// Default admin API rate limit: 60 requests per minute.
///
/// Passed as `max_requests` to the `AdminRateLimiter` built by the `AppState`
/// constructors, which pair it with a 60-second window.
const DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE: u64 = 60;
96
97impl AppState {
98    /// Create a new application state with the given admin API key.
99    pub fn new(admin_api_key: String) -> Self {
100        Self {
101            registry: Arc::new(AnyRegistry::InMemory(InMemoryRegistry::new())),
102            admin_api_key,
103            token_config: TokenConfig::default(),
104            session_store: AnySessionStore::InMemory(SessionStore::new()),
105            policy_config: Arc::new(watch::channel(Arc::new(None)).0),
106            audit_sink: None,
107            policy_file_path: None,
108            warning_threshold_pct: 20.0,
109            default_rate_limit_window_secs: 60,
110            metrics: Arc::new(ArbiterMetrics::new().expect("metrics registry init")),
111            max_concurrent_sessions_per_agent: Some(10),
112            admin_rate_limiter: Arc::new(AdminRateLimiter::new(
113                DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE,
114                Duration::from_secs(60),
115            )),
116        }
117    }
118
119    /// Create a new application state with custom token config.
120    pub fn with_token_config(admin_api_key: String, token_config: TokenConfig) -> Self {
121        Self {
122            registry: Arc::new(AnyRegistry::InMemory(InMemoryRegistry::new())),
123            admin_api_key,
124            token_config,
125            session_store: AnySessionStore::InMemory(SessionStore::new()),
126            policy_config: Arc::new(watch::channel(Arc::new(None)).0),
127            audit_sink: None,
128            policy_file_path: None,
129            warning_threshold_pct: 20.0,
130            default_rate_limit_window_secs: 60,
131            metrics: Arc::new(ArbiterMetrics::new().expect("metrics registry init")),
132            max_concurrent_sessions_per_agent: Some(10),
133            admin_rate_limiter: Arc::new(AdminRateLimiter::new(
134                DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE,
135                Duration::from_secs(60),
136            )),
137        }
138    }
139
140    /// Create a rate limiter with a custom max requests per minute.
141    pub fn with_admin_rate_limit(mut self, max_requests_per_minute: u64) -> Self {
142        self.admin_rate_limiter = Arc::new(AdminRateLimiter::new(
143            max_requests_per_minute,
144            Duration::from_secs(60),
145        ));
146        self
147    }
148
149    /// Log an admin API operation with structured fields for audit trail.
150    ///
151    /// All admin operations are now logged at info level with
152    /// structured tracing fields for observability and forensic analysis.
153    pub fn admin_audit_log(&self, operation: &str, agent_id: Option<Uuid>, detail: &str) {
154        match agent_id {
155            Some(id) => {
156                tracing::info!(
157                    operation = operation,
158                    agent_id = %id,
159                    detail = detail,
160                    "ADMIN_AUDIT: admin API operation"
161                );
162            }
163            None => {
164                tracing::info!(
165                    operation = operation,
166                    detail = detail,
167                    "ADMIN_AUDIT: admin API operation"
168                );
169            }
170        }
171    }
172}