Skip to main content

arbiter_lifecycle/
api.rs

1use std::sync::Arc;
2
3use arbiter_identity::{AgentRegistry, TrustLevel};
4use arbiter_mcp::context::McpRequest;
5use arbiter_policy::{EvalContext, PolicyTrace};
6use arbiter_session::{CreateSessionRequest, DataSensitivity};
7use axum::Json;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, StatusCode};
10use axum::response::IntoResponse;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::state::AppState;
16use crate::token::issue_token;
17
18/// Request body for POST /agents.
19#[derive(Debug, Deserialize)]
20pub struct RegisterAgentRequest {
21    pub owner: String,
22    pub model: String,
23    #[serde(default)]
24    pub capabilities: Vec<String>,
25    #[serde(default = "default_trust_level")]
26    pub trust_level: TrustLevel,
27    #[serde(default)]
28    pub expires_at: Option<DateTime<Utc>>,
29}
30
31fn default_trust_level() -> TrustLevel {
32    TrustLevel::Untrusted
33}
34
35/// Response body for POST /agents.
36#[derive(Debug, Serialize)]
37pub struct RegisterAgentResponse {
38    pub agent_id: Uuid,
39    pub token: String,
40}
41
42/// Request body for POST /agents/:id/delegate.
43#[derive(Debug, Deserialize)]
44pub struct DelegateRequest {
45    pub to: Uuid,
46    pub scopes: Vec<String>,
47    #[serde(default)]
48    pub expires_at: Option<DateTime<Utc>>,
49}
50
51/// Request body for POST /agents/:id/token.
52#[derive(Debug, Deserialize)]
53pub struct TokenRequest {
54    #[serde(default)]
55    pub expiry_seconds: Option<i64>,
56}
57
58/// Request body for POST /sessions.
59#[derive(Debug, Deserialize)]
60pub struct CreateSessionApiRequest {
61    pub agent_id: Uuid,
62    #[serde(default)]
63    pub delegation_chain_snapshot: Vec<String>,
64    pub declared_intent: String,
65    #[serde(default)]
66    pub authorized_tools: Vec<String>,
67    #[serde(default = "default_time_limit")]
68    pub time_limit_secs: i64,
69    #[serde(default = "default_call_budget")]
70    pub call_budget: u64,
71    #[serde(default)]
72    pub rate_limit_per_minute: Option<u64>,
73    /// Override the global rate-limit window duration for this session (seconds).
74    /// If omitted, uses the server default from `[sessions].rate_limit_window_secs`.
75    #[serde(default)]
76    pub rate_limit_window_secs: Option<u64>,
77    #[serde(default = "default_data_sensitivity")]
78    pub data_sensitivity_ceiling: DataSensitivity,
79}
80
/// Serde fallback for `CreateSessionApiRequest::time_limit_secs`: one hour,
/// expressed in seconds.
fn default_time_limit() -> i64 {
    const ONE_HOUR_SECS: i64 = 60 * 60;
    ONE_HOUR_SECS
}
84
/// Serde fallback for `CreateSessionApiRequest::call_budget`.
fn default_call_budget() -> u64 {
    const DEFAULT_CALL_BUDGET: u64 = 1_000;
    DEFAULT_CALL_BUDGET
}
88
89fn default_data_sensitivity() -> DataSensitivity {
90    DataSensitivity::Internal
91}
92
93/// Response body for POST /sessions.
94#[derive(Debug, Serialize)]
95pub struct CreateSessionResponse {
96    pub session_id: Uuid,
97    pub declared_intent: String,
98    pub authorized_tools: Vec<String>,
99    pub call_budget: u64,
100    pub time_limit_secs: i64,
101}
102
103/// Response body for GET /sessions/:id.
104#[derive(Debug, Serialize)]
105pub struct SessionStatusResponse {
106    pub session_id: Uuid,
107    pub agent_id: Uuid,
108    pub status: String,
109    pub declared_intent: String,
110    pub authorized_tools: Vec<String>,
111    pub calls_made: u64,
112    pub call_budget: u64,
113    pub calls_remaining: u64,
114    pub rate_limit_per_minute: Option<u64>,
115    pub data_sensitivity_ceiling: String,
116    pub created_at: DateTime<Utc>,
117    pub expires_at: DateTime<Utc>,
118    pub seconds_remaining: i64,
119    pub warnings: Vec<String>,
120}
121
122/// Response for token issuance.
123#[derive(Debug, Serialize)]
124pub struct TokenResponse {
125    pub token: String,
126    pub expires_in: i64,
127}
128
129/// Error response body.
130#[derive(Debug, Serialize)]
131pub struct ErrorResponse {
132    pub error: String,
133}
134
135/// Constant-time byte comparison to prevent timing side-channel attacks
136/// on the admin API key (P0 credential fix).
137///
138/// Uses the `subtle` crate (`ConstantTimeEq`) instead
139/// of a hand-rolled implementation with fragile `black_box`/`#[inline(never)]`
140/// barriers. The `subtle` crate is the Rust cryptographic community standard
141/// and is designed to resist compiler optimizations that break constant-time
142/// guarantees. Pads both inputs to equal length and checks lengths separately
143/// to avoid leaking length information through timing.
144fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
145    use subtle::ConstantTimeEq;
146    // Pad both slices to the same length so ct_eq can compare equal-length
147    // slices, then separately check that the original lengths match.
148    // This avoids leaking length information through timing.
149    let max_len = std::cmp::max(a.len(), b.len());
150    let mut a_padded = vec![0u8; max_len];
151    let mut b_padded = vec![0u8; max_len];
152    a_padded[..a.len()].copy_from_slice(a);
153    b_padded[..b.len()].copy_from_slice(b);
154    let bytes_equal: bool = a_padded.ct_eq(&b_padded).into();
155    let len_equal: bool = (a.len() as u64).ct_eq(&(b.len() as u64)).into();
156    bytes_equal & len_equal
157}
158
/// Sanitize user-provided strings for safe inclusion in log messages.
///
/// Every control character is replaced with a printable escape so that
/// attacker-controlled input cannot forge log lines or smuggle terminal
/// escape sequences into log viewers (RT-003 F-04: log injection via user
/// input):
///
/// * `\n`, `\r` and `\t` become the two-character sequences `\n`, `\r`, `\t`;
/// * any other control character (for example NUL or ESC) and the Unicode
///   line/paragraph separators U+2028 / U+2029 become `\u{..}` escapes.
///
/// All other characters, including non-ASCII text, are copied unchanged.
fn sanitize_for_log(input: &str) -> String {
    // Single pass; the common case (nothing to escape) needs exactly
    // `input.len()` bytes.
    let mut out = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // `is_control` covers the C0/C1 ranges and DEL; the two
            // separators are line breaks to many log viewers but are not in
            // the control category, so they are listed explicitly.
            c if c.is_control() || matches!(c, '\u{2028}' | '\u{2029}') => {
                out.extend(c.escape_unicode());
            }
            c => out.push(c),
        }
    }
    out
}
169
170/// Validate the admin API key from headers.
171///
172/// Uses constant-time comparison to prevent timing side-channel attacks.
173fn validate_admin_key(
174    headers: &HeaderMap,
175    expected: &str,
176) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
177    let key = headers
178        .get("x-api-key")
179        .and_then(|v| v.to_str().ok())
180        .ok_or_else(|| {
181            (
182                StatusCode::UNAUTHORIZED,
183                Json(ErrorResponse {
184                    error: "missing x-api-key header".into(),
185                }),
186            )
187        })?;
188
189    // P0 credential fix: constant-time comparison to prevent timing attacks.
190    // Return 401 (not 403) for invalid keys to avoid differentiating missing vs. wrong key,
191    // which would let an attacker confirm the header name without knowing the value.
192    if !constant_time_eq(key.as_bytes(), expected.as_bytes()) {
193        return Err((
194            StatusCode::UNAUTHORIZED,
195            Json(ErrorResponse {
196                error: "invalid API key".into(),
197            }),
198        ));
199    }
200
201    Ok(())
202}
203
204/// Check the admin API rate limiter and return 429 if the limit is exceeded.
205///
206/// Rate limiting prevents an attacker with a compromised API key
207/// from making unlimited requests to enumerate agents, create sessions, or
208/// overwhelm the control plane.
209fn check_admin_rate_limit(state: &AppState) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
210    if !state.admin_rate_limiter.check_rate_limit() {
211        tracing::warn!("ADMIN_AUDIT: rate limit exceeded on admin API");
212        return Err((
213            StatusCode::TOO_MANY_REQUESTS,
214            Json(ErrorResponse {
215                error: "admin API rate limit exceeded, try again later".into(),
216            }),
217        ));
218    }
219    Ok(())
220}
221
222/// POST /agents: register a new agent.
223pub async fn register_agent(
224    State(state): State<AppState>,
225    headers: HeaderMap,
226    Json(req): Json<RegisterAgentRequest>,
227) -> impl IntoResponse {
228    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
229        return e.into_response();
230    }
231    if let Err(e) = check_admin_rate_limit(&state) {
232        return e.into_response();
233    }
234
235    match state
236        .registry
237        .register_agent(
238            req.owner.clone(),
239            req.model,
240            req.capabilities,
241            req.trust_level,
242            req.expires_at,
243        )
244        .await
245    {
246        Ok(agent) => {
247            let token = match issue_token(agent.id, &req.owner, &state.token_config) {
248                Ok(t) => t,
249                Err(e) => {
250                    tracing::error!(error = %e, "failed to issue token");
251                    return (
252                        StatusCode::INTERNAL_SERVER_ERROR,
253                        Json(ErrorResponse {
254                            error: "token issuance failed".into(),
255                        }),
256                    )
257                        .into_response();
258                }
259            };
260
261            state.metrics.registered_agents.inc();
262            state.admin_audit_log(
263                "register_agent",
264                Some(agent.id),
265                &format!("owner={}", sanitize_for_log(&req.owner)),
266            );
267
268            (
269                StatusCode::CREATED,
270                Json(RegisterAgentResponse {
271                    agent_id: agent.id,
272                    token,
273                }),
274            )
275                .into_response()
276        }
277        Err(e) => {
278            tracing::error!(error = %e, "failed to register agent");
279            (
280                StatusCode::INTERNAL_SERVER_ERROR,
281                Json(ErrorResponse {
282                    error: e.to_string(),
283                }),
284            )
285                .into_response()
286        }
287    }
288}
289
290/// GET /agents/:id: get agent details.
291pub async fn get_agent(
292    State(state): State<AppState>,
293    headers: HeaderMap,
294    Path(id): Path<Uuid>,
295) -> impl IntoResponse {
296    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
297        return e.into_response();
298    }
299    if let Err(e) = check_admin_rate_limit(&state) {
300        return e.into_response();
301    }
302
303    state.admin_audit_log("get_agent", Some(id), "");
304
305    match state.registry.get_agent(id).await {
306        Ok(agent) => match serde_json::to_value(&agent) {
307            Ok(val) => (StatusCode::OK, Json(val)).into_response(),
308            Err(e) => (
309                StatusCode::INTERNAL_SERVER_ERROR,
310                Json(serde_json::json!({"error": e.to_string()})),
311            )
312                .into_response(),
313        },
314        Err(e) => (
315            StatusCode::NOT_FOUND,
316            Json(ErrorResponse {
317                error: e.to_string(),
318            }),
319        )
320            .into_response(),
321    }
322}
323
324/// POST /agents/:id/delegate: create delegation to sub-agent.
325pub async fn delegate_agent(
326    State(state): State<AppState>,
327    headers: HeaderMap,
328    Path(from_id): Path<Uuid>,
329    Json(req): Json<DelegateRequest>,
330) -> impl IntoResponse {
331    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
332        return e.into_response();
333    }
334    if let Err(e) = check_admin_rate_limit(&state) {
335        return e.into_response();
336    }
337
338    state.admin_audit_log("delegate_agent", Some(from_id), &format!("to={}", req.to));
339
340    match state
341        .registry
342        .create_delegation(from_id, req.to, req.scopes, req.expires_at)
343        .await
344    {
345        Ok(link) => match serde_json::to_value(&link) {
346            Ok(val) => (StatusCode::CREATED, Json(val)).into_response(),
347            Err(e) => (
348                StatusCode::INTERNAL_SERVER_ERROR,
349                Json(serde_json::json!({"error": e.to_string()})),
350            )
351                .into_response(),
352        },
353        Err(e) => {
354            let status = match &e {
355                arbiter_identity::IdentityError::DelegationSourceNotFound(_)
356                | arbiter_identity::IdentityError::DelegationTargetNotFound(_) => {
357                    StatusCode::NOT_FOUND
358                }
359                arbiter_identity::IdentityError::ScopeNarrowingViolation { .. }
360                | arbiter_identity::IdentityError::DelegateFromDeactivated(_)
361                | arbiter_identity::IdentityError::CircularDelegation { .. } => {
362                    StatusCode::BAD_REQUEST
363                }
364                _ => StatusCode::INTERNAL_SERVER_ERROR,
365            };
366            (
367                status,
368                Json(ErrorResponse {
369                    error: e.to_string(),
370                }),
371            )
372                .into_response()
373        }
374    }
375}
376
377/// GET /agents/:id/delegations: list incoming and outgoing delegations.
378pub async fn list_delegations(
379    State(state): State<AppState>,
380    headers: HeaderMap,
381    Path(id): Path<Uuid>,
382) -> impl IntoResponse {
383    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
384        return e.into_response();
385    }
386    if let Err(e) = check_admin_rate_limit(&state) {
387        return e.into_response();
388    }
389
390    state.admin_audit_log("list_delegations", Some(id), "");
391
392    // Verify agent exists.
393    if let Err(e) = state.registry.get_agent(id).await {
394        return (
395            StatusCode::NOT_FOUND,
396            Json(ErrorResponse {
397                error: e.to_string(),
398            }),
399        )
400            .into_response();
401    }
402
403    let outgoing = state.registry.list_delegations_from(id).await;
404    let incoming = state.registry.list_delegations_to(id).await;
405
406    (
407        StatusCode::OK,
408        Json(serde_json::json!({
409            "agent_id": id,
410            "outgoing": outgoing,
411            "incoming": incoming,
412            "outgoing_count": outgoing.len(),
413            "incoming_count": incoming.len(),
414        })),
415    )
416        .into_response()
417}
418
419/// DELETE /agents/:id: deactivate agent + cascade.
420pub async fn deactivate_agent(
421    State(state): State<AppState>,
422    headers: HeaderMap,
423    Path(id): Path<Uuid>,
424) -> impl IntoResponse {
425    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
426        return e.into_response();
427    }
428    if let Err(e) = check_admin_rate_limit(&state) {
429        return e.into_response();
430    }
431
432    state.admin_audit_log("deactivate_agent", Some(id), "cascade");
433
434    match state.registry.cascade_deactivate(id).await {
435        Ok(deactivated) => {
436            // Close sessions for all deactivated agents.
437            let mut total_sessions_closed = 0usize;
438            for &agent_id in &deactivated {
439                let closed = state.session_store.close_sessions_for_agent(agent_id).await;
440                total_sessions_closed += closed;
441                state.metrics.registered_agents.dec();
442            }
443            (
444                StatusCode::OK,
445                Json(serde_json::json!({
446                    "deactivated": deactivated,
447                    "count": deactivated.len(),
448                    "sessions_closed": total_sessions_closed,
449                })),
450            )
451                .into_response()
452        }
453        Err(e) => {
454            let status = match &e {
455                arbiter_identity::IdentityError::AgentNotFound(_) => StatusCode::NOT_FOUND,
456                arbiter_identity::IdentityError::AgentDeactivated(_) => StatusCode::CONFLICT,
457                _ => StatusCode::INTERNAL_SERVER_ERROR,
458            };
459            (
460                status,
461                Json(ErrorResponse {
462                    error: e.to_string(),
463                }),
464            )
465                .into_response()
466        }
467    }
468}
469
470/// POST /agents/:id/token: issue new short-lived credential.
471pub async fn issue_agent_token(
472    State(state): State<AppState>,
473    headers: HeaderMap,
474    Path(id): Path<Uuid>,
475    body: Option<Json<TokenRequest>>,
476) -> impl IntoResponse {
477    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
478        return e.into_response();
479    }
480    if let Err(e) = check_admin_rate_limit(&state) {
481        return e.into_response();
482    }
483
484    state.admin_audit_log("issue_agent_token", Some(id), "");
485
486    let agent = match state.registry.get_agent(id).await {
487        Ok(a) => a,
488        Err(e) => {
489            return (
490                StatusCode::NOT_FOUND,
491                Json(ErrorResponse {
492                    error: e.to_string(),
493                }),
494            )
495                .into_response();
496        }
497    };
498
499    if !agent.active {
500        return (
501            StatusCode::BAD_REQUEST,
502            Json(ErrorResponse {
503                error: format!("agent {} is deactivated", id),
504            }),
505        )
506            .into_response();
507    }
508
509    let mut config = state.token_config.clone();
510    if let Some(Json(req)) = body
511        && let Some(expiry) = req.expiry_seconds
512    {
513        // P4: Reject non-positive expiry to prevent creating immediately-invalid tokens.
514        // (RT-003 F-09: token expiry accepts negative values)
515        if expiry <= 0 {
516            return (
517                StatusCode::BAD_REQUEST,
518                Json(ErrorResponse {
519                    error: "expiry_seconds must be positive".into(),
520                }),
521            )
522                .into_response();
523        }
524        config.expiry_seconds = expiry;
525    }
526
527    match issue_token(agent.id, &agent.owner, &config) {
528        Ok(token) => (
529            StatusCode::OK,
530            Json(TokenResponse {
531                token,
532                expires_in: config.expiry_seconds,
533            }),
534        )
535            .into_response(),
536        Err(e) => {
537            tracing::error!(error = %e, "failed to issue token");
538            (
539                StatusCode::INTERNAL_SERVER_ERROR,
540                Json(ErrorResponse {
541                    error: "token issuance failed".into(),
542                }),
543            )
544                .into_response()
545        }
546    }
547}
548
549/// GET /agents: list all agents.
550pub async fn list_agents(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
551    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
552        return e.into_response();
553    }
554    if let Err(e) = check_admin_rate_limit(&state) {
555        return e.into_response();
556    }
557
558    state.admin_audit_log("list_agents", None, "");
559
560    let agents = state.registry.list_agents().await;
561    match serde_json::to_value(&agents) {
562        Ok(val) => (StatusCode::OK, Json(val)).into_response(),
563        Err(e) => (
564            StatusCode::INTERNAL_SERVER_ERROR,
565            Json(serde_json::json!({"error": e.to_string()})),
566        )
567            .into_response(),
568    }
569}
570
571/// POST /sessions: create a new task session.
572pub async fn create_session(
573    State(state): State<AppState>,
574    headers: HeaderMap,
575    Json(req): Json<CreateSessionApiRequest>,
576) -> impl IntoResponse {
577    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
578        return e.into_response();
579    }
580    if let Err(e) = check_admin_rate_limit(&state) {
581        return e.into_response();
582    }
583
584    state.admin_audit_log(
585        "create_session",
586        Some(req.agent_id),
587        &format!("intent={}", sanitize_for_log(&req.declared_intent)),
588    );
589
590    // P0: Validate agent exists, is active, and not expired before creating session.
591    // Without this check, sessions can be created for non-existent or deactivated agents,
592    // bypassing the entire identity model (RT-003 F-01: ghost agent sessions).
593    let agent = match state.registry.get_agent(req.agent_id).await {
594        Ok(a) => a,
595        Err(e) => {
596            tracing::warn!(
597                agent_id = %req.agent_id,
598                error = %e,
599                "session creation denied: agent not found"
600            );
601            return (
602                StatusCode::NOT_FOUND,
603                Json(ErrorResponse {
604                    error: format!("agent {} not found", req.agent_id),
605                }),
606            )
607                .into_response();
608        }
609    };
610
611    if !agent.active {
612        tracing::warn!(
613            agent_id = %req.agent_id,
614            "session creation denied: agent is deactivated"
615        );
616        return (
617            StatusCode::BAD_REQUEST,
618            Json(ErrorResponse {
619                error: format!("agent {} is deactivated", req.agent_id),
620            }),
621        )
622            .into_response();
623    }
624
625    if let Some(expires_at) = agent.expires_at
626        && expires_at < Utc::now()
627    {
628        tracing::warn!(
629            agent_id = %req.agent_id,
630            expires_at = %expires_at,
631            "session creation denied: agent has expired"
632        );
633        return (
634            StatusCode::BAD_REQUEST,
635            Json(ErrorResponse {
636                error: format!("agent {} has expired", req.agent_id),
637            }),
638        )
639            .into_response();
640    }
641
642    // P0: Per-agent session cap to prevent session multiplication attacks.
643    // An agent creating N sessions * M budget each = N*M tool calls,
644    // bypassing per-session rate limits.
645    if let Some(max_sessions) = state.max_concurrent_sessions_per_agent {
646        let active_count = state
647            .session_store
648            .count_active_for_agent(req.agent_id)
649            .await;
650        if active_count >= max_sessions {
651            tracing::warn!(
652                agent_id = %req.agent_id,
653                active = active_count,
654                max = max_sessions,
655                "session creation denied: per-agent concurrent session cap reached"
656            );
657            return (
658                StatusCode::TOO_MANY_REQUESTS,
659                Json(ErrorResponse {
660                    error: format!(
661                        "agent {} has too many concurrent sessions ({}/{})",
662                        req.agent_id, active_count, max_sessions
663                    ),
664                }),
665            )
666                .into_response();
667        }
668    }
669
670    // Validate session creation parameters.
671    if req.time_limit_secs <= 0 {
672        return (
673            StatusCode::BAD_REQUEST,
674            Json(ErrorResponse {
675                error: "time_limit_secs must be positive".into(),
676            }),
677        )
678            .into_response();
679    }
680    // Validate session creation parameters.
681    if req.time_limit_secs > 86400 {
682        return (
683            StatusCode::BAD_REQUEST,
684            Json(ErrorResponse {
685                error: "time_limit_secs cannot exceed 86400 (24 hours)".into(),
686            }),
687        )
688            .into_response();
689    }
690    if req.call_budget == 0 {
691        return (
692            StatusCode::BAD_REQUEST,
693            Json(ErrorResponse {
694                error: "call_budget must be positive".into(),
695            }),
696        )
697            .into_response();
698    }
699    if req.call_budget > 1_000_000 {
700        return (
701            StatusCode::BAD_REQUEST,
702            Json(ErrorResponse {
703                error: "call_budget cannot exceed 1000000".into(),
704            }),
705        )
706            .into_response();
707    }
708    if req.declared_intent.trim().is_empty() {
709        return (
710            StatusCode::BAD_REQUEST,
711            Json(ErrorResponse {
712                error: "declared_intent must not be empty".into(),
713            }),
714        )
715            .into_response();
716    }
717
718    // P1: Validate rate_limit_window_secs bounds to prevent rate limit bypass.
719    // A window of 0 disables rate limiting entirely; a window of 1 second converts
720    // "per minute" limits into "per second" (100x amplification).
721    // (RT-003 F-03: rate limit window manipulation)
722    if let Some(window) = req.rate_limit_window_secs {
723        if window == 0 {
724            return (
725                StatusCode::BAD_REQUEST,
726                Json(ErrorResponse {
727                    error: "rate_limit_window_secs must be positive".into(),
728                }),
729            )
730                .into_response();
731        }
732        if window < 10 {
733            return (
734                StatusCode::BAD_REQUEST,
735                Json(ErrorResponse {
736                    error: "rate_limit_window_secs must be at least 10 seconds".into(),
737                }),
738            )
739                .into_response();
740        }
741        if window > 3600 {
742            return (
743                StatusCode::BAD_REQUEST,
744                Json(ErrorResponse {
745                    error: "rate_limit_window_secs cannot exceed 3600 (1 hour)".into(),
746                }),
747            )
748                .into_response();
749        }
750    }
751
752    if req.authorized_tools.is_empty() {
753        tracing::warn!(
754            agent_id = %req.agent_id,
755            "session created with empty authorized_tools; all tools will be permitted. \
756             Specify an explicit tool allowlist for least-privilege enforcement."
757        );
758    }
759
760    let create_req = CreateSessionRequest {
761        agent_id: req.agent_id,
762        delegation_chain_snapshot: req.delegation_chain_snapshot,
763        declared_intent: req.declared_intent,
764        authorized_tools: req.authorized_tools,
765        time_limit: chrono::Duration::seconds(req.time_limit_secs),
766        call_budget: req.call_budget,
767        rate_limit_per_minute: req.rate_limit_per_minute,
768        rate_limit_window_secs: req
769            .rate_limit_window_secs
770            .unwrap_or(state.default_rate_limit_window_secs),
771        data_sensitivity_ceiling: req.data_sensitivity_ceiling,
772    };
773
774    let session = state.session_store.create(create_req).await;
775
776    state.metrics.active_sessions.inc();
777
778    (
779        StatusCode::CREATED,
780        Json(CreateSessionResponse {
781            session_id: session.session_id,
782            declared_intent: session.declared_intent,
783            authorized_tools: session.authorized_tools,
784            call_budget: session.call_budget,
785            time_limit_secs: req.time_limit_secs,
786        }),
787    )
788        .into_response()
789}
790
791/// GET /sessions/:id: get live session status.
792pub async fn get_session(
793    State(state): State<AppState>,
794    headers: HeaderMap,
795    Path(id): Path<Uuid>,
796) -> impl IntoResponse {
797    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
798        return e.into_response();
799    }
800    if let Err(e) = check_admin_rate_limit(&state) {
801        return e.into_response();
802    }
803
804    state.admin_audit_log("get_session", None, &format!("session_id={}", id));
805
806    let session = match state.session_store.get(id).await {
807        Ok(s) => s,
808        Err(e) => {
809            return (
810                StatusCode::NOT_FOUND,
811                Json(ErrorResponse {
812                    error: e.to_string(),
813                }),
814            )
815                .into_response();
816        }
817    };
818
819    let now = Utc::now();
820    let expires_at = session.created_at + session.time_limit;
821    let seconds_remaining = (expires_at - now).num_seconds().max(0);
822    let calls_remaining = session.call_budget.saturating_sub(session.calls_made);
823
824    let mut warnings = Vec::new();
825    let budget_pct_remaining = if session.call_budget > 0 {
826        (calls_remaining as f64 / session.call_budget as f64) * 100.0
827    } else {
828        0.0
829    };
830    let time_pct_remaining = if session.time_limit.num_seconds() > 0 {
831        (seconds_remaining as f64 / session.time_limit.num_seconds() as f64) * 100.0
832    } else {
833        0.0
834    };
835    if budget_pct_remaining <= state.warning_threshold_pct && session.call_budget > 0 {
836        warnings.push(format!(
837            "budget low: {} of {} calls remaining",
838            calls_remaining, session.call_budget
839        ));
840    }
841    if time_pct_remaining <= state.warning_threshold_pct && seconds_remaining > 0 {
842        warnings.push(format!("time low: {}s remaining", seconds_remaining));
843    }
844
845    let status_str = format!("{:?}", session.status).to_lowercase();
846    let sensitivity_str = serde_json::to_value(session.data_sensitivity_ceiling)
847        .unwrap_or_default()
848        .as_str()
849        .unwrap_or("unknown")
850        .to_string();
851
852    (
853        StatusCode::OK,
854        Json(SessionStatusResponse {
855            session_id: session.session_id,
856            agent_id: session.agent_id,
857            status: status_str,
858            declared_intent: session.declared_intent,
859            authorized_tools: session.authorized_tools,
860            calls_made: session.calls_made,
861            call_budget: session.call_budget,
862            calls_remaining,
863            rate_limit_per_minute: session.rate_limit_per_minute,
864            data_sensitivity_ceiling: sensitivity_str,
865            created_at: session.created_at,
866            expires_at,
867            seconds_remaining,
868            warnings,
869        }),
870    )
871        .into_response()
872}
873
874/// Response body for POST /sessions/:id/close.
875#[derive(Debug, Serialize)]
876pub struct SessionCloseResponse {
877    pub session_id: Uuid,
878    pub status: String,
879    pub declared_intent: String,
880    pub total_calls: u64,
881    pub call_budget: u64,
882    pub budget_utilization_pct: f64,
883    pub time_used_secs: i64,
884    pub time_limit_secs: i64,
885    /// Number of denied requests during this session (from audit log).
886    pub denied_attempts: u64,
887    /// Number of requests that triggered anomaly flags (from audit log).
888    pub anomalies_detected: u64,
889}
890
891/// POST /sessions/:id/close: close a session and return summary.
892pub async fn close_session(
893    State(state): State<AppState>,
894    headers: HeaderMap,
895    Path(id): Path<Uuid>,
896) -> impl IntoResponse {
897    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
898        return e.into_response();
899    }
900    if let Err(e) = check_admin_rate_limit(&state) {
901        return e.into_response();
902    }
903
904    state.admin_audit_log("close_session", None, &format!("session_id={}", id));
905
906    let session = match state.session_store.close(id).await {
907        Ok(s) => {
908            state.metrics.active_sessions.dec();
909            s
910        }
911        Err(e) => {
912            let status = match &e {
913                arbiter_session::SessionError::NotFound(_) => StatusCode::NOT_FOUND,
914                arbiter_session::SessionError::AlreadyClosed(_) => StatusCode::CONFLICT,
915                _ => StatusCode::INTERNAL_SERVER_ERROR,
916            };
917            return (
918                status,
919                Json(ErrorResponse {
920                    error: e.to_string(),
921                }),
922            )
923                .into_response();
924        }
925    };
926
927    let now = Utc::now();
928    let time_used_secs = (now - session.created_at).num_seconds();
929    let budget_utilization_pct = if session.call_budget > 0 {
930        (session.calls_made as f64 / session.call_budget as f64) * 100.0
931    } else {
932        0.0
933    };
934
935    // Query audit stats for this session.
936    let session_id_str = id.to_string();
937    let (denied_attempts, anomalies_detected) = if let Some(ref sink) = state.audit_sink {
938        let stats = sink.stats().stats_for_session(&session_id_str).await;
939        // Clean up stats now that session is closed.
940        sink.stats().remove_session(&session_id_str).await;
941        (stats.denied_count, stats.anomaly_count)
942    } else {
943        (0, 0)
944    };
945
946    (
947        StatusCode::OK,
948        Json(SessionCloseResponse {
949            session_id: session.session_id,
950            status: "closed".into(),
951            declared_intent: session.declared_intent,
952            total_calls: session.calls_made,
953            call_budget: session.call_budget,
954            budget_utilization_pct,
955            time_used_secs,
956            time_limit_secs: session.time_limit.num_seconds(),
957            denied_attempts,
958            anomalies_detected,
959        }),
960    )
961        .into_response()
962}
963
/// Request body for POST /policy/explain.
///
/// Describes a hypothetical tool call to evaluate against the current policy
/// configuration; the explain handler does not execute the call.
#[derive(Debug, Deserialize)]
pub struct PolicyExplainRequest {
    /// Agent to evaluate as; the handler looks it up in the agent registry.
    pub agent_id: Uuid,
    /// Declared intent placed into the evaluation context.
    pub declared_intent: String,
    /// Tool name used for the simulated `tools/call` request.
    pub tool: String,
    /// Optional tool arguments forwarded to the simulated request.
    #[serde(default)]
    pub arguments: Option<serde_json::Value>,
    /// Optional principal subject; when omitted the handler uses the agent's owner.
    #[serde(default)]
    pub principal: Option<String>,
}
975
/// Response body for POST /policy/explain.
///
/// Reports the outcome of a dry-run policy evaluation together with the
/// evaluation trace.
#[derive(Debug, Serialize)]
pub struct PolicyExplainResponse {
    /// Decision label; the explain handler emits `"allow"`, `"deny"`,
    /// `"escalate"`, or `"annotate"`.
    pub decision: String,
    /// Policy id carried by the decision; the handler sets it for allow and
    /// annotate decisions and leaves it `None` otherwise.
    pub matched_policy: Option<String>,
    /// Reason text; set by the handler for deny, escalate, and annotate
    /// decisions, and when no policies are configured.
    pub reason: Option<String>,
    /// Trace entries returned by the policy evaluation (empty when no policies
    /// are configured).
    pub trace: Vec<PolicyTrace>,
}
984
985/// POST /policy/explain: dry-run policy evaluation without executing.
986pub async fn explain_policy(
987    State(state): State<AppState>,
988    headers: HeaderMap,
989    Json(req): Json<PolicyExplainRequest>,
990) -> impl IntoResponse {
991    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
992        return e.into_response();
993    }
994    if let Err(e) = check_admin_rate_limit(&state) {
995        return e.into_response();
996    }
997
998    state.admin_audit_log(
999        "explain_policy",
1000        Some(req.agent_id),
1001        &format!("tool={}", sanitize_for_log(&req.tool)),
1002    );
1003
1004    let policy_snapshot = tokio::sync::watch::Sender::borrow(&state.policy_config).clone();
1005    let policy_config = match policy_snapshot.as_ref() {
1006        Some(pc) => pc,
1007        None => {
1008            return (
1009                StatusCode::OK,
1010                Json(PolicyExplainResponse {
1011                    decision: "allow".into(),
1012                    matched_policy: None,
1013                    reason: Some("no policies configured".into()),
1014                    trace: vec![],
1015                }),
1016            )
1017                .into_response();
1018        }
1019    };
1020
1021    let agent = match state.registry.get_agent(req.agent_id).await {
1022        Ok(a) => a,
1023        Err(e) => {
1024            return (
1025                StatusCode::NOT_FOUND,
1026                Json(ErrorResponse {
1027                    error: format!("agent not found: {e}"),
1028                }),
1029            )
1030                .into_response();
1031        }
1032    };
1033
1034    let principal = req.principal.unwrap_or_else(|| agent.owner.clone());
1035
1036    let eval_ctx = EvalContext {
1037        agent,
1038        delegation_chain: vec![],
1039        declared_intent: req.declared_intent,
1040        principal_sub: principal,
1041        principal_groups: vec![],
1042    };
1043
1044    let mcp_req = McpRequest {
1045        id: None,
1046        method: "tools/call".into(),
1047        tool_name: Some(req.tool.clone()),
1048        arguments: req.arguments,
1049        resource_uri: None,
1050    };
1051
1052    let result = arbiter_policy::evaluate_explained(policy_config, &eval_ctx, &mcp_req);
1053
1054    let (decision, matched_policy, reason) = match result.decision {
1055        arbiter_policy::Decision::Allow { policy_id } => ("allow".into(), Some(policy_id), None),
1056        arbiter_policy::Decision::Deny { reason } => ("deny".into(), None, Some(reason)),
1057        arbiter_policy::Decision::Escalate { reason } => ("escalate".into(), None, Some(reason)),
1058        arbiter_policy::Decision::Annotate { policy_id, reason } => {
1059            ("annotate".into(), Some(policy_id), Some(reason))
1060        }
1061    };
1062
1063    (
1064        StatusCode::OK,
1065        Json(PolicyExplainResponse {
1066            decision,
1067            matched_policy,
1068            reason,
1069            trace: result.trace,
1070        }),
1071    )
1072        .into_response()
1073}
1074
/// Request body for POST /policy/validate.
#[derive(Debug, Deserialize)]
pub struct PolicyValidateRequest {
    /// Policy document as a TOML string; the validate handler passes it to
    /// `PolicyConfig::validate_toml` as-is.
    pub policy_toml: String,
}
1080
1081/// POST /policy/validate: validate a policy TOML string without loading it.
1082pub async fn validate_policy(
1083    State(state): State<AppState>,
1084    headers: HeaderMap,
1085    Json(req): Json<PolicyValidateRequest>,
1086) -> impl IntoResponse {
1087    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1088        return e.into_response();
1089    }
1090    if let Err(e) = check_admin_rate_limit(&state) {
1091        return e.into_response();
1092    }
1093
1094    state.admin_audit_log("validate_policy", None, "");
1095
1096    let result = arbiter_policy::PolicyConfig::validate_toml(&req.policy_toml);
1097    (StatusCode::OK, Json(result)).into_response()
1098}
1099
1100/// POST /policy/reload: re-read the policy file and atomically swap the config.
1101pub async fn reload_policy(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1102    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1103        return e.into_response();
1104    }
1105    if let Err(e) = check_admin_rate_limit(&state) {
1106        return e.into_response();
1107    }
1108
1109    state.admin_audit_log("reload_policy", None, "");
1110
1111    let path = match &state.policy_file_path {
1112        Some(p) => p.clone(),
1113        None => {
1114            return (
1115                StatusCode::BAD_REQUEST,
1116                Json(ErrorResponse {
1117                    error: "no policy file configured; policies are inline or absent".into(),
1118                }),
1119            )
1120                .into_response();
1121        }
1122    };
1123
1124    let contents = match std::fs::read_to_string(&path) {
1125        Ok(c) => c,
1126        Err(e) => {
1127            // Do not leak filesystem paths in error responses.
1128            tracing::error!(path = %path, error = %e, "failed to read policy file");
1129            return (
1130                StatusCode::INTERNAL_SERVER_ERROR,
1131                Json(ErrorResponse {
1132                    error: "failed to read policy file".into(),
1133                }),
1134            )
1135                .into_response();
1136        }
1137    };
1138
1139    let new_config = match arbiter_policy::PolicyConfig::from_toml(&contents) {
1140        Ok(pc) => pc,
1141        Err(e) => {
1142            // Log parse errors internally, don't expose details.
1143            tracing::error!(error = %e, "failed to parse policy file");
1144            return (
1145                StatusCode::BAD_REQUEST,
1146                Json(ErrorResponse {
1147                    error: "failed to parse policy file".into(),
1148                }),
1149            )
1150                .into_response();
1151        }
1152    };
1153
1154    let policy_count = new_config.policies.len();
1155    let _ = state.policy_config.send_replace(Arc::new(Some(new_config)));
1156
1157    // Audit log policy reload events for change tracking.
1158    tracing::warn!(
1159        path,
1160        policy_count,
1161        "AUDIT: policy configuration reloaded via admin API"
1162    );
1163    (
1164        StatusCode::OK,
1165        Json(serde_json::json!({
1166            "status": "ok",
1167            "reloaded": true,
1168            "policies_loaded": policy_count,
1169            "policy_count": policy_count,
1170            "file": path,
1171        })),
1172    )
1173        .into_response()
1174}
1175
1176/// GET /policy/schema: returns the policy TOML schema as JSON Schema.
1177pub async fn policy_schema(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1178    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1179        return e.into_response();
1180    }
1181    if let Err(e) = check_admin_rate_limit(&state) {
1182        return e.into_response();
1183    }
1184
1185    state.admin_audit_log("policy_schema", None, "");
1186
1187    let schema = serde_json::json!({
1188        "$schema": "https://json-schema.org/draft/2020-12/schema",
1189        "title": "Arbiter Policy Configuration",
1190        "description": "Defines authorization policies for the Arbiter MCP gateway. Policies are evaluated top-to-bottom; the most specific match wins. If no policy matches, the request is denied (deny-by-default).",
1191        "type": "object",
1192        "properties": {
1193            "policies": {
1194                "type": "array",
1195                "description": "Ordered list of authorization policies. Evaluated by specificity score; the most specific matching policy's effect applies.",
1196                "items": {
1197                    "$ref": "#/$defs/Policy"
1198                }
1199            }
1200        },
1201        "$defs": {
1202            "Policy": {
1203                "type": "object",
1204                "description": "A single authorization policy rule.",
1205                "required": ["id", "effect"],
1206                "properties": {
1207                    "id": {
1208                        "type": "string",
1209                        "description": "Unique identifier for this policy. Used in audit logs and policy traces."
1210                    },
1211                    "effect": {
1212                        "$ref": "#/$defs/Effect"
1213                    },
1214                    "agent_match": {
1215                        "$ref": "#/$defs/AgentMatch"
1216                    },
1217                    "principal_match": {
1218                        "$ref": "#/$defs/PrincipalMatch"
1219                    },
1220                    "intent_match": {
1221                        "$ref": "#/$defs/IntentMatch"
1222                    },
1223                    "allowed_tools": {
1224                        "type": "array",
1225                        "items": { "type": "string" },
1226                        "description": "Tools this policy applies to. Empty array means 'all tools'."
1227                    },
1228                    "parameter_constraints": {
1229                        "type": "array",
1230                        "items": { "$ref": "#/$defs/ParameterConstraint" },
1231                        "description": "Per-parameter bounds on tool arguments."
1232                    },
1233                    "priority": {
1234                        "type": "integer",
1235                        "default": 0,
1236                        "description": "Manual priority override. 0 = auto-computed from match specificity. Higher wins ties."
1237                    }
1238                }
1239            },
1240            "Effect": {
1241                "type": "string",
1242                "enum": ["allow", "deny", "escalate"],
1243                "description": "allow = permit the request. deny = block it. escalate = require human-in-the-loop approval."
1244            },
1245            "AgentMatch": {
1246                "type": "object",
1247                "description": "Criteria for matching the requesting agent. All specified fields must match.",
1248                "properties": {
1249                    "agent_id": {
1250                        "type": "string",
1251                        "format": "uuid",
1252                        "description": "Match a specific agent by UUID. Specificity: +100."
1253                    },
1254                    "trust_level": {
1255                        "$ref": "#/$defs/TrustLevel"
1256                    },
1257                    "capabilities": {
1258                        "type": "array",
1259                        "items": { "type": "string" },
1260                        "description": "Agent must have all listed capabilities. Specificity: +25 each."
1261                    }
1262                }
1263            },
1264            "TrustLevel": {
1265                "type": "string",
1266                "enum": ["untrusted", "basic", "verified", "trusted"],
1267                "description": "Agent trust tier. untrusted < basic < verified < trusted. Matches agents at or above the specified level. Specificity: +50."
1268            },
1269            "PrincipalMatch": {
1270                "type": "object",
1271                "description": "Criteria for matching the human principal on whose behalf the agent acts.",
1272                "properties": {
1273                    "sub": {
1274                        "type": "string",
1275                        "description": "Exact principal subject identifier (e.g., 'user:alice'). Specificity: +40."
1276                    },
1277                    "groups": {
1278                        "type": "array",
1279                        "items": { "type": "string" },
1280                        "description": "Principal must belong to at least one of these groups. Specificity: +20 each."
1281                    }
1282                }
1283            },
1284            "IntentMatch": {
1285                "type": "object",
1286                "description": "Criteria for matching the session's declared intent.",
1287                "properties": {
1288                    "keywords": {
1289                        "type": "array",
1290                        "items": { "type": "string" },
1291                        "description": "Case-insensitive substrings that must appear in the declared intent. Specificity: +10 each."
1292                    },
1293                    "regex": {
1294                        "type": "string",
1295                        "description": "Regex pattern the declared intent must match. Compiled at config load time. Specificity: +30."
1296                    }
1297                }
1298            },
1299            "ParameterConstraint": {
1300                "type": "object",
1301                "description": "A constraint on a tool call parameter.",
1302                "required": ["key"],
1303                "properties": {
1304                    "key": {
1305                        "type": "string",
1306                        "description": "Dotted path to the parameter (e.g., 'arguments.max_tokens')."
1307                    },
1308                    "max_value": {
1309                        "type": "number",
1310                        "description": "Maximum numeric value allowed."
1311                    },
1312                    "min_value": {
1313                        "type": "number",
1314                        "description": "Minimum numeric value allowed."
1315                    },
1316                    "allowed_values": {
1317                        "type": "array",
1318                        "items": { "type": "string" },
1319                        "description": "Whitelist of allowed string values."
1320                    }
1321                }
1322            },
1323            "DataSensitivity": {
1324                "type": "string",
1325                "enum": ["public", "internal", "confidential", "restricted"],
1326                "description": "Data sensitivity ceiling for sessions. public < internal < confidential < restricted."
1327            }
1328        }
1329    });
1330
1331    (StatusCode::OK, Json(schema)).into_response()
1332}
1333
1334/// Build the axum router for the lifecycle API.
1335pub fn router(state: AppState) -> axum::Router {
1336    axum::Router::new()
1337        .route("/agents", axum::routing::post(register_agent))
1338        .route("/agents", axum::routing::get(list_agents))
1339        .route("/agents/{id}", axum::routing::get(get_agent))
1340        .route("/agents/{id}", axum::routing::delete(deactivate_agent))
1341        .route("/agents/{id}/delegate", axum::routing::post(delegate_agent))
1342        .route(
1343            "/agents/{id}/delegations",
1344            axum::routing::get(list_delegations),
1345        )
1346        .route("/agents/{id}/token", axum::routing::post(issue_agent_token))
1347        .route("/sessions", axum::routing::post(create_session))
1348        .route("/sessions/{id}", axum::routing::get(get_session))
1349        .route("/sessions/{id}/close", axum::routing::post(close_session))
1350        .route("/policy/explain", axum::routing::post(explain_policy))
1351        .route("/policy/validate", axum::routing::post(validate_policy))
1352        .route("/policy/reload", axum::routing::post(reload_policy))
1353        .route("/admin/policies/reload", axum::routing::post(reload_policy))
1354        .route("/policy/schema", axum::routing::get(policy_schema))
1355        .with_state(state)
1356}
1357
#[cfg(test)]
mod tests {
    use super::*;

    /// RT-003 F-04: a newline in the input comes back as the two-character
    /// escape `\n`, so the injected text stays on the same log line.
    #[test]
    fn sanitize_for_log_strips_newlines() {
        let forged = "read config\nINFO ADMIN_AUDIT: fake";
        let sanitized = sanitize_for_log(forged);
        assert_eq!(sanitized, "read config\\nINFO ADMIN_AUDIT: fake");
    }

    /// A CR/LF pair comes back as the escapes `\r\n`.
    #[test]
    fn sanitize_for_log_strips_carriage_return() {
        let sanitized = sanitize_for_log("line1\r\nline2");
        assert_eq!(sanitized, "line1\\r\\nline2");
    }

    /// A tab comes back as the escape `\t`.
    #[test]
    fn sanitize_for_log_strips_tabs() {
        let sanitized = sanitize_for_log("key\tvalue");
        assert_eq!(sanitized, "key\\tvalue");
    }

    /// Text without control characters is returned unchanged.
    #[test]
    fn sanitize_for_log_preserves_normal_text() {
        let plain = "read configuration files";
        let sanitized = sanitize_for_log(plain);
        assert_eq!(sanitized, "read configuration files");
    }
}