Skip to main content

arbiter_lifecycle/
api.rs

1use std::sync::Arc;
2
3use arbiter_identity::{AgentRegistry, TrustLevel};
4use arbiter_mcp::context::McpRequest;
5use arbiter_policy::{EvalContext, PolicyTrace};
6use arbiter_session::{CreateSessionRequest, DataSensitivity};
7use axum::Json;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, StatusCode};
10use axum::response::IntoResponse;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::state::AppState;
16use crate::token::issue_token;
17
18/// Request body for POST /agents.
19#[derive(Debug, Deserialize)]
20pub struct RegisterAgentRequest {
21    pub owner: String,
22    pub model: String,
23    #[serde(default)]
24    pub capabilities: Vec<String>,
25    #[serde(default = "default_trust_level")]
26    pub trust_level: TrustLevel,
27    #[serde(default)]
28    pub expires_at: Option<DateTime<Utc>>,
29}
30
31fn default_trust_level() -> TrustLevel {
32    TrustLevel::Untrusted
33}
34
35/// Response body for POST /agents.
36#[derive(Debug, Serialize)]
37pub struct RegisterAgentResponse {
38    pub agent_id: Uuid,
39    pub token: String,
40}
41
42/// Request body for POST /agents/:id/delegate.
43#[derive(Debug, Deserialize)]
44pub struct DelegateRequest {
45    pub to: Uuid,
46    pub scopes: Vec<String>,
47    #[serde(default)]
48    pub expires_at: Option<DateTime<Utc>>,
49}
50
51/// Request body for POST /agents/:id/token.
52#[derive(Debug, Deserialize)]
53pub struct TokenRequest {
54    #[serde(default)]
55    pub expiry_seconds: Option<i64>,
56}
57
58/// Request body for POST /sessions.
59#[derive(Debug, Deserialize)]
60pub struct CreateSessionApiRequest {
61    pub agent_id: Uuid,
62    #[serde(default)]
63    pub delegation_chain_snapshot: Vec<String>,
64    pub declared_intent: String,
65    #[serde(default)]
66    pub authorized_tools: Vec<String>,
67    #[serde(default = "default_time_limit")]
68    pub time_limit_secs: i64,
69    #[serde(default = "default_call_budget")]
70    pub call_budget: u64,
71    #[serde(default)]
72    pub rate_limit_per_minute: Option<u64>,
73    /// Override the global rate-limit window duration for this session (seconds).
74    /// If omitted, uses the server default from `[sessions].rate_limit_window_secs`.
75    #[serde(default)]
76    pub rate_limit_window_secs: Option<u64>,
77    #[serde(default = "default_data_sensitivity")]
78    pub data_sensitivity_ceiling: DataSensitivity,
79}
80
81fn default_time_limit() -> i64 {
82    3600
83}
84
85fn default_call_budget() -> u64 {
86    1000
87}
88
89fn default_data_sensitivity() -> DataSensitivity {
90    DataSensitivity::Internal
91}
92
93/// Response body for POST /sessions.
94#[derive(Debug, Serialize)]
95pub struct CreateSessionResponse {
96    pub session_id: Uuid,
97    pub declared_intent: String,
98    pub authorized_tools: Vec<String>,
99    pub call_budget: u64,
100    pub time_limit_secs: i64,
101}
102
103/// Response body for GET /sessions/:id.
104#[derive(Debug, Serialize)]
105pub struct SessionStatusResponse {
106    pub session_id: Uuid,
107    pub agent_id: Uuid,
108    pub status: String,
109    pub declared_intent: String,
110    pub authorized_tools: Vec<String>,
111    pub calls_made: u64,
112    pub call_budget: u64,
113    pub calls_remaining: u64,
114    pub rate_limit_per_minute: Option<u64>,
115    pub data_sensitivity_ceiling: String,
116    pub created_at: DateTime<Utc>,
117    pub expires_at: DateTime<Utc>,
118    pub seconds_remaining: i64,
119    pub warnings: Vec<String>,
120}
121
122/// Response for token issuance.
123#[derive(Debug, Serialize)]
124pub struct TokenResponse {
125    pub token: String,
126    pub expires_in: i64,
127}
128
129/// Error response body.
130#[derive(Debug, Serialize)]
131pub struct ErrorResponse {
132    pub error: String,
133}
134
135/// Constant-time byte comparison to prevent timing side-channel attacks
136/// on the admin API key (P0 credential fix).
137///
138/// Uses the `subtle` crate (`ConstantTimeEq`) instead
139/// of a hand-rolled implementation with fragile `black_box`/`#[inline(never)]`
140/// barriers. The `subtle` crate is the Rust cryptographic community standard
141/// and is designed to resist compiler optimizations that break constant-time
142/// guarantees. Pads both inputs to equal length and checks lengths separately
143/// to avoid leaking length information through timing.
144fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
145    use subtle::ConstantTimeEq;
146    // Pad both slices to the same length so ct_eq can compare equal-length
147    // slices, then separately check that the original lengths match.
148    // This avoids leaking length information through timing.
149    let max_len = std::cmp::max(a.len(), b.len());
150    let mut a_padded = vec![0u8; max_len];
151    let mut b_padded = vec![0u8; max_len];
152    a_padded[..a.len()].copy_from_slice(a);
153    b_padded[..b.len()].copy_from_slice(b);
154    let bytes_equal: bool = a_padded.ct_eq(&b_padded).into();
155    let len_equal: bool = (a.len() as u64).ct_eq(&(b.len() as u64)).into();
156    bytes_equal & len_equal
157}
158
159/// Sanitize user-provided strings for safe inclusion in log messages.
160/// Replaces control characters (newlines, carriage returns, tabs) with
161/// their escaped representations to prevent log injection attacks.
162/// (RT-003 F-04: log injection via user input)
163fn sanitize_for_log(input: &str) -> String {
164    input
165        .replace('\n', "\\n")
166        .replace('\r', "\\r")
167        .replace('\t', "\\t")
168}
169
170/// Validate the admin API key from headers.
171///
172/// Uses constant-time comparison to prevent timing side-channel attacks.
173fn validate_admin_key(
174    headers: &HeaderMap,
175    expected: &str,
176) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
177    let key = headers
178        .get("x-api-key")
179        .and_then(|v| v.to_str().ok())
180        .ok_or_else(|| {
181            (
182                StatusCode::UNAUTHORIZED,
183                Json(ErrorResponse {
184                    error: "missing x-api-key header".into(),
185                }),
186            )
187        })?;
188
189    // P0 credential fix: constant-time comparison to prevent timing attacks.
190    if !constant_time_eq(key.as_bytes(), expected.as_bytes()) {
191        return Err((
192            StatusCode::FORBIDDEN,
193            Json(ErrorResponse {
194                error: "invalid API key".into(),
195            }),
196        ));
197    }
198
199    Ok(())
200}
201
202/// Check the admin API rate limiter and return 429 if the limit is exceeded.
203///
204/// Rate limiting prevents an attacker with a compromised API key
205/// from making unlimited requests to enumerate agents, create sessions, or
206/// overwhelm the control plane.
207fn check_admin_rate_limit(state: &AppState) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
208    if !state.admin_rate_limiter.check_rate_limit() {
209        tracing::warn!("ADMIN_AUDIT: rate limit exceeded on admin API");
210        return Err((
211            StatusCode::TOO_MANY_REQUESTS,
212            Json(ErrorResponse {
213                error: "admin API rate limit exceeded, try again later".into(),
214            }),
215        ));
216    }
217    Ok(())
218}
219
220/// POST /agents: register a new agent.
221pub async fn register_agent(
222    State(state): State<AppState>,
223    headers: HeaderMap,
224    Json(req): Json<RegisterAgentRequest>,
225) -> impl IntoResponse {
226    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
227        return e.into_response();
228    }
229    if let Err(e) = check_admin_rate_limit(&state) {
230        return e.into_response();
231    }
232
233    match state
234        .registry
235        .register_agent(
236            req.owner.clone(),
237            req.model,
238            req.capabilities,
239            req.trust_level,
240            req.expires_at,
241        )
242        .await
243    {
244        Ok(agent) => {
245            let token = match issue_token(agent.id, &req.owner, &state.token_config) {
246                Ok(t) => t,
247                Err(e) => {
248                    tracing::error!(error = %e, "failed to issue token");
249                    return (
250                        StatusCode::INTERNAL_SERVER_ERROR,
251                        Json(ErrorResponse {
252                            error: "token issuance failed".into(),
253                        }),
254                    )
255                        .into_response();
256                }
257            };
258
259            state.metrics.registered_agents.inc();
260            state.admin_audit_log(
261                "register_agent",
262                Some(agent.id),
263                &format!("owner={}", sanitize_for_log(&req.owner)),
264            );
265
266            (
267                StatusCode::CREATED,
268                Json(RegisterAgentResponse {
269                    agent_id: agent.id,
270                    token,
271                }),
272            )
273                .into_response()
274        }
275        Err(e) => {
276            tracing::error!(error = %e, "failed to register agent");
277            (
278                StatusCode::INTERNAL_SERVER_ERROR,
279                Json(ErrorResponse {
280                    error: e.to_string(),
281                }),
282            )
283                .into_response()
284        }
285    }
286}
287
288/// GET /agents/:id: get agent details.
289pub async fn get_agent(
290    State(state): State<AppState>,
291    headers: HeaderMap,
292    Path(id): Path<Uuid>,
293) -> impl IntoResponse {
294    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
295        return e.into_response();
296    }
297    if let Err(e) = check_admin_rate_limit(&state) {
298        return e.into_response();
299    }
300
301    state.admin_audit_log("get_agent", Some(id), "");
302
303    match state.registry.get_agent(id).await {
304        Ok(agent) => match serde_json::to_value(&agent) {
305            Ok(val) => (StatusCode::OK, Json(val)).into_response(),
306            Err(e) => (
307                StatusCode::INTERNAL_SERVER_ERROR,
308                Json(serde_json::json!({"error": e.to_string()})),
309            )
310                .into_response(),
311        },
312        Err(e) => (
313            StatusCode::NOT_FOUND,
314            Json(ErrorResponse {
315                error: e.to_string(),
316            }),
317        )
318            .into_response(),
319    }
320}
321
322/// POST /agents/:id/delegate: create delegation to sub-agent.
323pub async fn delegate_agent(
324    State(state): State<AppState>,
325    headers: HeaderMap,
326    Path(from_id): Path<Uuid>,
327    Json(req): Json<DelegateRequest>,
328) -> impl IntoResponse {
329    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
330        return e.into_response();
331    }
332    if let Err(e) = check_admin_rate_limit(&state) {
333        return e.into_response();
334    }
335
336    state.admin_audit_log("delegate_agent", Some(from_id), &format!("to={}", req.to));
337
338    match state
339        .registry
340        .create_delegation(from_id, req.to, req.scopes, req.expires_at)
341        .await
342    {
343        Ok(link) => match serde_json::to_value(&link) {
344            Ok(val) => (StatusCode::CREATED, Json(val)).into_response(),
345            Err(e) => (
346                StatusCode::INTERNAL_SERVER_ERROR,
347                Json(serde_json::json!({"error": e.to_string()})),
348            )
349                .into_response(),
350        },
351        Err(e) => {
352            let status = match &e {
353                arbiter_identity::IdentityError::DelegationSourceNotFound(_)
354                | arbiter_identity::IdentityError::DelegationTargetNotFound(_) => {
355                    StatusCode::NOT_FOUND
356                }
357                arbiter_identity::IdentityError::ScopeNarrowingViolation { .. }
358                | arbiter_identity::IdentityError::DelegateFromDeactivated(_)
359                | arbiter_identity::IdentityError::CircularDelegation { .. } => {
360                    StatusCode::BAD_REQUEST
361                }
362                _ => StatusCode::INTERNAL_SERVER_ERROR,
363            };
364            (
365                status,
366                Json(ErrorResponse {
367                    error: e.to_string(),
368                }),
369            )
370                .into_response()
371        }
372    }
373}
374
375/// GET /agents/:id/delegations: list incoming and outgoing delegations.
376pub async fn list_delegations(
377    State(state): State<AppState>,
378    headers: HeaderMap,
379    Path(id): Path<Uuid>,
380) -> impl IntoResponse {
381    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
382        return e.into_response();
383    }
384    if let Err(e) = check_admin_rate_limit(&state) {
385        return e.into_response();
386    }
387
388    state.admin_audit_log("list_delegations", Some(id), "");
389
390    // Verify agent exists.
391    if let Err(e) = state.registry.get_agent(id).await {
392        return (
393            StatusCode::NOT_FOUND,
394            Json(ErrorResponse {
395                error: e.to_string(),
396            }),
397        )
398            .into_response();
399    }
400
401    let outgoing = state.registry.list_delegations_from(id).await;
402    let incoming = state.registry.list_delegations_to(id).await;
403
404    (
405        StatusCode::OK,
406        Json(serde_json::json!({
407            "agent_id": id,
408            "outgoing": outgoing,
409            "incoming": incoming,
410            "outgoing_count": outgoing.len(),
411            "incoming_count": incoming.len(),
412        })),
413    )
414        .into_response()
415}
416
417/// DELETE /agents/:id: deactivate agent + cascade.
418pub async fn deactivate_agent(
419    State(state): State<AppState>,
420    headers: HeaderMap,
421    Path(id): Path<Uuid>,
422) -> impl IntoResponse {
423    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
424        return e.into_response();
425    }
426    if let Err(e) = check_admin_rate_limit(&state) {
427        return e.into_response();
428    }
429
430    state.admin_audit_log("deactivate_agent", Some(id), "cascade");
431
432    match state.registry.cascade_deactivate(id).await {
433        Ok(deactivated) => {
434            // Close sessions for all deactivated agents.
435            let mut total_sessions_closed = 0usize;
436            for &agent_id in &deactivated {
437                let closed = state.session_store.close_sessions_for_agent(agent_id).await;
438                total_sessions_closed += closed;
439                state.metrics.registered_agents.dec();
440            }
441            (
442                StatusCode::OK,
443                Json(serde_json::json!({
444                    "deactivated": deactivated,
445                    "count": deactivated.len(),
446                    "sessions_closed": total_sessions_closed,
447                })),
448            )
449                .into_response()
450        }
451        Err(e) => {
452            let status = match &e {
453                arbiter_identity::IdentityError::AgentNotFound(_) => StatusCode::NOT_FOUND,
454                arbiter_identity::IdentityError::AgentDeactivated(_) => StatusCode::CONFLICT,
455                _ => StatusCode::INTERNAL_SERVER_ERROR,
456            };
457            (
458                status,
459                Json(ErrorResponse {
460                    error: e.to_string(),
461                }),
462            )
463                .into_response()
464        }
465    }
466}
467
468/// POST /agents/:id/token: issue new short-lived credential.
469pub async fn issue_agent_token(
470    State(state): State<AppState>,
471    headers: HeaderMap,
472    Path(id): Path<Uuid>,
473    body: Option<Json<TokenRequest>>,
474) -> impl IntoResponse {
475    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
476        return e.into_response();
477    }
478    if let Err(e) = check_admin_rate_limit(&state) {
479        return e.into_response();
480    }
481
482    state.admin_audit_log("issue_agent_token", Some(id), "");
483
484    let agent = match state.registry.get_agent(id).await {
485        Ok(a) => a,
486        Err(e) => {
487            return (
488                StatusCode::NOT_FOUND,
489                Json(ErrorResponse {
490                    error: e.to_string(),
491                }),
492            )
493                .into_response();
494        }
495    };
496
497    if !agent.active {
498        return (
499            StatusCode::BAD_REQUEST,
500            Json(ErrorResponse {
501                error: format!("agent {} is deactivated", id),
502            }),
503        )
504            .into_response();
505    }
506
507    let mut config = state.token_config.clone();
508    if let Some(Json(req)) = body
509        && let Some(expiry) = req.expiry_seconds
510    {
511        // P4: Reject non-positive expiry to prevent creating immediately-invalid tokens.
512        // (RT-003 F-09: token expiry accepts negative values)
513        if expiry <= 0 {
514            return (
515                StatusCode::BAD_REQUEST,
516                Json(ErrorResponse {
517                    error: "expiry_seconds must be positive".into(),
518                }),
519            )
520                .into_response();
521        }
522        config.expiry_seconds = expiry;
523    }
524
525    match issue_token(agent.id, &agent.owner, &config) {
526        Ok(token) => (
527            StatusCode::OK,
528            Json(TokenResponse {
529                token,
530                expires_in: config.expiry_seconds,
531            }),
532        )
533            .into_response(),
534        Err(e) => {
535            tracing::error!(error = %e, "failed to issue token");
536            (
537                StatusCode::INTERNAL_SERVER_ERROR,
538                Json(ErrorResponse {
539                    error: "token issuance failed".into(),
540                }),
541            )
542                .into_response()
543        }
544    }
545}
546
547/// GET /agents: list all agents.
548pub async fn list_agents(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
549    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
550        return e.into_response();
551    }
552    if let Err(e) = check_admin_rate_limit(&state) {
553        return e.into_response();
554    }
555
556    state.admin_audit_log("list_agents", None, "");
557
558    let agents = state.registry.list_agents().await;
559    match serde_json::to_value(&agents) {
560        Ok(val) => (StatusCode::OK, Json(val)).into_response(),
561        Err(e) => (
562            StatusCode::INTERNAL_SERVER_ERROR,
563            Json(serde_json::json!({"error": e.to_string()})),
564        )
565            .into_response(),
566    }
567}
568
569/// POST /sessions: create a new task session.
570pub async fn create_session(
571    State(state): State<AppState>,
572    headers: HeaderMap,
573    Json(req): Json<CreateSessionApiRequest>,
574) -> impl IntoResponse {
575    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
576        return e.into_response();
577    }
578    if let Err(e) = check_admin_rate_limit(&state) {
579        return e.into_response();
580    }
581
582    state.admin_audit_log(
583        "create_session",
584        Some(req.agent_id),
585        &format!("intent={}", sanitize_for_log(&req.declared_intent)),
586    );
587
588    // P0: Validate agent exists, is active, and not expired before creating session.
589    // Without this check, sessions can be created for non-existent or deactivated agents,
590    // bypassing the entire identity model (RT-003 F-01: ghost agent sessions).
591    let agent = match state.registry.get_agent(req.agent_id).await {
592        Ok(a) => a,
593        Err(e) => {
594            tracing::warn!(
595                agent_id = %req.agent_id,
596                error = %e,
597                "session creation denied: agent not found"
598            );
599            return (
600                StatusCode::NOT_FOUND,
601                Json(ErrorResponse {
602                    error: format!("agent {} not found", req.agent_id),
603                }),
604            )
605                .into_response();
606        }
607    };
608
609    if !agent.active {
610        tracing::warn!(
611            agent_id = %req.agent_id,
612            "session creation denied: agent is deactivated"
613        );
614        return (
615            StatusCode::BAD_REQUEST,
616            Json(ErrorResponse {
617                error: format!("agent {} is deactivated", req.agent_id),
618            }),
619        )
620            .into_response();
621    }
622
623    if let Some(expires_at) = agent.expires_at
624        && expires_at < Utc::now()
625    {
626        tracing::warn!(
627            agent_id = %req.agent_id,
628            expires_at = %expires_at,
629            "session creation denied: agent has expired"
630        );
631        return (
632            StatusCode::BAD_REQUEST,
633            Json(ErrorResponse {
634                error: format!("agent {} has expired", req.agent_id),
635            }),
636        )
637            .into_response();
638    }
639
640    // P0: Per-agent session cap to prevent session multiplication attacks.
641    // An agent creating N sessions * M budget each = N*M tool calls,
642    // bypassing per-session rate limits.
643    if let Some(max_sessions) = state.max_concurrent_sessions_per_agent {
644        let active_count = state
645            .session_store
646            .count_active_for_agent(req.agent_id)
647            .await;
648        if active_count >= max_sessions {
649            tracing::warn!(
650                agent_id = %req.agent_id,
651                active = active_count,
652                max = max_sessions,
653                "session creation denied: per-agent concurrent session cap reached"
654            );
655            return (
656                StatusCode::TOO_MANY_REQUESTS,
657                Json(ErrorResponse {
658                    error: format!(
659                        "agent {} has too many concurrent sessions ({}/{})",
660                        req.agent_id, active_count, max_sessions
661                    ),
662                }),
663            )
664                .into_response();
665        }
666    }
667
668    // Validate session creation parameters.
669    if req.time_limit_secs <= 0 {
670        return (
671            StatusCode::BAD_REQUEST,
672            Json(ErrorResponse {
673                error: "time_limit_secs must be positive".into(),
674            }),
675        )
676            .into_response();
677    }
678    // Validate session creation parameters.
679    if req.time_limit_secs > 86400 {
680        return (
681            StatusCode::BAD_REQUEST,
682            Json(ErrorResponse {
683                error: "time_limit_secs cannot exceed 86400 (24 hours)".into(),
684            }),
685        )
686            .into_response();
687    }
688    if req.call_budget == 0 {
689        return (
690            StatusCode::BAD_REQUEST,
691            Json(ErrorResponse {
692                error: "call_budget must be positive".into(),
693            }),
694        )
695            .into_response();
696    }
697    if req.call_budget > 1_000_000 {
698        return (
699            StatusCode::BAD_REQUEST,
700            Json(ErrorResponse {
701                error: "call_budget cannot exceed 1000000".into(),
702            }),
703        )
704            .into_response();
705    }
706    if req.declared_intent.trim().is_empty() {
707        return (
708            StatusCode::BAD_REQUEST,
709            Json(ErrorResponse {
710                error: "declared_intent must not be empty".into(),
711            }),
712        )
713            .into_response();
714    }
715
716    // P1: Validate rate_limit_window_secs bounds to prevent rate limit bypass.
717    // A window of 0 disables rate limiting entirely; a window of 1 second converts
718    // "per minute" limits into "per second" (100x amplification).
719    // (RT-003 F-03: rate limit window manipulation)
720    if let Some(window) = req.rate_limit_window_secs {
721        if window == 0 {
722            return (
723                StatusCode::BAD_REQUEST,
724                Json(ErrorResponse {
725                    error: "rate_limit_window_secs must be positive".into(),
726                }),
727            )
728                .into_response();
729        }
730        if window < 10 {
731            return (
732                StatusCode::BAD_REQUEST,
733                Json(ErrorResponse {
734                    error: "rate_limit_window_secs must be at least 10 seconds".into(),
735                }),
736            )
737                .into_response();
738        }
739        if window > 3600 {
740            return (
741                StatusCode::BAD_REQUEST,
742                Json(ErrorResponse {
743                    error: "rate_limit_window_secs cannot exceed 3600 (1 hour)".into(),
744                }),
745            )
746                .into_response();
747        }
748    }
749
750    let create_req = CreateSessionRequest {
751        agent_id: req.agent_id,
752        delegation_chain_snapshot: req.delegation_chain_snapshot,
753        declared_intent: req.declared_intent,
754        authorized_tools: req.authorized_tools,
755        time_limit: chrono::Duration::seconds(req.time_limit_secs),
756        call_budget: req.call_budget,
757        rate_limit_per_minute: req.rate_limit_per_minute,
758        rate_limit_window_secs: req
759            .rate_limit_window_secs
760            .unwrap_or(state.default_rate_limit_window_secs),
761        data_sensitivity_ceiling: req.data_sensitivity_ceiling,
762    };
763
764    let session = state.session_store.create(create_req).await;
765
766    state.metrics.active_sessions.inc();
767
768    (
769        StatusCode::CREATED,
770        Json(CreateSessionResponse {
771            session_id: session.session_id,
772            declared_intent: session.declared_intent,
773            authorized_tools: session.authorized_tools,
774            call_budget: session.call_budget,
775            time_limit_secs: req.time_limit_secs,
776        }),
777    )
778        .into_response()
779}
780
781/// GET /sessions/:id: get live session status.
782pub async fn get_session(
783    State(state): State<AppState>,
784    headers: HeaderMap,
785    Path(id): Path<Uuid>,
786) -> impl IntoResponse {
787    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
788        return e.into_response();
789    }
790    if let Err(e) = check_admin_rate_limit(&state) {
791        return e.into_response();
792    }
793
794    state.admin_audit_log("get_session", None, &format!("session_id={}", id));
795
796    let session = match state.session_store.get(id).await {
797        Ok(s) => s,
798        Err(e) => {
799            return (
800                StatusCode::NOT_FOUND,
801                Json(ErrorResponse {
802                    error: e.to_string(),
803                }),
804            )
805                .into_response();
806        }
807    };
808
809    let now = Utc::now();
810    let expires_at = session.created_at + session.time_limit;
811    let seconds_remaining = (expires_at - now).num_seconds().max(0);
812    let calls_remaining = session.call_budget.saturating_sub(session.calls_made);
813
814    let mut warnings = Vec::new();
815    let budget_pct_remaining = if session.call_budget > 0 {
816        (calls_remaining as f64 / session.call_budget as f64) * 100.0
817    } else {
818        0.0
819    };
820    let time_pct_remaining = if session.time_limit.num_seconds() > 0 {
821        (seconds_remaining as f64 / session.time_limit.num_seconds() as f64) * 100.0
822    } else {
823        0.0
824    };
825    if budget_pct_remaining <= state.warning_threshold_pct && session.call_budget > 0 {
826        warnings.push(format!(
827            "budget low: {} of {} calls remaining",
828            calls_remaining, session.call_budget
829        ));
830    }
831    if time_pct_remaining <= state.warning_threshold_pct && seconds_remaining > 0 {
832        warnings.push(format!("time low: {}s remaining", seconds_remaining));
833    }
834
835    let status_str = format!("{:?}", session.status).to_lowercase();
836    let sensitivity_str = serde_json::to_value(session.data_sensitivity_ceiling)
837        .unwrap_or_default()
838        .as_str()
839        .unwrap_or("unknown")
840        .to_string();
841
842    (
843        StatusCode::OK,
844        Json(SessionStatusResponse {
845            session_id: session.session_id,
846            agent_id: session.agent_id,
847            status: status_str,
848            declared_intent: session.declared_intent,
849            authorized_tools: session.authorized_tools,
850            calls_made: session.calls_made,
851            call_budget: session.call_budget,
852            calls_remaining,
853            rate_limit_per_minute: session.rate_limit_per_minute,
854            data_sensitivity_ceiling: sensitivity_str,
855            created_at: session.created_at,
856            expires_at,
857            seconds_remaining,
858            warnings,
859        }),
860    )
861        .into_response()
862}
863
864/// Response body for POST /sessions/:id/close.
865#[derive(Debug, Serialize)]
866pub struct SessionCloseResponse {
867    pub session_id: Uuid,
868    pub status: String,
869    pub declared_intent: String,
870    pub total_calls: u64,
871    pub call_budget: u64,
872    pub budget_utilization_pct: f64,
873    pub time_used_secs: i64,
874    pub time_limit_secs: i64,
875    /// Number of denied requests during this session (from audit log).
876    pub denied_attempts: u64,
877    /// Number of requests that triggered anomaly flags (from audit log).
878    pub anomalies_detected: u64,
879}
880
881/// POST /sessions/:id/close: close a session and return summary.
882pub async fn close_session(
883    State(state): State<AppState>,
884    headers: HeaderMap,
885    Path(id): Path<Uuid>,
886) -> impl IntoResponse {
887    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
888        return e.into_response();
889    }
890    if let Err(e) = check_admin_rate_limit(&state) {
891        return e.into_response();
892    }
893
894    state.admin_audit_log("close_session", None, &format!("session_id={}", id));
895
896    let session = match state.session_store.close(id).await {
897        Ok(s) => {
898            state.metrics.active_sessions.dec();
899            s
900        }
901        Err(e) => {
902            let status = match &e {
903                arbiter_session::SessionError::NotFound(_) => StatusCode::NOT_FOUND,
904                arbiter_session::SessionError::AlreadyClosed(_) => StatusCode::CONFLICT,
905                _ => StatusCode::INTERNAL_SERVER_ERROR,
906            };
907            return (
908                status,
909                Json(ErrorResponse {
910                    error: e.to_string(),
911                }),
912            )
913                .into_response();
914        }
915    };
916
917    let now = Utc::now();
918    let time_used_secs = (now - session.created_at).num_seconds();
919    let budget_utilization_pct = if session.call_budget > 0 {
920        (session.calls_made as f64 / session.call_budget as f64) * 100.0
921    } else {
922        0.0
923    };
924
925    // Query audit stats for this session.
926    let session_id_str = id.to_string();
927    let (denied_attempts, anomalies_detected) = if let Some(ref sink) = state.audit_sink {
928        let stats = sink.stats().stats_for_session(&session_id_str).await;
929        // Clean up stats now that session is closed.
930        sink.stats().remove_session(&session_id_str).await;
931        (stats.denied_count, stats.anomaly_count)
932    } else {
933        (0, 0)
934    };
935
936    (
937        StatusCode::OK,
938        Json(SessionCloseResponse {
939            session_id: session.session_id,
940            status: "closed".into(),
941            declared_intent: session.declared_intent,
942            total_calls: session.calls_made,
943            call_budget: session.call_budget,
944            budget_utilization_pct,
945            time_used_secs,
946            time_limit_secs: session.time_limit.num_seconds(),
947            denied_attempts,
948            anomalies_detected,
949        }),
950    )
951        .into_response()
952}
953
954/// Request body for POST /policy/explain.
955#[derive(Debug, Deserialize)]
956pub struct PolicyExplainRequest {
957    pub agent_id: Uuid,
958    pub declared_intent: String,
959    pub tool: String,
960    #[serde(default)]
961    pub arguments: Option<serde_json::Value>,
962    #[serde(default)]
963    pub principal: Option<String>,
964}
965
966/// Response body for POST /policy/explain.
967#[derive(Debug, Serialize)]
968pub struct PolicyExplainResponse {
969    pub decision: String,
970    pub matched_policy: Option<String>,
971    pub reason: Option<String>,
972    pub trace: Vec<PolicyTrace>,
973}
974
975/// POST /policy/explain: dry-run policy evaluation without executing.
976pub async fn explain_policy(
977    State(state): State<AppState>,
978    headers: HeaderMap,
979    Json(req): Json<PolicyExplainRequest>,
980) -> impl IntoResponse {
981    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
982        return e.into_response();
983    }
984    if let Err(e) = check_admin_rate_limit(&state) {
985        return e.into_response();
986    }
987
988    state.admin_audit_log(
989        "explain_policy",
990        Some(req.agent_id),
991        &format!("tool={}", sanitize_for_log(&req.tool)),
992    );
993
994    let policy_snapshot = tokio::sync::watch::Sender::borrow(&state.policy_config).clone();
995    let policy_config = match policy_snapshot.as_ref() {
996        Some(pc) => pc,
997        None => {
998            return (
999                StatusCode::OK,
1000                Json(PolicyExplainResponse {
1001                    decision: "allow".into(),
1002                    matched_policy: None,
1003                    reason: Some("no policies configured".into()),
1004                    trace: vec![],
1005                }),
1006            )
1007                .into_response();
1008        }
1009    };
1010
1011    let agent = match state.registry.get_agent(req.agent_id).await {
1012        Ok(a) => a,
1013        Err(e) => {
1014            return (
1015                StatusCode::NOT_FOUND,
1016                Json(ErrorResponse {
1017                    error: format!("agent not found: {e}"),
1018                }),
1019            )
1020                .into_response();
1021        }
1022    };
1023
1024    let principal = req.principal.unwrap_or_else(|| agent.owner.clone());
1025
1026    let eval_ctx = EvalContext {
1027        agent,
1028        delegation_chain: vec![],
1029        declared_intent: req.declared_intent,
1030        principal_sub: principal,
1031        principal_groups: vec![],
1032    };
1033
1034    let mcp_req = McpRequest {
1035        id: None,
1036        method: "tools/call".into(),
1037        tool_name: Some(req.tool.clone()),
1038        arguments: req.arguments,
1039        resource_uri: None,
1040    };
1041
1042    let result = arbiter_policy::evaluate_explained(policy_config, &eval_ctx, &mcp_req);
1043
1044    let (decision, matched_policy, reason) = match result.decision {
1045        arbiter_policy::Decision::Allow { policy_id } => ("allow".into(), Some(policy_id), None),
1046        arbiter_policy::Decision::Deny { reason } => ("deny".into(), None, Some(reason)),
1047        arbiter_policy::Decision::Escalate { reason } => ("escalate".into(), None, Some(reason)),
1048        arbiter_policy::Decision::Annotate { policy_id, reason } => {
1049            ("annotate".into(), Some(policy_id), Some(reason))
1050        }
1051    };
1052
1053    (
1054        StatusCode::OK,
1055        Json(PolicyExplainResponse {
1056            decision,
1057            matched_policy,
1058            reason,
1059            trace: result.trace,
1060        }),
1061    )
1062        .into_response()
1063}
1064
1065/// Request body for POST /policy/validate.
1066#[derive(Debug, Deserialize)]
1067pub struct PolicyValidateRequest {
1068    pub policy_toml: String,
1069}
1070
1071/// POST /policy/validate: validate a policy TOML string without loading it.
1072pub async fn validate_policy(
1073    State(state): State<AppState>,
1074    headers: HeaderMap,
1075    Json(req): Json<PolicyValidateRequest>,
1076) -> impl IntoResponse {
1077    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1078        return e.into_response();
1079    }
1080    if let Err(e) = check_admin_rate_limit(&state) {
1081        return e.into_response();
1082    }
1083
1084    state.admin_audit_log("validate_policy", None, "");
1085
1086    let result = arbiter_policy::PolicyConfig::validate_toml(&req.policy_toml);
1087    (StatusCode::OK, Json(result)).into_response()
1088}
1089
1090/// POST /policy/reload: re-read the policy file and atomically swap the config.
1091pub async fn reload_policy(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1092    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1093        return e.into_response();
1094    }
1095    if let Err(e) = check_admin_rate_limit(&state) {
1096        return e.into_response();
1097    }
1098
1099    state.admin_audit_log("reload_policy", None, "");
1100
1101    let path = match &state.policy_file_path {
1102        Some(p) => p.clone(),
1103        None => {
1104            return (
1105                StatusCode::BAD_REQUEST,
1106                Json(ErrorResponse {
1107                    error: "no policy file configured; policies are inline or absent".into(),
1108                }),
1109            )
1110                .into_response();
1111        }
1112    };
1113
1114    let contents = match std::fs::read_to_string(&path) {
1115        Ok(c) => c,
1116        Err(e) => {
1117            // Do not leak filesystem paths in error responses.
1118            tracing::error!(path = %path, error = %e, "failed to read policy file");
1119            return (
1120                StatusCode::INTERNAL_SERVER_ERROR,
1121                Json(ErrorResponse {
1122                    error: "failed to read policy file".into(),
1123                }),
1124            )
1125                .into_response();
1126        }
1127    };
1128
1129    let new_config = match arbiter_policy::PolicyConfig::from_toml(&contents) {
1130        Ok(pc) => pc,
1131        Err(e) => {
1132            // Log parse errors internally, don't expose details.
1133            tracing::error!(error = %e, "failed to parse policy file");
1134            return (
1135                StatusCode::BAD_REQUEST,
1136                Json(ErrorResponse {
1137                    error: "failed to parse policy file".into(),
1138                }),
1139            )
1140                .into_response();
1141        }
1142    };
1143
1144    let policy_count = new_config.policies.len();
1145    let _ = state.policy_config.send_replace(Arc::new(Some(new_config)));
1146
1147    // Audit log policy reload events for change tracking.
1148    tracing::warn!(
1149        path,
1150        policy_count,
1151        "AUDIT: policy configuration reloaded via admin API"
1152    );
1153    (
1154        StatusCode::OK,
1155        Json(serde_json::json!({
1156            "status": "ok",
1157            "reloaded": true,
1158            "policies_loaded": policy_count,
1159            "policy_count": policy_count,
1160            "file": path,
1161        })),
1162    )
1163        .into_response()
1164}
1165
1166/// GET /policy/schema: returns the policy TOML schema as JSON Schema.
1167pub async fn policy_schema(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1168    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1169        return e.into_response();
1170    }
1171    if let Err(e) = check_admin_rate_limit(&state) {
1172        return e.into_response();
1173    }
1174
1175    state.admin_audit_log("policy_schema", None, "");
1176
1177    let schema = serde_json::json!({
1178        "$schema": "https://json-schema.org/draft/2020-12/schema",
1179        "title": "Arbiter Policy Configuration",
1180        "description": "Defines authorization policies for the Arbiter MCP gateway. Policies are evaluated top-to-bottom; the most specific match wins. If no policy matches, the request is denied (deny-by-default).",
1181        "type": "object",
1182        "properties": {
1183            "policies": {
1184                "type": "array",
1185                "description": "Ordered list of authorization policies. Evaluated by specificity score; the most specific matching policy's effect applies.",
1186                "items": {
1187                    "$ref": "#/$defs/Policy"
1188                }
1189            }
1190        },
1191        "$defs": {
1192            "Policy": {
1193                "type": "object",
1194                "description": "A single authorization policy rule.",
1195                "required": ["id", "effect"],
1196                "properties": {
1197                    "id": {
1198                        "type": "string",
1199                        "description": "Unique identifier for this policy. Used in audit logs and policy traces."
1200                    },
1201                    "effect": {
1202                        "$ref": "#/$defs/Effect"
1203                    },
1204                    "agent_match": {
1205                        "$ref": "#/$defs/AgentMatch"
1206                    },
1207                    "principal_match": {
1208                        "$ref": "#/$defs/PrincipalMatch"
1209                    },
1210                    "intent_match": {
1211                        "$ref": "#/$defs/IntentMatch"
1212                    },
1213                    "allowed_tools": {
1214                        "type": "array",
1215                        "items": { "type": "string" },
1216                        "description": "Tools this policy applies to. Empty array means 'all tools'."
1217                    },
1218                    "parameter_constraints": {
1219                        "type": "array",
1220                        "items": { "$ref": "#/$defs/ParameterConstraint" },
1221                        "description": "Per-parameter bounds on tool arguments."
1222                    },
1223                    "priority": {
1224                        "type": "integer",
1225                        "default": 0,
1226                        "description": "Manual priority override. 0 = auto-computed from match specificity. Higher wins ties."
1227                    }
1228                }
1229            },
1230            "Effect": {
1231                "type": "string",
1232                "enum": ["allow", "deny", "escalate"],
1233                "description": "allow = permit the request. deny = block it. escalate = require human-in-the-loop approval."
1234            },
1235            "AgentMatch": {
1236                "type": "object",
1237                "description": "Criteria for matching the requesting agent. All specified fields must match.",
1238                "properties": {
1239                    "agent_id": {
1240                        "type": "string",
1241                        "format": "uuid",
1242                        "description": "Match a specific agent by UUID. Specificity: +100."
1243                    },
1244                    "trust_level": {
1245                        "$ref": "#/$defs/TrustLevel"
1246                    },
1247                    "capabilities": {
1248                        "type": "array",
1249                        "items": { "type": "string" },
1250                        "description": "Agent must have all listed capabilities. Specificity: +25 each."
1251                    }
1252                }
1253            },
1254            "TrustLevel": {
1255                "type": "string",
1256                "enum": ["untrusted", "basic", "verified", "trusted"],
1257                "description": "Agent trust tier. untrusted < basic < verified < trusted. Matches agents at or above the specified level. Specificity: +50."
1258            },
1259            "PrincipalMatch": {
1260                "type": "object",
1261                "description": "Criteria for matching the human principal on whose behalf the agent acts.",
1262                "properties": {
1263                    "sub": {
1264                        "type": "string",
1265                        "description": "Exact principal subject identifier (e.g., 'user:alice'). Specificity: +40."
1266                    },
1267                    "groups": {
1268                        "type": "array",
1269                        "items": { "type": "string" },
1270                        "description": "Principal must belong to at least one of these groups. Specificity: +20 each."
1271                    }
1272                }
1273            },
1274            "IntentMatch": {
1275                "type": "object",
1276                "description": "Criteria for matching the session's declared intent.",
1277                "properties": {
1278                    "keywords": {
1279                        "type": "array",
1280                        "items": { "type": "string" },
1281                        "description": "Case-insensitive substrings that must appear in the declared intent. Specificity: +10 each."
1282                    },
1283                    "regex": {
1284                        "type": "string",
1285                        "description": "Regex pattern the declared intent must match. Compiled at config load time. Specificity: +30."
1286                    }
1287                }
1288            },
1289            "ParameterConstraint": {
1290                "type": "object",
1291                "description": "A constraint on a tool call parameter.",
1292                "required": ["key"],
1293                "properties": {
1294                    "key": {
1295                        "type": "string",
1296                        "description": "Dotted path to the parameter (e.g., 'arguments.max_tokens')."
1297                    },
1298                    "max_value": {
1299                        "type": "number",
1300                        "description": "Maximum numeric value allowed."
1301                    },
1302                    "min_value": {
1303                        "type": "number",
1304                        "description": "Minimum numeric value allowed."
1305                    },
1306                    "allowed_values": {
1307                        "type": "array",
1308                        "items": { "type": "string" },
1309                        "description": "Whitelist of allowed string values."
1310                    }
1311                }
1312            },
1313            "DataSensitivity": {
1314                "type": "string",
1315                "enum": ["public", "internal", "confidential", "restricted"],
1316                "description": "Data sensitivity ceiling for sessions. public < internal < confidential < restricted."
1317            }
1318        }
1319    });
1320
1321    (StatusCode::OK, Json(schema)).into_response()
1322}
1323
1324/// Build the axum router for the lifecycle API.
1325pub fn router(state: AppState) -> axum::Router {
1326    axum::Router::new()
1327        .route("/agents", axum::routing::post(register_agent))
1328        .route("/agents", axum::routing::get(list_agents))
1329        .route("/agents/{id}", axum::routing::get(get_agent))
1330        .route("/agents/{id}", axum::routing::delete(deactivate_agent))
1331        .route("/agents/{id}/delegate", axum::routing::post(delegate_agent))
1332        .route(
1333            "/agents/{id}/delegations",
1334            axum::routing::get(list_delegations),
1335        )
1336        .route("/agents/{id}/token", axum::routing::post(issue_agent_token))
1337        .route("/sessions", axum::routing::post(create_session))
1338        .route("/sessions/{id}", axum::routing::get(get_session))
1339        .route("/sessions/{id}/close", axum::routing::post(close_session))
1340        .route("/policy/explain", axum::routing::post(explain_policy))
1341        .route("/policy/validate", axum::routing::post(validate_policy))
1342        .route("/policy/reload", axum::routing::post(reload_policy))
1343        .route("/admin/policies/reload", axum::routing::post(reload_policy))
1344        .route("/policy/schema", axum::routing::get(policy_schema))
1345        .with_state(state)
1346}
1347
1348#[cfg(test)]
1349mod tests {
1350    use super::*;
1351
1352    /// RT-003 F-04: sanitize_for_log replaces control characters.
1353    #[test]
1354    fn sanitize_for_log_strips_newlines() {
1355        assert_eq!(
1356            sanitize_for_log("read config\nINFO ADMIN_AUDIT: fake"),
1357            "read config\\nINFO ADMIN_AUDIT: fake"
1358        );
1359    }
1360
1361    #[test]
1362    fn sanitize_for_log_strips_carriage_return() {
1363        assert_eq!(sanitize_for_log("line1\r\nline2"), "line1\\r\\nline2");
1364    }
1365
1366    #[test]
1367    fn sanitize_for_log_strips_tabs() {
1368        assert_eq!(sanitize_for_log("key\tvalue"), "key\\tvalue");
1369    }
1370
1371    #[test]
1372    fn sanitize_for_log_preserves_normal_text() {
1373        assert_eq!(
1374            sanitize_for_log("read configuration files"),
1375            "read configuration files"
1376        );
1377    }
1378}