Skip to main content

arbiter_lifecycle/
state.rs

1use arbiter_audit::AuditSink;
2use arbiter_identity::{AnyRegistry, InMemoryRegistry};
3use arbiter_metrics::ArbiterMetrics;
4use arbiter_policy::PolicyConfig;
5use arbiter_session::{AnySessionStore, SessionStore};
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10use tokio::sync::watch;
11use uuid::Uuid;
12
13use crate::token::TokenConfig;
14
15/// Per-client sliding-window rate limiter for admin API endpoints.
16///
17/// Tracks request timestamps per source key (IP address or API key),
18/// preventing a single attacker from exhausting the rate limit budget
19/// for all legitimate operators.
20pub struct AdminRateLimiter {
21    /// Per-client windows: key -> timestamps within window.
22    clients: Mutex<HashMap<String, VecDeque<Instant>>>,
23    /// Maximum requests allowed per window per client.
24    max_requests: u64,
25    /// Sliding window duration.
26    window_duration: Duration,
27}
28
29impl AdminRateLimiter {
30    /// Create a new rate limiter with the given capacity and window duration.
31    pub fn new(max_requests: u64, window_duration: Duration) -> Self {
32        Self {
33            clients: Mutex::new(HashMap::new()),
34            max_requests,
35            window_duration,
36        }
37    }
38
39    /// Check whether a request from the given client key should be allowed.
40    ///
41    /// Returns `true` if the request is within the rate limit, `false` if it
42    /// should be rejected.
43    pub fn check_rate_limit(&self, client_key: &str) -> bool {
44        let now = Instant::now();
45        let mut clients = self.clients.lock().unwrap_or_else(|e| e.into_inner());
46        let window = clients.entry(client_key.to_string()).or_default();
47
48        // Evict timestamps outside the sliding window.
49        while let Some(&front) = window.front() {
50            if now.duration_since(front) > self.window_duration {
51                window.pop_front();
52            } else {
53                break;
54            }
55        }
56
57        if (window.len() as u64) >= self.max_requests {
58            false
59        } else {
60            window.push_back(now);
61            true
62        }
63    }
64
65    /// Backward-compatible global check (uses "__global__" as the key).
66    pub fn check_rate_limit_global(&self) -> bool {
67        self.check_rate_limit("__global__")
68    }
69}
70
71/// Shared application state for the lifecycle API.
72#[derive(Clone)]
73pub struct AppState {
74    pub registry: Arc<AnyRegistry>,
75    pub admin_api_key: String,
76    pub token_config: TokenConfig,
77    pub session_store: AnySessionStore,
78    pub policy_config: Arc<watch::Sender<Arc<Option<PolicyConfig>>>>,
79    /// Audit sink for querying per-session stats on close.
80    pub audit_sink: Option<Arc<AuditSink>>,
81    /// Path to the policy TOML file (for hot-reload).
82    pub policy_file_path: Option<String>,
83    /// Percentage threshold for session budget/time warnings.
84    pub warning_threshold_pct: f64,
85    /// Default rate-limit window duration in seconds for new sessions.
86    pub default_rate_limit_window_secs: u64,
87    /// Shared metrics for gauge updates (active_sessions, registered_agents).
88    pub metrics: Arc<ArbiterMetrics>,
89    /// Maximum concurrent active sessions per agent (None = no limit).
90    /// P0: Per-agent session cap to prevent session multiplication attacks.
91    pub max_concurrent_sessions_per_agent: Option<u64>,
92    /// Rate limiter for admin API endpoints.
93    /// Shared via Arc so the Clone-based axum state sharing works correctly.
94    pub admin_rate_limiter: Arc<AdminRateLimiter>,
95}
96
97/// Default admin API rate limit: 60 requests per minute.
98const DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE: u64 = 60;
99
100/// Minimum admin API key length. Keys shorter than this are likely
101/// trivially brutable or are default/placeholder values.
102pub const MIN_ADMIN_API_KEY_LEN: usize = 16;
103
104impl AppState {
105    /// Create a new application state with the given admin API key.
106    pub fn new(admin_api_key: String) -> Self {
107        if admin_api_key.len() < MIN_ADMIN_API_KEY_LEN {
108            tracing::error!(
109                length = admin_api_key.len(),
110                minimum = MIN_ADMIN_API_KEY_LEN,
111                "admin API key is shorter than minimum recommended length; \
112                 the gateway will start but authentication is weakened"
113            );
114        }
115        Self {
116            registry: Arc::new(AnyRegistry::InMemory(InMemoryRegistry::new())),
117            admin_api_key,
118            token_config: TokenConfig::default(),
119            session_store: AnySessionStore::InMemory(SessionStore::new()),
120            policy_config: Arc::new(watch::channel(Arc::new(None)).0),
121            audit_sink: None,
122            policy_file_path: None,
123            warning_threshold_pct: 20.0,
124            default_rate_limit_window_secs: 60,
125            metrics: Arc::new(ArbiterMetrics::new().expect("metrics registry init")),
126            max_concurrent_sessions_per_agent: Some(10),
127            admin_rate_limiter: Arc::new(AdminRateLimiter::new(
128                DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE,
129                Duration::from_secs(60),
130            )),
131        }
132    }
133
134    /// Create a new application state with custom token config.
135    pub fn with_token_config(admin_api_key: String, token_config: TokenConfig) -> Self {
136        Self {
137            registry: Arc::new(AnyRegistry::InMemory(InMemoryRegistry::new())),
138            admin_api_key,
139            token_config,
140            session_store: AnySessionStore::InMemory(SessionStore::new()),
141            policy_config: Arc::new(watch::channel(Arc::new(None)).0),
142            audit_sink: None,
143            policy_file_path: None,
144            warning_threshold_pct: 20.0,
145            default_rate_limit_window_secs: 60,
146            metrics: Arc::new(ArbiterMetrics::new().expect("metrics registry init")),
147            max_concurrent_sessions_per_agent: Some(10),
148            admin_rate_limiter: Arc::new(AdminRateLimiter::new(
149                DEFAULT_ADMIN_MAX_REQUESTS_PER_MINUTE,
150                Duration::from_secs(60),
151            )),
152        }
153    }
154
155    /// Create a rate limiter with a custom max requests per minute.
156    pub fn with_admin_rate_limit(mut self, max_requests_per_minute: u64) -> Self {
157        self.admin_rate_limiter = Arc::new(AdminRateLimiter::new(
158            max_requests_per_minute,
159            Duration::from_secs(60),
160        ));
161        self
162    }
163
164    /// Log an admin API operation with structured fields for audit trail.
165    ///
166    /// All admin operations are now logged at info level with
167    /// structured tracing fields for observability and forensic analysis.
168    pub fn admin_audit_log(&self, operation: &str, agent_id: Option<Uuid>, detail: &str) {
169        match agent_id {
170            Some(id) => {
171                tracing::info!(
172                    operation = operation,
173                    agent_id = %id,
174                    detail = detail,
175                    "ADMIN_AUDIT: admin API operation"
176                );
177            }
178            None => {
179                tracing::info!(
180                    operation = operation,
181                    detail = detail,
182                    "ADMIN_AUDIT: admin API operation"
183                );
184            }
185        }
186    }
187}