Skip to main content

arbiter_lifecycle/
api.rs

1use std::sync::Arc;
2
3use arbiter_identity::{AgentRegistry, TrustLevel};
4use arbiter_mcp::context::McpRequest;
5use arbiter_policy::{EvalContext, PolicyTrace};
6use arbiter_session::{CreateSessionRequest, DataSensitivity};
7use axum::Json;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, StatusCode};
10use axum::response::IntoResponse;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::state::AppState;
16use crate::token::issue_token;
17
18/// Request body for POST /agents.
19#[derive(Debug, Deserialize)]
20pub struct RegisterAgentRequest {
21    pub owner: String,
22    pub model: String,
23    #[serde(default)]
24    pub capabilities: Vec<String>,
25    #[serde(default = "default_trust_level")]
26    pub trust_level: TrustLevel,
27    #[serde(default)]
28    pub expires_at: Option<DateTime<Utc>>,
29}
30
31fn default_trust_level() -> TrustLevel {
32    TrustLevel::Untrusted
33}
34
35/// Response body for POST /agents.
36#[derive(Debug, Serialize)]
37pub struct RegisterAgentResponse {
38    pub agent_id: Uuid,
39    pub token: String,
40}
41
42/// Request body for POST /agents/:id/delegate.
43#[derive(Debug, Deserialize)]
44pub struct DelegateRequest {
45    pub to: Uuid,
46    pub scopes: Vec<String>,
47    #[serde(default)]
48    pub expires_at: Option<DateTime<Utc>>,
49}
50
51/// Request body for POST /agents/:id/token.
52#[derive(Debug, Deserialize)]
53pub struct TokenRequest {
54    #[serde(default)]
55    pub expiry_seconds: Option<i64>,
56}
57
58/// Request body for POST /sessions.
59#[derive(Debug, Deserialize)]
60pub struct CreateSessionApiRequest {
61    pub agent_id: Uuid,
62    #[serde(default)]
63    pub delegation_chain_snapshot: Vec<String>,
64    pub declared_intent: String,
65    #[serde(default)]
66    pub authorized_tools: Vec<String>,
67    /// Credential references this session may resolve. Empty = no credentials.
68    #[serde(default)]
69    pub authorized_credentials: Option<Vec<String>>,
70    #[serde(default = "default_time_limit")]
71    pub time_limit_secs: i64,
72    #[serde(default = "default_call_budget")]
73    pub call_budget: u64,
74    #[serde(default)]
75    pub rate_limit_per_minute: Option<u64>,
76    /// Override the global rate-limit window duration for this session (seconds).
77    /// If omitted, uses the server default from `[sessions].rate_limit_window_secs`.
78    #[serde(default)]
79    pub rate_limit_window_secs: Option<u64>,
80    #[serde(default = "default_data_sensitivity")]
81    pub data_sensitivity_ceiling: DataSensitivity,
82}
83
/// Serde fallback for `CreateSessionApiRequest::time_limit_secs`: one hour.
fn default_time_limit() -> i64 {
    60 * 60
}
87
/// Serde fallback for `CreateSessionApiRequest::call_budget`: one thousand calls.
fn default_call_budget() -> u64 {
    1_000
}
91
92fn default_data_sensitivity() -> DataSensitivity {
93    DataSensitivity::Internal
94}
95
96/// Response body for POST /sessions.
97#[derive(Debug, Serialize)]
98pub struct CreateSessionResponse {
99    pub session_id: Uuid,
100    pub declared_intent: String,
101    pub authorized_tools: Vec<String>,
102    pub authorized_credentials: Vec<String>,
103    pub call_budget: u64,
104    pub time_limit_secs: i64,
105}
106
107/// Response body for GET /sessions/:id.
108#[derive(Debug, Serialize)]
109pub struct SessionStatusResponse {
110    pub session_id: Uuid,
111    pub agent_id: Uuid,
112    pub status: String,
113    pub declared_intent: String,
114    pub authorized_tools: Vec<String>,
115    pub authorized_credentials: Vec<String>,
116    pub calls_made: u64,
117    pub call_budget: u64,
118    pub calls_remaining: u64,
119    pub rate_limit_per_minute: Option<u64>,
120    pub data_sensitivity_ceiling: String,
121    pub created_at: DateTime<Utc>,
122    pub expires_at: DateTime<Utc>,
123    pub seconds_remaining: i64,
124    pub warnings: Vec<String>,
125}
126
127/// Response for token issuance.
128#[derive(Debug, Serialize)]
129pub struct TokenResponse {
130    pub token: String,
131    pub expires_in: i64,
132}
133
134/// Error response body.
135#[derive(Debug, Serialize)]
136pub struct ErrorResponse {
137    pub error: String,
138}
139
140/// Constant-time byte comparison to prevent timing side-channel attacks
141/// on the admin API key (P0 credential fix).
142///
143/// Uses the `subtle` crate (`ConstantTimeEq`) instead
144/// of a hand-rolled implementation with fragile `black_box`/`#[inline(never)]`
145/// barriers. The `subtle` crate is the Rust cryptographic community standard
146/// and is designed to resist compiler optimizations that break constant-time
147/// guarantees. Pads both inputs to equal length and checks lengths separately
148/// to avoid leaking length information through timing.
149fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
150    use subtle::ConstantTimeEq;
151    // Pad both slices to the same length so ct_eq can compare equal-length
152    // slices, then separately check that the original lengths match.
153    // This avoids leaking length information through timing.
154    let max_len = std::cmp::max(a.len(), b.len());
155    let mut a_padded = vec![0u8; max_len];
156    let mut b_padded = vec![0u8; max_len];
157    a_padded[..a.len()].copy_from_slice(a);
158    b_padded[..b.len()].copy_from_slice(b);
159    let bytes_equal: bool = a_padded.ct_eq(&b_padded).into();
160    let len_equal: bool = (a.len() as u64).ct_eq(&(b.len() as u64)).into();
161    bytes_equal & len_equal
162}
163
/// Sanitize user-provided strings for safe inclusion in log messages.
///
/// Newline, carriage return and tab are rewritten to the familiar `\n`, `\r`
/// and `\t` escapes. Every other control character (ESC, NUL, DEL, the C1
/// range including NEL, ...) and the Unicode line/paragraph separators
/// U+2028/U+2029 are rewritten to `\u{..}` escapes, so user input can neither
/// start a new log line nor smuggle terminal escape sequences into the log.
/// All other characters are copied unchanged.
/// (RT-003 F-04: log injection via user input)
fn sanitize_for_log(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // `is_control` covers the Cc category; U+2028/U+2029 are line
            // breaks in many viewers but are not classified as control.
            c if c.is_control() || c == '\u{2028}' || c == '\u{2029}' => {
                out.extend(c.escape_unicode());
            }
            c => out.push(c),
        }
    }
    out
}
174
175/// Validate the admin API key from headers.
176///
177/// Uses constant-time comparison to prevent timing side-channel attacks.
178fn validate_admin_key(
179    headers: &HeaderMap,
180    expected: &str,
181) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
182    let key = headers
183        .get("x-api-key")
184        .and_then(|v| v.to_str().ok())
185        .ok_or_else(|| {
186            (
187                StatusCode::UNAUTHORIZED,
188                Json(ErrorResponse {
189                    error: "missing x-api-key header".into(),
190                }),
191            )
192        })?;
193
194    // P0 credential fix: constant-time comparison to prevent timing attacks.
195    // Return 401 (not 403) for invalid keys to avoid differentiating missing vs. wrong key,
196    // which would let an attacker confirm the header name without knowing the value.
197    if !constant_time_eq(key.as_bytes(), expected.as_bytes()) {
198        return Err((
199            StatusCode::UNAUTHORIZED,
200            Json(ErrorResponse {
201                error: "invalid API key".into(),
202            }),
203        ));
204    }
205
206    Ok(())
207}
208
209/// Check the admin API rate limiter and return 429 if the limit is exceeded.
210///
211/// Rate limiting prevents an attacker with a compromised API key
212/// from making unlimited requests to enumerate agents, create sessions, or
213/// overwhelm the control plane.
214fn check_admin_rate_limit(
215    state: &AppState,
216    headers: &axum::http::HeaderMap,
217) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
218    // Per-client rate limiting keyed by API key hash.
219    let client_key = headers
220        .get("x-api-key")
221        .and_then(|v| v.to_str().ok())
222        .map(|k| {
223            use std::hash::{Hash, Hasher};
224            let mut hasher = std::collections::hash_map::DefaultHasher::new();
225            k.hash(&mut hasher);
226            format!("key:{:x}", hasher.finish())
227        })
228        .unwrap_or_else(|| "__unauth__".to_string());
229
230    if !state.admin_rate_limiter.check_rate_limit(&client_key) {
231        tracing::warn!(client = %client_key, "ADMIN_AUDIT: rate limit exceeded on admin API");
232        return Err((
233            StatusCode::TOO_MANY_REQUESTS,
234            Json(ErrorResponse {
235                error: "admin API rate limit exceeded, try again later".into(),
236            }),
237        ));
238    }
239    Ok(())
240}
241
242/// POST /agents: register a new agent.
243pub async fn register_agent(
244    State(state): State<AppState>,
245    headers: HeaderMap,
246    Json(req): Json<RegisterAgentRequest>,
247) -> impl IntoResponse {
248    if let Err(e) = check_admin_rate_limit(&state, &headers) {
249        return e.into_response();
250    }
251    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
252        return e.into_response();
253    }
254
255    match state
256        .registry
257        .register_agent(
258            req.owner.clone(),
259            req.model,
260            req.capabilities,
261            req.trust_level,
262            req.expires_at,
263        )
264        .await
265    {
266        Ok(agent) => {
267            let token = match issue_token(agent.id, &req.owner, &state.token_config) {
268                Ok(t) => t,
269                Err(e) => {
270                    tracing::error!(error = %e, "failed to issue token");
271                    return (
272                        StatusCode::INTERNAL_SERVER_ERROR,
273                        Json(ErrorResponse {
274                            error: "token issuance failed".into(),
275                        }),
276                    )
277                        .into_response();
278                }
279            };
280
281            state.metrics.registered_agents.inc();
282            state.admin_audit_log(
283                "register_agent",
284                Some(agent.id),
285                &format!("owner={}", sanitize_for_log(&req.owner)),
286            );
287
288            (
289                StatusCode::CREATED,
290                Json(RegisterAgentResponse {
291                    agent_id: agent.id,
292                    token,
293                }),
294            )
295                .into_response()
296        }
297        Err(e) => {
298            tracing::error!(error = %e, "failed to register agent");
299            (
300                StatusCode::INTERNAL_SERVER_ERROR,
301                Json(ErrorResponse {
302                    error: "internal error".into(),
303                }),
304            )
305                .into_response()
306        }
307    }
308}
309
310/// GET /agents/:id: get agent details.
311pub async fn get_agent(
312    State(state): State<AppState>,
313    headers: HeaderMap,
314    Path(id): Path<Uuid>,
315) -> impl IntoResponse {
316    if let Err(e) = check_admin_rate_limit(&state, &headers) {
317        return e.into_response();
318    }
319    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
320        return e.into_response();
321    }
322
323    state.admin_audit_log("get_agent", Some(id), "");
324
325    match state.registry.get_agent(id).await {
326        Ok(agent) => match serde_json::to_value(&agent) {
327            Ok(val) => (StatusCode::OK, Json(val)).into_response(),
328            Err(e) => (
329                StatusCode::INTERNAL_SERVER_ERROR,
330                Json(serde_json::json!({"error": e.to_string()})),
331            )
332                .into_response(),
333        },
334        Err(e) => {
335            // Log the actual error internally but return an opaque message.
336            // Collapsing AgentNotFound/AgentDeactivated prevents attackers
337            // from probing whether a UUID is registered vs deactivated.
338            tracing::debug!(error = %e, agent_id = %id, "get_agent failed");
339            (
340                StatusCode::NOT_FOUND,
341                Json(ErrorResponse {
342                    error: "agent not found".into(),
343                }),
344            )
345                .into_response()
346        }
347    }
348}
349
350/// POST /agents/:id/delegate: create delegation to sub-agent.
351pub async fn delegate_agent(
352    State(state): State<AppState>,
353    headers: HeaderMap,
354    Path(from_id): Path<Uuid>,
355    Json(req): Json<DelegateRequest>,
356) -> impl IntoResponse {
357    if let Err(e) = check_admin_rate_limit(&state, &headers) {
358        return e.into_response();
359    }
360    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
361        return e.into_response();
362    }
363
364    state.admin_audit_log("delegate_agent", Some(from_id), &format!("to={}", req.to));
365
366    match state
367        .registry
368        .create_delegation(from_id, req.to, req.scopes, req.expires_at)
369        .await
370    {
371        Ok(link) => match serde_json::to_value(&link) {
372            Ok(val) => (StatusCode::CREATED, Json(val)).into_response(),
373            Err(e) => (
374                StatusCode::INTERNAL_SERVER_ERROR,
375                Json(serde_json::json!({"error": e.to_string()})),
376            )
377                .into_response(),
378        },
379        Err(e) => {
380            let status = match &e {
381                arbiter_identity::IdentityError::DelegationSourceNotFound(_)
382                | arbiter_identity::IdentityError::DelegationTargetNotFound(_) => {
383                    StatusCode::NOT_FOUND
384                }
385                arbiter_identity::IdentityError::ScopeNarrowingViolation { .. }
386                | arbiter_identity::IdentityError::DelegateFromDeactivated(_)
387                | arbiter_identity::IdentityError::CircularDelegation { .. } => {
388                    StatusCode::BAD_REQUEST
389                }
390                _ => StatusCode::INTERNAL_SERVER_ERROR,
391            };
392            (
393                status,
394                Json(ErrorResponse {
395                    error: "internal error".into(),
396                }),
397            )
398                .into_response()
399        }
400    }
401}
402
403/// GET /agents/:id/delegations: list incoming and outgoing delegations.
404pub async fn list_delegations(
405    State(state): State<AppState>,
406    headers: HeaderMap,
407    Path(id): Path<Uuid>,
408) -> impl IntoResponse {
409    if let Err(e) = check_admin_rate_limit(&state, &headers) {
410        return e.into_response();
411    }
412    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
413        return e.into_response();
414    }
415
416    state.admin_audit_log("list_delegations", Some(id), "");
417
418    // Verify agent exists.
419    if let Err(_e) = state.registry.get_agent(id).await {
420        return (
421            StatusCode::NOT_FOUND,
422            Json(ErrorResponse {
423                error: "internal error".into(),
424            }),
425        )
426            .into_response();
427    }
428
429    let outgoing = state.registry.list_delegations_from(id).await;
430    let incoming = state.registry.list_delegations_to(id).await;
431
432    (
433        StatusCode::OK,
434        Json(serde_json::json!({
435            "agent_id": id,
436            "outgoing": outgoing,
437            "incoming": incoming,
438            "outgoing_count": outgoing.len(),
439            "incoming_count": incoming.len(),
440        })),
441    )
442        .into_response()
443}
444
445/// DELETE /agents/:id: deactivate agent + cascade.
446pub async fn deactivate_agent(
447    State(state): State<AppState>,
448    headers: HeaderMap,
449    Path(id): Path<Uuid>,
450) -> impl IntoResponse {
451    if let Err(e) = check_admin_rate_limit(&state, &headers) {
452        return e.into_response();
453    }
454    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
455        return e.into_response();
456    }
457
458    state.admin_audit_log("deactivate_agent", Some(id), "cascade");
459
460    match state.registry.cascade_deactivate(id).await {
461        Ok(deactivated) => {
462            // Close sessions for all deactivated agents.
463            let mut total_sessions_closed = 0usize;
464            for &agent_id in &deactivated {
465                let closed = state.session_store.close_sessions_for_agent(agent_id).await;
466                total_sessions_closed += closed;
467                state.metrics.registered_agents.dec();
468            }
469            (
470                StatusCode::OK,
471                Json(serde_json::json!({
472                    "deactivated": deactivated,
473                    "count": deactivated.len(),
474                    "sessions_closed": total_sessions_closed,
475                })),
476            )
477                .into_response()
478        }
479        Err(e) => {
480            let status = match &e {
481                arbiter_identity::IdentityError::AgentNotFound(_) => StatusCode::NOT_FOUND,
482                arbiter_identity::IdentityError::AgentDeactivated(_) => StatusCode::CONFLICT,
483                _ => StatusCode::INTERNAL_SERVER_ERROR,
484            };
485            (
486                status,
487                Json(ErrorResponse {
488                    error: "internal error".into(),
489                }),
490            )
491                .into_response()
492        }
493    }
494}
495
496/// POST /agents/:id/token: issue new short-lived credential.
497pub async fn issue_agent_token(
498    State(state): State<AppState>,
499    headers: HeaderMap,
500    Path(id): Path<Uuid>,
501    body: Option<Json<TokenRequest>>,
502) -> impl IntoResponse {
503    if let Err(e) = check_admin_rate_limit(&state, &headers) {
504        return e.into_response();
505    }
506    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
507        return e.into_response();
508    }
509
510    state.admin_audit_log("issue_agent_token", Some(id), "");
511
512    let agent = match state.registry.get_agent(id).await {
513        Ok(a) => a,
514        Err(_e) => {
515            return (
516                StatusCode::NOT_FOUND,
517                Json(ErrorResponse {
518                    error: "internal error".into(),
519                }),
520            )
521                .into_response();
522        }
523    };
524
525    if !agent.active {
526        return (
527            StatusCode::BAD_REQUEST,
528            Json(ErrorResponse {
529                error: format!("agent {} is deactivated", id),
530            }),
531        )
532            .into_response();
533    }
534
535    let mut config = state.token_config.clone();
536    if let Some(Json(req)) = body
537        && let Some(expiry) = req.expiry_seconds
538    {
539        // P4: Reject non-positive expiry to prevent creating immediately-invalid tokens.
540        // (RT-003 F-09: token expiry accepts negative values)
541        if expiry <= 0 {
542            return (
543                StatusCode::BAD_REQUEST,
544                Json(ErrorResponse {
545                    error: "expiry_seconds must be positive".into(),
546                }),
547            )
548                .into_response();
549        }
550        config.expiry_seconds = expiry;
551    }
552
553    match issue_token(agent.id, &agent.owner, &config) {
554        Ok(token) => (
555            StatusCode::OK,
556            Json(TokenResponse {
557                token,
558                expires_in: config.expiry_seconds,
559            }),
560        )
561            .into_response(),
562        Err(e) => {
563            tracing::error!(error = %e, "failed to issue token");
564            (
565                StatusCode::INTERNAL_SERVER_ERROR,
566                Json(ErrorResponse {
567                    error: "token issuance failed".into(),
568                }),
569            )
570                .into_response()
571        }
572    }
573}
574
575/// GET /agents: list all agents.
576pub async fn list_agents(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
577    if let Err(e) = check_admin_rate_limit(&state, &headers) {
578        return e.into_response();
579    }
580    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
581        return e.into_response();
582    }
583
584    state.admin_audit_log("list_agents", None, "");
585
586    let agents = state.registry.list_agents().await;
587    match serde_json::to_value(&agents) {
588        Ok(val) => (StatusCode::OK, Json(val)).into_response(),
589        Err(e) => (
590            StatusCode::INTERNAL_SERVER_ERROR,
591            Json(serde_json::json!({"error": e.to_string()})),
592        )
593            .into_response(),
594    }
595}
596
597/// POST /sessions: create a new task session.
598pub async fn create_session(
599    State(state): State<AppState>,
600    headers: HeaderMap,
601    Json(req): Json<CreateSessionApiRequest>,
602) -> impl IntoResponse {
603    if let Err(e) = check_admin_rate_limit(&state, &headers) {
604        return e.into_response();
605    }
606    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
607        return e.into_response();
608    }
609
610    state.admin_audit_log(
611        "create_session",
612        Some(req.agent_id),
613        &format!("intent={}", sanitize_for_log(&req.declared_intent)),
614    );
615
616    // P0: Validate agent exists, is active, and not expired before creating session.
617    // Without this check, sessions can be created for non-existent or deactivated agents,
618    // bypassing the entire identity model (RT-003 F-01: ghost agent sessions).
619    let agent = match state.registry.get_agent(req.agent_id).await {
620        Ok(a) => a,
621        Err(e) => {
622            tracing::warn!(
623                agent_id = %req.agent_id,
624                error = %e,
625                "session creation denied: agent not found"
626            );
627            return (
628                StatusCode::NOT_FOUND,
629                Json(ErrorResponse {
630                    error: format!("agent {} not found", req.agent_id),
631                }),
632            )
633                .into_response();
634        }
635    };
636
637    if !agent.active {
638        tracing::warn!(
639            agent_id = %req.agent_id,
640            "session creation denied: agent is deactivated"
641        );
642        return (
643            StatusCode::BAD_REQUEST,
644            Json(ErrorResponse {
645                error: format!("agent {} is deactivated", req.agent_id),
646            }),
647        )
648            .into_response();
649    }
650
651    if let Some(expires_at) = agent.expires_at
652        && expires_at < Utc::now()
653    {
654        tracing::warn!(
655            agent_id = %req.agent_id,
656            expires_at = %expires_at,
657            "session creation denied: agent has expired"
658        );
659        return (
660            StatusCode::BAD_REQUEST,
661            Json(ErrorResponse {
662                error: format!("agent {} has expired", req.agent_id),
663            }),
664        )
665            .into_response();
666    }
667
668    // P0: Per-agent session cap to prevent session multiplication attacks.
669    // An agent creating N sessions * M budget each = N*M tool calls,
670    // bypassing per-session rate limits.
671    if let Some(max_sessions) = state.max_concurrent_sessions_per_agent {
672        let active_count = state
673            .session_store
674            .count_active_for_agent(req.agent_id)
675            .await;
676        if active_count >= max_sessions {
677            tracing::warn!(
678                agent_id = %req.agent_id,
679                active = active_count,
680                max = max_sessions,
681                "session creation denied: per-agent concurrent session cap reached"
682            );
683            return (
684                StatusCode::TOO_MANY_REQUESTS,
685                Json(ErrorResponse {
686                    error: format!(
687                        "agent {} has too many concurrent sessions ({}/{})",
688                        req.agent_id, active_count, max_sessions
689                    ),
690                }),
691            )
692                .into_response();
693        }
694    }
695
696    // Validate session creation parameters.
697    if req.time_limit_secs <= 0 {
698        return (
699            StatusCode::BAD_REQUEST,
700            Json(ErrorResponse {
701                error: "time_limit_secs must be positive".into(),
702            }),
703        )
704            .into_response();
705    }
706    // Validate session creation parameters.
707    if req.time_limit_secs > 86400 {
708        return (
709            StatusCode::BAD_REQUEST,
710            Json(ErrorResponse {
711                error: "time_limit_secs cannot exceed 86400 (24 hours)".into(),
712            }),
713        )
714            .into_response();
715    }
716    if req.call_budget == 0 {
717        return (
718            StatusCode::BAD_REQUEST,
719            Json(ErrorResponse {
720                error: "call_budget must be positive".into(),
721            }),
722        )
723            .into_response();
724    }
725    if req.call_budget > 1_000_000 {
726        return (
727            StatusCode::BAD_REQUEST,
728            Json(ErrorResponse {
729                error: "call_budget cannot exceed 1000000".into(),
730            }),
731        )
732            .into_response();
733    }
734    if req.declared_intent.trim().is_empty() {
735        return (
736            StatusCode::BAD_REQUEST,
737            Json(ErrorResponse {
738                error: "declared_intent must not be empty".into(),
739            }),
740        )
741            .into_response();
742    }
743
744    // Limit declared_intent length to prevent keyword injection amplification
745    // and memory exhaustion in regex matching. The behavior engine's
746    // classify_intent runs regex matching on this string for every request.
747    const MAX_INTENT_LEN: usize = 512;
748    if req.declared_intent.len() > MAX_INTENT_LEN {
749        return (
750            StatusCode::BAD_REQUEST,
751            Json(ErrorResponse {
752                error: format!("declared_intent exceeds maximum length of {MAX_INTENT_LEN} bytes"),
753            }),
754        )
755            .into_response();
756    }
757
758    // P1: Validate rate_limit_window_secs bounds to prevent rate limit bypass.
759    // A window of 0 disables rate limiting entirely; a window of 1 second converts
760    // "per minute" limits into "per second" (100x amplification).
761    // (RT-003 F-03: rate limit window manipulation)
762    if let Some(window) = req.rate_limit_window_secs {
763        if window == 0 {
764            return (
765                StatusCode::BAD_REQUEST,
766                Json(ErrorResponse {
767                    error: "rate_limit_window_secs must be positive".into(),
768                }),
769            )
770                .into_response();
771        }
772        if window < 10 {
773            return (
774                StatusCode::BAD_REQUEST,
775                Json(ErrorResponse {
776                    error: "rate_limit_window_secs must be at least 10 seconds".into(),
777                }),
778            )
779                .into_response();
780        }
781        if window > 3600 {
782            return (
783                StatusCode::BAD_REQUEST,
784                Json(ErrorResponse {
785                    error: "rate_limit_window_secs cannot exceed 3600 (1 hour)".into(),
786                }),
787            )
788                .into_response();
789        }
790    }
791
792    if req.authorized_tools.is_empty() {
793        tracing::warn!(
794            agent_id = %req.agent_id,
795            "session created with empty authorized_tools; all tools will be permitted. \
796             Specify an explicit tool allowlist for least-privilege enforcement."
797        );
798    }
799
800    let create_req = CreateSessionRequest {
801        agent_id: req.agent_id,
802        delegation_chain_snapshot: req.delegation_chain_snapshot,
803        declared_intent: req.declared_intent,
804        authorized_tools: req.authorized_tools,
805        authorized_credentials: req.authorized_credentials.unwrap_or_default(),
806        time_limit: chrono::Duration::seconds(req.time_limit_secs),
807        call_budget: req.call_budget,
808        rate_limit_per_minute: req.rate_limit_per_minute,
809        rate_limit_window_secs: req
810            .rate_limit_window_secs
811            .unwrap_or(state.default_rate_limit_window_secs),
812        data_sensitivity_ceiling: req.data_sensitivity_ceiling,
813    };
814
815    let session = state.session_store.create(create_req).await;
816
817    state.metrics.active_sessions.inc();
818
819    (
820        StatusCode::CREATED,
821        Json(CreateSessionResponse {
822            session_id: session.session_id,
823            declared_intent: session.declared_intent,
824            authorized_tools: session.authorized_tools.clone(),
825            authorized_credentials: session.authorized_credentials,
826            call_budget: session.call_budget,
827            time_limit_secs: req.time_limit_secs,
828        }),
829    )
830        .into_response()
831}
832
833/// GET /sessions/:id: get live session status.
834pub async fn get_session(
835    State(state): State<AppState>,
836    headers: HeaderMap,
837    Path(id): Path<Uuid>,
838) -> impl IntoResponse {
839    if let Err(e) = check_admin_rate_limit(&state, &headers) {
840        return e.into_response();
841    }
842    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
843        return e.into_response();
844    }
845
846    state.admin_audit_log("get_session", None, &format!("session_id={}", id));
847
848    let session = match state.session_store.get(id).await {
849        Ok(s) => s,
850        Err(_e) => {
851            return (
852                StatusCode::NOT_FOUND,
853                Json(ErrorResponse {
854                    error: "internal error".into(),
855                }),
856            )
857                .into_response();
858        }
859    };
860
861    let now = Utc::now();
862    let expires_at = session.created_at + session.time_limit;
863    let seconds_remaining = (expires_at - now).num_seconds().max(0);
864    let calls_remaining = session.call_budget.saturating_sub(session.calls_made);
865
866    let mut warnings = Vec::new();
867    let budget_pct_remaining = if session.call_budget > 0 {
868        (calls_remaining as f64 / session.call_budget as f64) * 100.0
869    } else {
870        0.0
871    };
872    let time_pct_remaining = if session.time_limit.num_seconds() > 0 {
873        (seconds_remaining as f64 / session.time_limit.num_seconds() as f64) * 100.0
874    } else {
875        0.0
876    };
877    if budget_pct_remaining <= state.warning_threshold_pct && session.call_budget > 0 {
878        warnings.push(format!(
879            "budget low: {} of {} calls remaining",
880            calls_remaining, session.call_budget
881        ));
882    }
883    if time_pct_remaining <= state.warning_threshold_pct && seconds_remaining > 0 {
884        warnings.push(format!("time low: {}s remaining", seconds_remaining));
885    }
886
887    let status_str = format!("{:?}", session.status).to_lowercase();
888    let sensitivity_str = serde_json::to_value(session.data_sensitivity_ceiling)
889        .unwrap_or_default()
890        .as_str()
891        .unwrap_or("unknown")
892        .to_string();
893
894    (
895        StatusCode::OK,
896        Json(SessionStatusResponse {
897            session_id: session.session_id,
898            agent_id: session.agent_id,
899            status: status_str,
900            declared_intent: session.declared_intent,
901            authorized_tools: session.authorized_tools.clone(),
902            authorized_credentials: session.authorized_credentials,
903            calls_made: session.calls_made,
904            call_budget: session.call_budget,
905            calls_remaining,
906            rate_limit_per_minute: session.rate_limit_per_minute,
907            data_sensitivity_ceiling: sensitivity_str,
908            created_at: session.created_at,
909            expires_at,
910            seconds_remaining,
911            warnings,
912        }),
913    )
914        .into_response()
915}
916
/// Response body for POST /sessions/:id/close.
///
/// Summarizes a session at the moment it was closed: how much of its call
/// budget and time limit were consumed, plus counters taken from the audit
/// sink. Both audit counters are reported as 0 when no audit sink is
/// configured.
#[derive(Debug, Serialize)]
pub struct SessionCloseResponse {
    /// Identifier of the session that was closed.
    pub session_id: Uuid,
    /// Session status after the operation; `close_session` always reports `"closed"`.
    pub status: String,
    /// Intent that was declared for the session.
    pub declared_intent: String,
    /// Number of calls the session had made when it was closed.
    pub total_calls: u64,
    /// Call budget configured for the session.
    pub call_budget: u64,
    /// `total_calls / call_budget` as a percentage; `0.0` when the budget is zero.
    pub budget_utilization_pct: f64,
    /// Whole seconds between session creation and handling of the close request.
    pub time_used_secs: i64,
    /// Session time limit in whole seconds.
    pub time_limit_secs: i64,
    /// Number of denied requests during this session (from audit log).
    pub denied_attempts: u64,
    /// Number of requests that triggered anomaly flags (from audit log).
    pub anomalies_detected: u64,
}
933
934/// POST /sessions/:id/close: close a session and return summary.
935pub async fn close_session(
936    State(state): State<AppState>,
937    headers: HeaderMap,
938    Path(id): Path<Uuid>,
939) -> impl IntoResponse {
940    if let Err(e) = check_admin_rate_limit(&state, &headers) {
941        return e.into_response();
942    }
943    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
944        return e.into_response();
945    }
946
947    state.admin_audit_log("close_session", None, &format!("session_id={}", id));
948
949    let session = match state.session_store.close(id).await {
950        Ok(s) => {
951            state.metrics.active_sessions.dec();
952            s
953        }
954        Err(e) => {
955            let status = match &e {
956                arbiter_session::SessionError::NotFound(_) => StatusCode::NOT_FOUND,
957                arbiter_session::SessionError::AlreadyClosed(_) => StatusCode::CONFLICT,
958                _ => StatusCode::INTERNAL_SERVER_ERROR,
959            };
960            return (
961                status,
962                Json(ErrorResponse {
963                    error: "internal error".into(),
964                }),
965            )
966                .into_response();
967        }
968    };
969
970    let now = Utc::now();
971    let time_used_secs = (now - session.created_at).num_seconds();
972    let budget_utilization_pct = if session.call_budget > 0 {
973        (session.calls_made as f64 / session.call_budget as f64) * 100.0
974    } else {
975        0.0
976    };
977
978    // Query audit stats for this session.
979    let session_id_str = id.to_string();
980    let (denied_attempts, anomalies_detected) = if let Some(ref sink) = state.audit_sink {
981        let stats = sink.stats().stats_for_session(&session_id_str).await;
982        // Clean up stats now that session is closed.
983        sink.stats().remove_session(&session_id_str).await;
984        (stats.denied_count, stats.anomaly_count)
985    } else {
986        (0, 0)
987    };
988
989    (
990        StatusCode::OK,
991        Json(SessionCloseResponse {
992            session_id: session.session_id,
993            status: "closed".into(),
994            declared_intent: session.declared_intent,
995            total_calls: session.calls_made,
996            call_budget: session.call_budget,
997            budget_utilization_pct,
998            time_used_secs,
999            time_limit_secs: session.time_limit.num_seconds(),
1000            denied_attempts,
1001            anomalies_detected,
1002        }),
1003    )
1004        .into_response()
1005}
1006
/// Request body for POST /policy/explain.
///
/// Describes a hypothetical tool call to evaluate against the currently
/// loaded policies.
#[derive(Debug, Deserialize)]
pub struct PolicyExplainRequest {
    /// Agent to evaluate as; it is looked up in the agent registry.
    pub agent_id: Uuid,
    /// Declared intent placed into the evaluation context.
    pub declared_intent: String,
    /// Name of the tool for the simulated `tools/call` request.
    pub tool: String,
    /// Optional tool arguments forwarded unchanged to the simulated request.
    #[serde(default)]
    pub arguments: Option<serde_json::Value>,
    /// Principal subject to evaluate as; defaults to the agent's owner when omitted.
    #[serde(default)]
    pub principal: Option<String>,
}
1018
/// Response body for POST /policy/explain.
#[derive(Debug, Serialize)]
pub struct PolicyExplainResponse {
    /// Outcome of the evaluation: `"allow"`, `"deny"`, `"escalate"` or `"annotate"`.
    pub decision: String,
    /// Id of the policy behind the decision; set for allow and annotate decisions.
    pub matched_policy: Option<String>,
    /// Explanation of the decision; set for deny, escalate and annotate
    /// decisions, and when no policies are configured.
    pub reason: Option<String>,
    /// Evaluation trace from the policy engine; empty when no policies are configured.
    pub trace: Vec<PolicyTrace>,
}
1027
1028/// POST /policy/explain: dry-run policy evaluation without executing.
1029pub async fn explain_policy(
1030    State(state): State<AppState>,
1031    headers: HeaderMap,
1032    Json(req): Json<PolicyExplainRequest>,
1033) -> impl IntoResponse {
1034    if let Err(e) = check_admin_rate_limit(&state, &headers) {
1035        return e.into_response();
1036    }
1037    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1038        return e.into_response();
1039    }
1040
1041    state.admin_audit_log(
1042        "explain_policy",
1043        Some(req.agent_id),
1044        &format!("tool={}", sanitize_for_log(&req.tool)),
1045    );
1046
1047    let policy_snapshot = tokio::sync::watch::Sender::borrow(&state.policy_config).clone();
1048    let policy_config = match policy_snapshot.as_ref() {
1049        Some(pc) => pc,
1050        None => {
1051            return (
1052                StatusCode::OK,
1053                Json(PolicyExplainResponse {
1054                    decision: "allow".into(),
1055                    matched_policy: None,
1056                    reason: Some("no policies configured".into()),
1057                    trace: vec![],
1058                }),
1059            )
1060                .into_response();
1061        }
1062    };
1063
1064    let agent = match state.registry.get_agent(req.agent_id).await {
1065        Ok(a) => a,
1066        Err(e) => {
1067            return (
1068                StatusCode::NOT_FOUND,
1069                Json(ErrorResponse {
1070                    error: format!("agent not found: {e}"),
1071                }),
1072            )
1073                .into_response();
1074        }
1075    };
1076
1077    let principal = req.principal.unwrap_or_else(|| agent.owner.clone());
1078
1079    let eval_ctx = EvalContext {
1080        agent,
1081        delegation_chain: vec![],
1082        declared_intent: req.declared_intent,
1083        principal_sub: principal,
1084        principal_groups: vec![],
1085    };
1086
1087    let mcp_req = McpRequest {
1088        id: None,
1089        method: "tools/call".into(),
1090        tool_name: Some(req.tool.clone()),
1091        arguments: req.arguments,
1092        resource_uri: None,
1093    };
1094
1095    let result = arbiter_policy::evaluate_explained(policy_config, &eval_ctx, &mcp_req);
1096
1097    let (decision, matched_policy, reason) = match result.decision {
1098        arbiter_policy::Decision::Allow { policy_id } => ("allow".into(), Some(policy_id), None),
1099        arbiter_policy::Decision::Deny { reason } => ("deny".into(), None, Some(reason)),
1100        arbiter_policy::Decision::Escalate { reason } => ("escalate".into(), None, Some(reason)),
1101        arbiter_policy::Decision::Annotate { policy_id, reason } => {
1102            ("annotate".into(), Some(policy_id), Some(reason))
1103        }
1104    };
1105
1106    (
1107        StatusCode::OK,
1108        Json(PolicyExplainResponse {
1109            decision,
1110            matched_policy,
1111            reason,
1112            trace: result.trace,
1113        }),
1114    )
1115        .into_response()
1116}
1117
/// Request body for POST /policy/validate.
#[derive(Debug, Deserialize)]
pub struct PolicyValidateRequest {
    /// Policy document as TOML text; it is passed to `PolicyConfig::validate_toml`.
    pub policy_toml: String,
}
1123
1124/// POST /policy/validate: validate a policy TOML string without loading it.
1125pub async fn validate_policy(
1126    State(state): State<AppState>,
1127    headers: HeaderMap,
1128    Json(req): Json<PolicyValidateRequest>,
1129) -> impl IntoResponse {
1130    if let Err(e) = check_admin_rate_limit(&state, &headers) {
1131        return e.into_response();
1132    }
1133    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1134        return e.into_response();
1135    }
1136
1137    state.admin_audit_log("validate_policy", None, "");
1138
1139    let result = arbiter_policy::PolicyConfig::validate_toml(&req.policy_toml);
1140    (StatusCode::OK, Json(result)).into_response()
1141}
1142
1143/// POST /policy/reload: re-read the policy file and atomically swap the config.
1144pub async fn reload_policy(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1145    if let Err(e) = check_admin_rate_limit(&state, &headers) {
1146        return e.into_response();
1147    }
1148    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1149        return e.into_response();
1150    }
1151
1152    state.admin_audit_log("reload_policy", None, "");
1153
1154    let path = match &state.policy_file_path {
1155        Some(p) => p.clone(),
1156        None => {
1157            return (
1158                StatusCode::BAD_REQUEST,
1159                Json(ErrorResponse {
1160                    error: "no policy file configured; policies are inline or absent".into(),
1161                }),
1162            )
1163                .into_response();
1164        }
1165    };
1166
1167    let contents = match std::fs::read_to_string(&path) {
1168        Ok(c) => c,
1169        Err(e) => {
1170            // Do not leak filesystem paths in error responses.
1171            tracing::error!(path = %path, error = %e, "failed to read policy file");
1172            return (
1173                StatusCode::INTERNAL_SERVER_ERROR,
1174                Json(ErrorResponse {
1175                    error: "failed to read policy file".into(),
1176                }),
1177            )
1178                .into_response();
1179        }
1180    };
1181
1182    let new_config = match arbiter_policy::PolicyConfig::from_toml(&contents) {
1183        Ok(pc) => pc,
1184        Err(e) => {
1185            // Log parse errors internally, don't expose details.
1186            tracing::error!(error = %e, "failed to parse policy file");
1187            return (
1188                StatusCode::BAD_REQUEST,
1189                Json(ErrorResponse {
1190                    error: "failed to parse policy file".into(),
1191                }),
1192            )
1193                .into_response();
1194        }
1195    };
1196
1197    let policy_count = new_config.policies.len();
1198    let _ = state.policy_config.send_replace(Arc::new(Some(new_config)));
1199
1200    // Audit log policy reload events for change tracking.
1201    tracing::warn!(
1202        path,
1203        policy_count,
1204        "AUDIT: policy configuration reloaded via admin API"
1205    );
1206    (
1207        StatusCode::OK,
1208        Json(serde_json::json!({
1209            "status": "ok",
1210            "reloaded": true,
1211            "policies_loaded": policy_count,
1212            "policy_count": policy_count,
1213        })),
1214    )
1215        .into_response()
1216}
1217
1218/// GET /policy/schema: returns the policy TOML schema as JSON Schema.
1219pub async fn policy_schema(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1220    if let Err(e) = check_admin_rate_limit(&state, &headers) {
1221        return e.into_response();
1222    }
1223    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1224        return e.into_response();
1225    }
1226
1227    state.admin_audit_log("policy_schema", None, "");
1228
1229    let schema = serde_json::json!({
1230        "$schema": "https://json-schema.org/draft/2020-12/schema",
1231        "title": "Arbiter Policy Configuration",
1232        "description": "Defines authorization policies for the Arbiter MCP gateway. Policies are evaluated top-to-bottom; the most specific match wins. If no policy matches, the request is denied (deny-by-default).",
1233        "type": "object",
1234        "properties": {
1235            "policies": {
1236                "type": "array",
1237                "description": "Ordered list of authorization policies. Evaluated by specificity score; the most specific matching policy's effect applies.",
1238                "items": {
1239                    "$ref": "#/$defs/Policy"
1240                }
1241            }
1242        },
1243        "$defs": {
1244            "Policy": {
1245                "type": "object",
1246                "description": "A single authorization policy rule.",
1247                "required": ["id", "effect"],
1248                "properties": {
1249                    "id": {
1250                        "type": "string",
1251                        "description": "Unique identifier for this policy. Used in audit logs and policy traces."
1252                    },
1253                    "effect": {
1254                        "$ref": "#/$defs/Effect"
1255                    },
1256                    "agent_match": {
1257                        "$ref": "#/$defs/AgentMatch"
1258                    },
1259                    "principal_match": {
1260                        "$ref": "#/$defs/PrincipalMatch"
1261                    },
1262                    "intent_match": {
1263                        "$ref": "#/$defs/IntentMatch"
1264                    },
1265                    "allowed_tools": {
1266                        "type": "array",
1267                        "items": { "type": "string" },
1268                        "description": "Tools this policy applies to. Empty array means 'all tools'."
1269                    },
1270                    "parameter_constraints": {
1271                        "type": "array",
1272                        "items": { "$ref": "#/$defs/ParameterConstraint" },
1273                        "description": "Per-parameter bounds on tool arguments."
1274                    },
1275                    "priority": {
1276                        "type": "integer",
1277                        "default": 0,
1278                        "description": "Manual priority override. 0 = auto-computed from match specificity. Higher wins ties."
1279                    }
1280                }
1281            },
1282            "Effect": {
1283                "type": "string",
1284                "enum": ["allow", "deny", "escalate"],
1285                "description": "allow = permit the request. deny = block it. escalate = require human-in-the-loop approval."
1286            },
1287            "AgentMatch": {
1288                "type": "object",
1289                "description": "Criteria for matching the requesting agent. All specified fields must match.",
1290                "properties": {
1291                    "agent_id": {
1292                        "type": "string",
1293                        "format": "uuid",
1294                        "description": "Match a specific agent by UUID. Specificity: +100."
1295                    },
1296                    "trust_level": {
1297                        "$ref": "#/$defs/TrustLevel"
1298                    },
1299                    "capabilities": {
1300                        "type": "array",
1301                        "items": { "type": "string" },
1302                        "description": "Agent must have all listed capabilities. Specificity: +25 each."
1303                    }
1304                }
1305            },
1306            "TrustLevel": {
1307                "type": "string",
1308                "enum": ["untrusted", "basic", "verified", "trusted"],
1309                "description": "Agent trust tier. untrusted < basic < verified < trusted. Matches agents at or above the specified level. Specificity: +50."
1310            },
1311            "PrincipalMatch": {
1312                "type": "object",
1313                "description": "Criteria for matching the human principal on whose behalf the agent acts.",
1314                "properties": {
1315                    "sub": {
1316                        "type": "string",
1317                        "description": "Exact principal subject identifier (e.g., 'user:alice'). Specificity: +40."
1318                    },
1319                    "groups": {
1320                        "type": "array",
1321                        "items": { "type": "string" },
1322                        "description": "Principal must belong to at least one of these groups. Specificity: +20 each."
1323                    }
1324                }
1325            },
1326            "IntentMatch": {
1327                "type": "object",
1328                "description": "Criteria for matching the session's declared intent.",
1329                "properties": {
1330                    "keywords": {
1331                        "type": "array",
1332                        "items": { "type": "string" },
1333                        "description": "Case-insensitive substrings that must appear in the declared intent. Specificity: +10 each."
1334                    },
1335                    "regex": {
1336                        "type": "string",
1337                        "description": "Regex pattern the declared intent must match. Compiled at config load time. Specificity: +30."
1338                    }
1339                }
1340            },
1341            "ParameterConstraint": {
1342                "type": "object",
1343                "description": "A constraint on a tool call parameter.",
1344                "required": ["key"],
1345                "properties": {
1346                    "key": {
1347                        "type": "string",
1348                        "description": "Dotted path to the parameter (e.g., 'arguments.max_tokens')."
1349                    },
1350                    "max_value": {
1351                        "type": "number",
1352                        "description": "Maximum numeric value allowed."
1353                    },
1354                    "min_value": {
1355                        "type": "number",
1356                        "description": "Minimum numeric value allowed."
1357                    },
1358                    "allowed_values": {
1359                        "type": "array",
1360                        "items": { "type": "string" },
1361                        "description": "Whitelist of allowed string values."
1362                    }
1363                }
1364            },
1365            "DataSensitivity": {
1366                "type": "string",
1367                "enum": ["public", "internal", "confidential", "restricted"],
1368                "description": "Data sensitivity ceiling for sessions. public < internal < confidential < restricted."
1369            }
1370        }
1371    });
1372
1373    (StatusCode::OK, Json(schema)).into_response()
1374}
1375
1376/// Build the axum router for the lifecycle API.
1377pub fn router(state: AppState) -> axum::Router {
1378    axum::Router::new()
1379        .route("/agents", axum::routing::post(register_agent))
1380        .route("/agents", axum::routing::get(list_agents))
1381        .route("/agents/{id}", axum::routing::get(get_agent))
1382        .route("/agents/{id}", axum::routing::delete(deactivate_agent))
1383        .route("/agents/{id}/delegate", axum::routing::post(delegate_agent))
1384        .route(
1385            "/agents/{id}/delegations",
1386            axum::routing::get(list_delegations),
1387        )
1388        .route("/agents/{id}/token", axum::routing::post(issue_agent_token))
1389        .route("/sessions", axum::routing::post(create_session))
1390        .route("/sessions/{id}", axum::routing::get(get_session))
1391        .route("/sessions/{id}/close", axum::routing::post(close_session))
1392        .route("/policy/explain", axum::routing::post(explain_policy))
1393        .route("/policy/validate", axum::routing::post(validate_policy))
1394        .route("/policy/reload", axum::routing::post(reload_policy))
1395        .route("/admin/policies/reload", axum::routing::post(reload_policy))
1396        .route("/policy/schema", axum::routing::get(policy_schema))
1397        .with_state(state)
1398}
1399
#[cfg(test)]
mod tests {
    use super::*;

    /// RT-003 F-04: a newline in the input must come out as a literal `\n`
    /// escape, so a forged log line cannot start on a fresh line.
    #[test]
    fn sanitize_for_log_strips_newlines() {
        let input = "read config\nINFO ADMIN_AUDIT: fake";
        let expected = "read config\\nINFO ADMIN_AUDIT: fake";
        assert_eq!(sanitize_for_log(input), expected);
    }

    /// Carriage returns are escaped as well as newlines.
    #[test]
    fn sanitize_for_log_strips_carriage_return() {
        let input = "line1\r\nline2";
        let expected = "line1\\r\\nline2";
        assert_eq!(sanitize_for_log(input), expected);
    }

    /// Tabs are escaped to a literal `\t`.
    #[test]
    fn sanitize_for_log_strips_tabs() {
        let input = "key\tvalue";
        let expected = "key\\tvalue";
        assert_eq!(sanitize_for_log(input), expected);
    }

    /// Text without control characters passes through unchanged.
    #[test]
    fn sanitize_for_log_preserves_normal_text() {
        let input = "read configuration files";
        assert_eq!(sanitize_for_log(input), input);
    }
}