1use std::sync::Arc;
2
3use arbiter_identity::{AgentRegistry, TrustLevel};
4use arbiter_mcp::context::McpRequest;
5use arbiter_policy::{EvalContext, PolicyTrace};
6use arbiter_session::{CreateSessionRequest, DataSensitivity};
7use axum::Json;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, StatusCode};
10use axum::response::IntoResponse;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::state::AppState;
16use crate::token::issue_token;
17
/// JSON request body accepted by [`register_agent`].
#[derive(Debug, Deserialize)]
pub struct RegisterAgentRequest {
    /// Owner of the agent; also handed to `issue_token` when the first token is minted.
    pub owner: String,
    /// Model identifier forwarded to the registry.
    pub model: String,
    /// Capabilities forwarded to the registry; an omitted field deserializes to an empty list.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Trust level for the new agent; falls back to `default_trust_level()` when omitted.
    #[serde(default = "default_trust_level")]
    pub trust_level: TrustLevel,
    /// Optional expiry timestamp forwarded to the registry; `None` when omitted.
    #[serde(default)]
    pub expires_at: Option<DateTime<Utc>>,
}
30
/// Serde default for `RegisterAgentRequest::trust_level`: agents start as
/// `TrustLevel::Untrusted` unless the caller says otherwise.
fn default_trust_level() -> TrustLevel {
    TrustLevel::Untrusted
}
34
/// JSON body returned by [`register_agent`] on success.
#[derive(Debug, Serialize)]
pub struct RegisterAgentResponse {
    /// Identifier of the newly registered agent.
    pub agent_id: Uuid,
    /// Token issued for the new agent via `issue_token`.
    pub token: String,
}
41
/// JSON request body accepted by [`delegate_agent`]; the delegating agent
/// comes from the URL path, not from this body.
#[derive(Debug, Deserialize)]
pub struct DelegateRequest {
    /// Agent receiving the delegation.
    pub to: Uuid,
    /// Scopes passed to the registry's `create_delegation`.
    pub scopes: Vec<String>,
    /// Optional expiry of the delegation; `None` when omitted.
    #[serde(default)]
    pub expires_at: Option<DateTime<Utc>>,
}
50
/// Optional JSON request body accepted by [`issue_agent_token`].
#[derive(Debug, Deserialize)]
pub struct TokenRequest {
    /// Overrides the configured token lifetime when present; the handler
    /// rejects values that are zero or negative.
    #[serde(default)]
    pub expiry_seconds: Option<i64>,
}
57
/// JSON request body accepted by [`create_session`].
#[derive(Debug, Deserialize)]
pub struct CreateSessionApiRequest {
    /// Agent the session is created for; must exist, be active and not be expired.
    pub agent_id: Uuid,
    /// Delegation chain snapshot forwarded to the session store; empty when omitted.
    #[serde(default)]
    pub delegation_chain_snapshot: Vec<String>,
    /// Declared intent of the session; the handler rejects blank values.
    pub declared_intent: String,
    /// Tools forwarded to the session store; empty when omitted.
    #[serde(default)]
    pub authorized_tools: Vec<String>,
    /// Session lifetime in seconds; defaults to `default_time_limit()` and
    /// must lie in `1..=86400`.
    #[serde(default = "default_time_limit")]
    pub time_limit_secs: i64,
    /// Call budget; defaults to `default_call_budget()` and must lie in
    /// `1..=1_000_000`.
    #[serde(default = "default_call_budget")]
    pub call_budget: u64,
    /// Optional per-minute rate limit forwarded unchanged to the session store.
    #[serde(default)]
    pub rate_limit_per_minute: Option<u64>,
    /// Optional rate-limit window in seconds; when present it must lie in
    /// `10..=3600`, otherwise the server-wide default window is used.
    #[serde(default)]
    pub rate_limit_window_secs: Option<u64>,
    /// Data sensitivity ceiling; defaults to `default_data_sensitivity()`.
    #[serde(default = "default_data_sensitivity")]
    pub data_sensitivity_ceiling: DataSensitivity,
}
80
/// Serde default for `CreateSessionApiRequest::time_limit_secs`: 3600 seconds (one hour).
fn default_time_limit() -> i64 {
    3600
}
84
/// Serde default for `CreateSessionApiRequest::call_budget`: 1000 calls.
fn default_call_budget() -> u64 {
    1000
}
88
/// Serde default for `CreateSessionApiRequest::data_sensitivity_ceiling`:
/// `DataSensitivity::Internal`.
fn default_data_sensitivity() -> DataSensitivity {
    DataSensitivity::Internal
}
92
/// JSON body returned by [`create_session`] on success.
#[derive(Debug, Serialize)]
pub struct CreateSessionResponse {
    /// Identifier of the created session.
    pub session_id: Uuid,
    /// Declared intent as stored on the session.
    pub declared_intent: String,
    /// Authorized tools as stored on the session.
    pub authorized_tools: Vec<String>,
    /// Call budget as stored on the session.
    pub call_budget: u64,
    /// Time limit in seconds, echoed from the request.
    pub time_limit_secs: i64,
}
102
/// JSON body returned by [`get_session`].
#[derive(Debug, Serialize)]
pub struct SessionStatusResponse {
    /// Identifier of the session.
    pub session_id: Uuid,
    /// Agent that owns the session.
    pub agent_id: Uuid,
    /// Lower-cased `Debug` rendering of the session status.
    pub status: String,
    /// Declared intent stored on the session.
    pub declared_intent: String,
    /// Tools stored on the session.
    pub authorized_tools: Vec<String>,
    /// Number of calls made so far.
    pub calls_made: u64,
    /// Total call budget.
    pub call_budget: u64,
    /// `call_budget - calls_made`, saturating at zero.
    pub calls_remaining: u64,
    /// Per-minute rate limit stored on the session, if any.
    pub rate_limit_per_minute: Option<u64>,
    /// Serialized sensitivity ceiling, or `"unknown"` if it does not serialize to a string.
    pub data_sensitivity_ceiling: String,
    /// Creation timestamp of the session.
    pub created_at: DateTime<Utc>,
    /// `created_at` plus the session time limit.
    pub expires_at: DateTime<Utc>,
    /// Whole seconds until `expires_at`, never negative.
    pub seconds_remaining: i64,
    /// Human-readable low-budget / low-time warnings.
    pub warnings: Vec<String>,
}
121
/// JSON body returned by [`issue_agent_token`] on success.
#[derive(Debug, Serialize)]
pub struct TokenResponse {
    /// The issued token.
    pub token: String,
    /// Lifetime in seconds that was used to issue the token.
    pub expires_in: i64,
}
128
/// JSON error body used by the handlers in this file; serializes to an object
/// with a single `error` field.
#[derive(Debug, Serialize)]
pub struct ErrorResponse {
    /// Human-readable error message.
    pub error: String,
}
134
135fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
145 use subtle::ConstantTimeEq;
146 let max_len = std::cmp::max(a.len(), b.len());
150 let mut a_padded = vec![0u8; max_len];
151 let mut b_padded = vec![0u8; max_len];
152 a_padded[..a.len()].copy_from_slice(a);
153 b_padded[..b.len()].copy_from_slice(b);
154 let bytes_equal: bool = a_padded.ct_eq(&b_padded).into();
155 let len_equal: bool = (a.len() as u64).ct_eq(&(b.len() as u64)).into();
156 bytes_equal & len_equal
157}
158
/// Escapes characters that could be used to forge or corrupt log lines.
///
/// `\n`, `\r` and `\t` are rewritten to the two-character sequences `\n`,
/// `\r` and `\t`. Every other control character (for example ESC, NUL,
/// vertical tab, form feed, NEL) and the Unicode line/paragraph separators
/// U+2028 / U+2029 are rewritten as `\u{..}` escapes. All other text is
/// passed through unchanged.
///
/// The input is walked once and the output is built in a single buffer.
fn sanitize_for_log(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // `is_control` does not cover U+2028 / U+2029, which many log
            // viewers still treat as line breaks, so they are listed explicitly.
            c if c.is_control() || matches!(c, '\u{2028}' | '\u{2029}') => {
                out.extend(c.escape_unicode());
            }
            c => out.push(c),
        }
    }
    out
}
169
170fn validate_admin_key(
174 headers: &HeaderMap,
175 expected: &str,
176) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
177 let key = headers
178 .get("x-api-key")
179 .and_then(|v| v.to_str().ok())
180 .ok_or_else(|| {
181 (
182 StatusCode::UNAUTHORIZED,
183 Json(ErrorResponse {
184 error: "missing x-api-key header".into(),
185 }),
186 )
187 })?;
188
189 if !constant_time_eq(key.as_bytes(), expected.as_bytes()) {
191 return Err((
192 StatusCode::FORBIDDEN,
193 Json(ErrorResponse {
194 error: "invalid API key".into(),
195 }),
196 ));
197 }
198
199 Ok(())
200}
201
202fn check_admin_rate_limit(state: &AppState) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
208 if !state.admin_rate_limiter.check_rate_limit() {
209 tracing::warn!("ADMIN_AUDIT: rate limit exceeded on admin API");
210 return Err((
211 StatusCode::TOO_MANY_REQUESTS,
212 Json(ErrorResponse {
213 error: "admin API rate limit exceeded, try again later".into(),
214 }),
215 ));
216 }
217 Ok(())
218}
219
220pub async fn register_agent(
222 State(state): State<AppState>,
223 headers: HeaderMap,
224 Json(req): Json<RegisterAgentRequest>,
225) -> impl IntoResponse {
226 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
227 return e.into_response();
228 }
229 if let Err(e) = check_admin_rate_limit(&state) {
230 return e.into_response();
231 }
232
233 match state
234 .registry
235 .register_agent(
236 req.owner.clone(),
237 req.model,
238 req.capabilities,
239 req.trust_level,
240 req.expires_at,
241 )
242 .await
243 {
244 Ok(agent) => {
245 let token = match issue_token(agent.id, &req.owner, &state.token_config) {
246 Ok(t) => t,
247 Err(e) => {
248 tracing::error!(error = %e, "failed to issue token");
249 return (
250 StatusCode::INTERNAL_SERVER_ERROR,
251 Json(ErrorResponse {
252 error: "token issuance failed".into(),
253 }),
254 )
255 .into_response();
256 }
257 };
258
259 state.metrics.registered_agents.inc();
260 state.admin_audit_log(
261 "register_agent",
262 Some(agent.id),
263 &format!("owner={}", sanitize_for_log(&req.owner)),
264 );
265
266 (
267 StatusCode::CREATED,
268 Json(RegisterAgentResponse {
269 agent_id: agent.id,
270 token,
271 }),
272 )
273 .into_response()
274 }
275 Err(e) => {
276 tracing::error!(error = %e, "failed to register agent");
277 (
278 StatusCode::INTERNAL_SERVER_ERROR,
279 Json(ErrorResponse {
280 error: e.to_string(),
281 }),
282 )
283 .into_response()
284 }
285 }
286}
287
288pub async fn get_agent(
290 State(state): State<AppState>,
291 headers: HeaderMap,
292 Path(id): Path<Uuid>,
293) -> impl IntoResponse {
294 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
295 return e.into_response();
296 }
297 if let Err(e) = check_admin_rate_limit(&state) {
298 return e.into_response();
299 }
300
301 state.admin_audit_log("get_agent", Some(id), "");
302
303 match state.registry.get_agent(id).await {
304 Ok(agent) => match serde_json::to_value(&agent) {
305 Ok(val) => (StatusCode::OK, Json(val)).into_response(),
306 Err(e) => (
307 StatusCode::INTERNAL_SERVER_ERROR,
308 Json(serde_json::json!({"error": e.to_string()})),
309 )
310 .into_response(),
311 },
312 Err(e) => (
313 StatusCode::NOT_FOUND,
314 Json(ErrorResponse {
315 error: e.to_string(),
316 }),
317 )
318 .into_response(),
319 }
320}
321
322pub async fn delegate_agent(
324 State(state): State<AppState>,
325 headers: HeaderMap,
326 Path(from_id): Path<Uuid>,
327 Json(req): Json<DelegateRequest>,
328) -> impl IntoResponse {
329 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
330 return e.into_response();
331 }
332 if let Err(e) = check_admin_rate_limit(&state) {
333 return e.into_response();
334 }
335
336 state.admin_audit_log("delegate_agent", Some(from_id), &format!("to={}", req.to));
337
338 match state
339 .registry
340 .create_delegation(from_id, req.to, req.scopes, req.expires_at)
341 .await
342 {
343 Ok(link) => match serde_json::to_value(&link) {
344 Ok(val) => (StatusCode::CREATED, Json(val)).into_response(),
345 Err(e) => (
346 StatusCode::INTERNAL_SERVER_ERROR,
347 Json(serde_json::json!({"error": e.to_string()})),
348 )
349 .into_response(),
350 },
351 Err(e) => {
352 let status = match &e {
353 arbiter_identity::IdentityError::DelegationSourceNotFound(_)
354 | arbiter_identity::IdentityError::DelegationTargetNotFound(_) => {
355 StatusCode::NOT_FOUND
356 }
357 arbiter_identity::IdentityError::ScopeNarrowingViolation { .. }
358 | arbiter_identity::IdentityError::DelegateFromDeactivated(_)
359 | arbiter_identity::IdentityError::CircularDelegation { .. } => {
360 StatusCode::BAD_REQUEST
361 }
362 _ => StatusCode::INTERNAL_SERVER_ERROR,
363 };
364 (
365 status,
366 Json(ErrorResponse {
367 error: e.to_string(),
368 }),
369 )
370 .into_response()
371 }
372 }
373}
374
375pub async fn list_delegations(
377 State(state): State<AppState>,
378 headers: HeaderMap,
379 Path(id): Path<Uuid>,
380) -> impl IntoResponse {
381 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
382 return e.into_response();
383 }
384 if let Err(e) = check_admin_rate_limit(&state) {
385 return e.into_response();
386 }
387
388 state.admin_audit_log("list_delegations", Some(id), "");
389
390 if let Err(e) = state.registry.get_agent(id).await {
392 return (
393 StatusCode::NOT_FOUND,
394 Json(ErrorResponse {
395 error: e.to_string(),
396 }),
397 )
398 .into_response();
399 }
400
401 let outgoing = state.registry.list_delegations_from(id).await;
402 let incoming = state.registry.list_delegations_to(id).await;
403
404 (
405 StatusCode::OK,
406 Json(serde_json::json!({
407 "agent_id": id,
408 "outgoing": outgoing,
409 "incoming": incoming,
410 "outgoing_count": outgoing.len(),
411 "incoming_count": incoming.len(),
412 })),
413 )
414 .into_response()
415}
416
417pub async fn deactivate_agent(
419 State(state): State<AppState>,
420 headers: HeaderMap,
421 Path(id): Path<Uuid>,
422) -> impl IntoResponse {
423 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
424 return e.into_response();
425 }
426 if let Err(e) = check_admin_rate_limit(&state) {
427 return e.into_response();
428 }
429
430 state.admin_audit_log("deactivate_agent", Some(id), "cascade");
431
432 match state.registry.cascade_deactivate(id).await {
433 Ok(deactivated) => {
434 let mut total_sessions_closed = 0usize;
436 for &agent_id in &deactivated {
437 let closed = state.session_store.close_sessions_for_agent(agent_id).await;
438 total_sessions_closed += closed;
439 state.metrics.registered_agents.dec();
440 }
441 (
442 StatusCode::OK,
443 Json(serde_json::json!({
444 "deactivated": deactivated,
445 "count": deactivated.len(),
446 "sessions_closed": total_sessions_closed,
447 })),
448 )
449 .into_response()
450 }
451 Err(e) => {
452 let status = match &e {
453 arbiter_identity::IdentityError::AgentNotFound(_) => StatusCode::NOT_FOUND,
454 arbiter_identity::IdentityError::AgentDeactivated(_) => StatusCode::CONFLICT,
455 _ => StatusCode::INTERNAL_SERVER_ERROR,
456 };
457 (
458 status,
459 Json(ErrorResponse {
460 error: e.to_string(),
461 }),
462 )
463 .into_response()
464 }
465 }
466}
467
468pub async fn issue_agent_token(
470 State(state): State<AppState>,
471 headers: HeaderMap,
472 Path(id): Path<Uuid>,
473 body: Option<Json<TokenRequest>>,
474) -> impl IntoResponse {
475 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
476 return e.into_response();
477 }
478 if let Err(e) = check_admin_rate_limit(&state) {
479 return e.into_response();
480 }
481
482 state.admin_audit_log("issue_agent_token", Some(id), "");
483
484 let agent = match state.registry.get_agent(id).await {
485 Ok(a) => a,
486 Err(e) => {
487 return (
488 StatusCode::NOT_FOUND,
489 Json(ErrorResponse {
490 error: e.to_string(),
491 }),
492 )
493 .into_response();
494 }
495 };
496
497 if !agent.active {
498 return (
499 StatusCode::BAD_REQUEST,
500 Json(ErrorResponse {
501 error: format!("agent {} is deactivated", id),
502 }),
503 )
504 .into_response();
505 }
506
507 let mut config = state.token_config.clone();
508 if let Some(Json(req)) = body
509 && let Some(expiry) = req.expiry_seconds
510 {
511 if expiry <= 0 {
514 return (
515 StatusCode::BAD_REQUEST,
516 Json(ErrorResponse {
517 error: "expiry_seconds must be positive".into(),
518 }),
519 )
520 .into_response();
521 }
522 config.expiry_seconds = expiry;
523 }
524
525 match issue_token(agent.id, &agent.owner, &config) {
526 Ok(token) => (
527 StatusCode::OK,
528 Json(TokenResponse {
529 token,
530 expires_in: config.expiry_seconds,
531 }),
532 )
533 .into_response(),
534 Err(e) => {
535 tracing::error!(error = %e, "failed to issue token");
536 (
537 StatusCode::INTERNAL_SERVER_ERROR,
538 Json(ErrorResponse {
539 error: "token issuance failed".into(),
540 }),
541 )
542 .into_response()
543 }
544 }
545}
546
547pub async fn list_agents(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
549 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
550 return e.into_response();
551 }
552 if let Err(e) = check_admin_rate_limit(&state) {
553 return e.into_response();
554 }
555
556 state.admin_audit_log("list_agents", None, "");
557
558 let agents = state.registry.list_agents().await;
559 match serde_json::to_value(&agents) {
560 Ok(val) => (StatusCode::OK, Json(val)).into_response(),
561 Err(e) => (
562 StatusCode::INTERNAL_SERVER_ERROR,
563 Json(serde_json::json!({"error": e.to_string()})),
564 )
565 .into_response(),
566 }
567}
568
569pub async fn create_session(
571 State(state): State<AppState>,
572 headers: HeaderMap,
573 Json(req): Json<CreateSessionApiRequest>,
574) -> impl IntoResponse {
575 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
576 return e.into_response();
577 }
578 if let Err(e) = check_admin_rate_limit(&state) {
579 return e.into_response();
580 }
581
582 state.admin_audit_log(
583 "create_session",
584 Some(req.agent_id),
585 &format!("intent={}", sanitize_for_log(&req.declared_intent)),
586 );
587
588 let agent = match state.registry.get_agent(req.agent_id).await {
592 Ok(a) => a,
593 Err(e) => {
594 tracing::warn!(
595 agent_id = %req.agent_id,
596 error = %e,
597 "session creation denied: agent not found"
598 );
599 return (
600 StatusCode::NOT_FOUND,
601 Json(ErrorResponse {
602 error: format!("agent {} not found", req.agent_id),
603 }),
604 )
605 .into_response();
606 }
607 };
608
609 if !agent.active {
610 tracing::warn!(
611 agent_id = %req.agent_id,
612 "session creation denied: agent is deactivated"
613 );
614 return (
615 StatusCode::BAD_REQUEST,
616 Json(ErrorResponse {
617 error: format!("agent {} is deactivated", req.agent_id),
618 }),
619 )
620 .into_response();
621 }
622
623 if let Some(expires_at) = agent.expires_at
624 && expires_at < Utc::now()
625 {
626 tracing::warn!(
627 agent_id = %req.agent_id,
628 expires_at = %expires_at,
629 "session creation denied: agent has expired"
630 );
631 return (
632 StatusCode::BAD_REQUEST,
633 Json(ErrorResponse {
634 error: format!("agent {} has expired", req.agent_id),
635 }),
636 )
637 .into_response();
638 }
639
640 if let Some(max_sessions) = state.max_concurrent_sessions_per_agent {
644 let active_count = state
645 .session_store
646 .count_active_for_agent(req.agent_id)
647 .await;
648 if active_count >= max_sessions {
649 tracing::warn!(
650 agent_id = %req.agent_id,
651 active = active_count,
652 max = max_sessions,
653 "session creation denied: per-agent concurrent session cap reached"
654 );
655 return (
656 StatusCode::TOO_MANY_REQUESTS,
657 Json(ErrorResponse {
658 error: format!(
659 "agent {} has too many concurrent sessions ({}/{})",
660 req.agent_id, active_count, max_sessions
661 ),
662 }),
663 )
664 .into_response();
665 }
666 }
667
668 if req.time_limit_secs <= 0 {
670 return (
671 StatusCode::BAD_REQUEST,
672 Json(ErrorResponse {
673 error: "time_limit_secs must be positive".into(),
674 }),
675 )
676 .into_response();
677 }
678 if req.time_limit_secs > 86400 {
680 return (
681 StatusCode::BAD_REQUEST,
682 Json(ErrorResponse {
683 error: "time_limit_secs cannot exceed 86400 (24 hours)".into(),
684 }),
685 )
686 .into_response();
687 }
688 if req.call_budget == 0 {
689 return (
690 StatusCode::BAD_REQUEST,
691 Json(ErrorResponse {
692 error: "call_budget must be positive".into(),
693 }),
694 )
695 .into_response();
696 }
697 if req.call_budget > 1_000_000 {
698 return (
699 StatusCode::BAD_REQUEST,
700 Json(ErrorResponse {
701 error: "call_budget cannot exceed 1000000".into(),
702 }),
703 )
704 .into_response();
705 }
706 if req.declared_intent.trim().is_empty() {
707 return (
708 StatusCode::BAD_REQUEST,
709 Json(ErrorResponse {
710 error: "declared_intent must not be empty".into(),
711 }),
712 )
713 .into_response();
714 }
715
716 if let Some(window) = req.rate_limit_window_secs {
721 if window == 0 {
722 return (
723 StatusCode::BAD_REQUEST,
724 Json(ErrorResponse {
725 error: "rate_limit_window_secs must be positive".into(),
726 }),
727 )
728 .into_response();
729 }
730 if window < 10 {
731 return (
732 StatusCode::BAD_REQUEST,
733 Json(ErrorResponse {
734 error: "rate_limit_window_secs must be at least 10 seconds".into(),
735 }),
736 )
737 .into_response();
738 }
739 if window > 3600 {
740 return (
741 StatusCode::BAD_REQUEST,
742 Json(ErrorResponse {
743 error: "rate_limit_window_secs cannot exceed 3600 (1 hour)".into(),
744 }),
745 )
746 .into_response();
747 }
748 }
749
750 let create_req = CreateSessionRequest {
751 agent_id: req.agent_id,
752 delegation_chain_snapshot: req.delegation_chain_snapshot,
753 declared_intent: req.declared_intent,
754 authorized_tools: req.authorized_tools,
755 time_limit: chrono::Duration::seconds(req.time_limit_secs),
756 call_budget: req.call_budget,
757 rate_limit_per_minute: req.rate_limit_per_minute,
758 rate_limit_window_secs: req
759 .rate_limit_window_secs
760 .unwrap_or(state.default_rate_limit_window_secs),
761 data_sensitivity_ceiling: req.data_sensitivity_ceiling,
762 };
763
764 let session = state.session_store.create(create_req).await;
765
766 state.metrics.active_sessions.inc();
767
768 (
769 StatusCode::CREATED,
770 Json(CreateSessionResponse {
771 session_id: session.session_id,
772 declared_intent: session.declared_intent,
773 authorized_tools: session.authorized_tools,
774 call_budget: session.call_budget,
775 time_limit_secs: req.time_limit_secs,
776 }),
777 )
778 .into_response()
779}
780
781pub async fn get_session(
783 State(state): State<AppState>,
784 headers: HeaderMap,
785 Path(id): Path<Uuid>,
786) -> impl IntoResponse {
787 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
788 return e.into_response();
789 }
790 if let Err(e) = check_admin_rate_limit(&state) {
791 return e.into_response();
792 }
793
794 state.admin_audit_log("get_session", None, &format!("session_id={}", id));
795
796 let session = match state.session_store.get(id).await {
797 Ok(s) => s,
798 Err(e) => {
799 return (
800 StatusCode::NOT_FOUND,
801 Json(ErrorResponse {
802 error: e.to_string(),
803 }),
804 )
805 .into_response();
806 }
807 };
808
809 let now = Utc::now();
810 let expires_at = session.created_at + session.time_limit;
811 let seconds_remaining = (expires_at - now).num_seconds().max(0);
812 let calls_remaining = session.call_budget.saturating_sub(session.calls_made);
813
814 let mut warnings = Vec::new();
815 let budget_pct_remaining = if session.call_budget > 0 {
816 (calls_remaining as f64 / session.call_budget as f64) * 100.0
817 } else {
818 0.0
819 };
820 let time_pct_remaining = if session.time_limit.num_seconds() > 0 {
821 (seconds_remaining as f64 / session.time_limit.num_seconds() as f64) * 100.0
822 } else {
823 0.0
824 };
825 if budget_pct_remaining <= state.warning_threshold_pct && session.call_budget > 0 {
826 warnings.push(format!(
827 "budget low: {} of {} calls remaining",
828 calls_remaining, session.call_budget
829 ));
830 }
831 if time_pct_remaining <= state.warning_threshold_pct && seconds_remaining > 0 {
832 warnings.push(format!("time low: {}s remaining", seconds_remaining));
833 }
834
835 let status_str = format!("{:?}", session.status).to_lowercase();
836 let sensitivity_str = serde_json::to_value(session.data_sensitivity_ceiling)
837 .unwrap_or_default()
838 .as_str()
839 .unwrap_or("unknown")
840 .to_string();
841
842 (
843 StatusCode::OK,
844 Json(SessionStatusResponse {
845 session_id: session.session_id,
846 agent_id: session.agent_id,
847 status: status_str,
848 declared_intent: session.declared_intent,
849 authorized_tools: session.authorized_tools,
850 calls_made: session.calls_made,
851 call_budget: session.call_budget,
852 calls_remaining,
853 rate_limit_per_minute: session.rate_limit_per_minute,
854 data_sensitivity_ceiling: sensitivity_str,
855 created_at: session.created_at,
856 expires_at,
857 seconds_remaining,
858 warnings,
859 }),
860 )
861 .into_response()
862}
863
/// JSON summary returned by [`close_session`].
#[derive(Debug, Serialize)]
pub struct SessionCloseResponse {
    /// Identifier of the closed session.
    pub session_id: Uuid,
    /// Always `"closed"` in the response built by `close_session`.
    pub status: String,
    /// Declared intent stored on the session.
    pub declared_intent: String,
    /// Number of calls the session made.
    pub total_calls: u64,
    /// Call budget the session was created with.
    pub call_budget: u64,
    /// `total_calls / call_budget` as a percentage; `0.0` when the budget is zero.
    pub budget_utilization_pct: f64,
    /// Whole seconds between session creation and the close request.
    pub time_used_secs: i64,
    /// Session time limit in whole seconds.
    pub time_limit_secs: i64,
    /// Denied-call count from the audit sink's statistics; `0` without a sink.
    pub denied_attempts: u64,
    /// Anomaly count from the audit sink's statistics; `0` without a sink.
    pub anomalies_detected: u64,
}
880
881pub async fn close_session(
883 State(state): State<AppState>,
884 headers: HeaderMap,
885 Path(id): Path<Uuid>,
886) -> impl IntoResponse {
887 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
888 return e.into_response();
889 }
890 if let Err(e) = check_admin_rate_limit(&state) {
891 return e.into_response();
892 }
893
894 state.admin_audit_log("close_session", None, &format!("session_id={}", id));
895
896 let session = match state.session_store.close(id).await {
897 Ok(s) => {
898 state.metrics.active_sessions.dec();
899 s
900 }
901 Err(e) => {
902 let status = match &e {
903 arbiter_session::SessionError::NotFound(_) => StatusCode::NOT_FOUND,
904 arbiter_session::SessionError::AlreadyClosed(_) => StatusCode::CONFLICT,
905 _ => StatusCode::INTERNAL_SERVER_ERROR,
906 };
907 return (
908 status,
909 Json(ErrorResponse {
910 error: e.to_string(),
911 }),
912 )
913 .into_response();
914 }
915 };
916
917 let now = Utc::now();
918 let time_used_secs = (now - session.created_at).num_seconds();
919 let budget_utilization_pct = if session.call_budget > 0 {
920 (session.calls_made as f64 / session.call_budget as f64) * 100.0
921 } else {
922 0.0
923 };
924
925 let session_id_str = id.to_string();
927 let (denied_attempts, anomalies_detected) = if let Some(ref sink) = state.audit_sink {
928 let stats = sink.stats().stats_for_session(&session_id_str).await;
929 sink.stats().remove_session(&session_id_str).await;
931 (stats.denied_count, stats.anomaly_count)
932 } else {
933 (0, 0)
934 };
935
936 (
937 StatusCode::OK,
938 Json(SessionCloseResponse {
939 session_id: session.session_id,
940 status: "closed".into(),
941 declared_intent: session.declared_intent,
942 total_calls: session.calls_made,
943 call_budget: session.call_budget,
944 budget_utilization_pct,
945 time_used_secs,
946 time_limit_secs: session.time_limit.num_seconds(),
947 denied_attempts,
948 anomalies_detected,
949 }),
950 )
951 .into_response()
952}
953
/// JSON request body accepted by [`explain_policy`].
#[derive(Debug, Deserialize)]
pub struct PolicyExplainRequest {
    /// Agent whose registry record is used as the evaluation subject.
    pub agent_id: Uuid,
    /// Declared intent to evaluate under.
    pub declared_intent: String,
    /// Tool name for the simulated `tools/call` request.
    pub tool: String,
    /// Optional tool arguments for the simulated request.
    #[serde(default)]
    pub arguments: Option<serde_json::Value>,
    /// Optional principal subject; the agent's owner is used when omitted.
    #[serde(default)]
    pub principal: Option<String>,
}
965
/// JSON body returned by [`explain_policy`].
#[derive(Debug, Serialize)]
pub struct PolicyExplainResponse {
    /// One of `"allow"`, `"deny"`, `"escalate"` or `"annotate"`.
    pub decision: String,
    /// Id of the policy behind an allow/annotate decision, if any.
    pub matched_policy: Option<String>,
    /// Reason attached to a deny/escalate/annotate decision, or an
    /// explanatory note when no policies are configured.
    pub reason: Option<String>,
    /// Evaluation trace produced by the policy engine.
    pub trace: Vec<PolicyTrace>,
}
974
975pub async fn explain_policy(
977 State(state): State<AppState>,
978 headers: HeaderMap,
979 Json(req): Json<PolicyExplainRequest>,
980) -> impl IntoResponse {
981 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
982 return e.into_response();
983 }
984 if let Err(e) = check_admin_rate_limit(&state) {
985 return e.into_response();
986 }
987
988 state.admin_audit_log(
989 "explain_policy",
990 Some(req.agent_id),
991 &format!("tool={}", sanitize_for_log(&req.tool)),
992 );
993
994 let policy_snapshot = tokio::sync::watch::Sender::borrow(&state.policy_config).clone();
995 let policy_config = match policy_snapshot.as_ref() {
996 Some(pc) => pc,
997 None => {
998 return (
999 StatusCode::OK,
1000 Json(PolicyExplainResponse {
1001 decision: "allow".into(),
1002 matched_policy: None,
1003 reason: Some("no policies configured".into()),
1004 trace: vec![],
1005 }),
1006 )
1007 .into_response();
1008 }
1009 };
1010
1011 let agent = match state.registry.get_agent(req.agent_id).await {
1012 Ok(a) => a,
1013 Err(e) => {
1014 return (
1015 StatusCode::NOT_FOUND,
1016 Json(ErrorResponse {
1017 error: format!("agent not found: {e}"),
1018 }),
1019 )
1020 .into_response();
1021 }
1022 };
1023
1024 let principal = req.principal.unwrap_or_else(|| agent.owner.clone());
1025
1026 let eval_ctx = EvalContext {
1027 agent,
1028 delegation_chain: vec![],
1029 declared_intent: req.declared_intent,
1030 principal_sub: principal,
1031 principal_groups: vec![],
1032 };
1033
1034 let mcp_req = McpRequest {
1035 id: None,
1036 method: "tools/call".into(),
1037 tool_name: Some(req.tool.clone()),
1038 arguments: req.arguments,
1039 resource_uri: None,
1040 };
1041
1042 let result = arbiter_policy::evaluate_explained(policy_config, &eval_ctx, &mcp_req);
1043
1044 let (decision, matched_policy, reason) = match result.decision {
1045 arbiter_policy::Decision::Allow { policy_id } => ("allow".into(), Some(policy_id), None),
1046 arbiter_policy::Decision::Deny { reason } => ("deny".into(), None, Some(reason)),
1047 arbiter_policy::Decision::Escalate { reason } => ("escalate".into(), None, Some(reason)),
1048 arbiter_policy::Decision::Annotate { policy_id, reason } => {
1049 ("annotate".into(), Some(policy_id), Some(reason))
1050 }
1051 };
1052
1053 (
1054 StatusCode::OK,
1055 Json(PolicyExplainResponse {
1056 decision,
1057 matched_policy,
1058 reason,
1059 trace: result.trace,
1060 }),
1061 )
1062 .into_response()
1063}
1064
/// JSON request body accepted by [`validate_policy`].
#[derive(Debug, Deserialize)]
pub struct PolicyValidateRequest {
    /// Policy configuration as TOML text; passed to `PolicyConfig::validate_toml`.
    pub policy_toml: String,
}
1070
1071pub async fn validate_policy(
1073 State(state): State<AppState>,
1074 headers: HeaderMap,
1075 Json(req): Json<PolicyValidateRequest>,
1076) -> impl IntoResponse {
1077 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1078 return e.into_response();
1079 }
1080 if let Err(e) = check_admin_rate_limit(&state) {
1081 return e.into_response();
1082 }
1083
1084 state.admin_audit_log("validate_policy", None, "");
1085
1086 let result = arbiter_policy::PolicyConfig::validate_toml(&req.policy_toml);
1087 (StatusCode::OK, Json(result)).into_response()
1088}
1089
1090pub async fn reload_policy(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1092 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1093 return e.into_response();
1094 }
1095 if let Err(e) = check_admin_rate_limit(&state) {
1096 return e.into_response();
1097 }
1098
1099 state.admin_audit_log("reload_policy", None, "");
1100
1101 let path = match &state.policy_file_path {
1102 Some(p) => p.clone(),
1103 None => {
1104 return (
1105 StatusCode::BAD_REQUEST,
1106 Json(ErrorResponse {
1107 error: "no policy file configured; policies are inline or absent".into(),
1108 }),
1109 )
1110 .into_response();
1111 }
1112 };
1113
1114 let contents = match std::fs::read_to_string(&path) {
1115 Ok(c) => c,
1116 Err(e) => {
1117 tracing::error!(path = %path, error = %e, "failed to read policy file");
1119 return (
1120 StatusCode::INTERNAL_SERVER_ERROR,
1121 Json(ErrorResponse {
1122 error: "failed to read policy file".into(),
1123 }),
1124 )
1125 .into_response();
1126 }
1127 };
1128
1129 let new_config = match arbiter_policy::PolicyConfig::from_toml(&contents) {
1130 Ok(pc) => pc,
1131 Err(e) => {
1132 tracing::error!(error = %e, "failed to parse policy file");
1134 return (
1135 StatusCode::BAD_REQUEST,
1136 Json(ErrorResponse {
1137 error: "failed to parse policy file".into(),
1138 }),
1139 )
1140 .into_response();
1141 }
1142 };
1143
1144 let policy_count = new_config.policies.len();
1145 let _ = state.policy_config.send_replace(Arc::new(Some(new_config)));
1146
1147 tracing::warn!(
1149 path,
1150 policy_count,
1151 "AUDIT: policy configuration reloaded via admin API"
1152 );
1153 (
1154 StatusCode::OK,
1155 Json(serde_json::json!({
1156 "status": "ok",
1157 "reloaded": true,
1158 "policies_loaded": policy_count,
1159 "policy_count": policy_count,
1160 "file": path,
1161 })),
1162 )
1163 .into_response()
1164}
1165
/// Returns a JSON Schema document describing the policy configuration format.
///
/// The schema is a literal built inside this handler; it is not derived from
/// the policy types, so it has to be kept in sync with them by hand.
pub async fn policy_schema(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
    if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
        return e.into_response();
    }
    if let Err(e) = check_admin_rate_limit(&state) {
        return e.into_response();
    }

    state.admin_audit_log("policy_schema", None, "");

    // Top-level object with a `policies` array; the shared definitions live under `$defs`.
    let schema = serde_json::json!({
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "Arbiter Policy Configuration",
        "description": "Defines authorization policies for the Arbiter MCP gateway. Policies are evaluated top-to-bottom; the most specific match wins. If no policy matches, the request is denied (deny-by-default).",
        "type": "object",
        "properties": {
            "policies": {
                "type": "array",
                "description": "Ordered list of authorization policies. Evaluated by specificity score; the most specific matching policy's effect applies.",
                "items": {
                    "$ref": "#/$defs/Policy"
                }
            }
        },
        "$defs": {
            "Policy": {
                "type": "object",
                "description": "A single authorization policy rule.",
                "required": ["id", "effect"],
                "properties": {
                    "id": {
                        "type": "string",
                        "description": "Unique identifier for this policy. Used in audit logs and policy traces."
                    },
                    "effect": {
                        "$ref": "#/$defs/Effect"
                    },
                    "agent_match": {
                        "$ref": "#/$defs/AgentMatch"
                    },
                    "principal_match": {
                        "$ref": "#/$defs/PrincipalMatch"
                    },
                    "intent_match": {
                        "$ref": "#/$defs/IntentMatch"
                    },
                    "allowed_tools": {
                        "type": "array",
                        "items": { "type": "string" },
                        "description": "Tools this policy applies to. Empty array means 'all tools'."
                    },
                    "parameter_constraints": {
                        "type": "array",
                        "items": { "$ref": "#/$defs/ParameterConstraint" },
                        "description": "Per-parameter bounds on tool arguments."
                    },
                    "priority": {
                        "type": "integer",
                        "default": 0,
                        "description": "Manual priority override. 0 = auto-computed from match specificity. Higher wins ties."
                    }
                }
            },
            "Effect": {
                "type": "string",
                "enum": ["allow", "deny", "escalate"],
                "description": "allow = permit the request. deny = block it. escalate = require human-in-the-loop approval."
            },
            "AgentMatch": {
                "type": "object",
                "description": "Criteria for matching the requesting agent. All specified fields must match.",
                "properties": {
                    "agent_id": {
                        "type": "string",
                        "format": "uuid",
                        "description": "Match a specific agent by UUID. Specificity: +100."
                    },
                    "trust_level": {
                        "$ref": "#/$defs/TrustLevel"
                    },
                    "capabilities": {
                        "type": "array",
                        "items": { "type": "string" },
                        "description": "Agent must have all listed capabilities. Specificity: +25 each."
                    }
                }
            },
            "TrustLevel": {
                "type": "string",
                "enum": ["untrusted", "basic", "verified", "trusted"],
                "description": "Agent trust tier. untrusted < basic < verified < trusted. Matches agents at or above the specified level. Specificity: +50."
            },
            "PrincipalMatch": {
                "type": "object",
                "description": "Criteria for matching the human principal on whose behalf the agent acts.",
                "properties": {
                    "sub": {
                        "type": "string",
                        "description": "Exact principal subject identifier (e.g., 'user:alice'). Specificity: +40."
                    },
                    "groups": {
                        "type": "array",
                        "items": { "type": "string" },
                        "description": "Principal must belong to at least one of these groups. Specificity: +20 each."
                    }
                }
            },
            "IntentMatch": {
                "type": "object",
                "description": "Criteria for matching the session's declared intent.",
                "properties": {
                    "keywords": {
                        "type": "array",
                        "items": { "type": "string" },
                        "description": "Case-insensitive substrings that must appear in the declared intent. Specificity: +10 each."
                    },
                    "regex": {
                        "type": "string",
                        "description": "Regex pattern the declared intent must match. Compiled at config load time. Specificity: +30."
                    }
                }
            },
            "ParameterConstraint": {
                "type": "object",
                "description": "A constraint on a tool call parameter.",
                "required": ["key"],
                "properties": {
                    "key": {
                        "type": "string",
                        "description": "Dotted path to the parameter (e.g., 'arguments.max_tokens')."
                    },
                    "max_value": {
                        "type": "number",
                        "description": "Maximum numeric value allowed."
                    },
                    "min_value": {
                        "type": "number",
                        "description": "Minimum numeric value allowed."
                    },
                    "allowed_values": {
                        "type": "array",
                        "items": { "type": "string" },
                        "description": "Whitelist of allowed string values."
                    }
                }
            },
            "DataSensitivity": {
                "type": "string",
                "enum": ["public", "internal", "confidential", "restricted"],
                "description": "Data sensitivity ceiling for sessions. public < internal < confidential < restricted."
            }
        }
    });

    (StatusCode::OK, Json(schema)).into_response()
}
1323
1324pub fn router(state: AppState) -> axum::Router {
1326 axum::Router::new()
1327 .route("/agents", axum::routing::post(register_agent))
1328 .route("/agents", axum::routing::get(list_agents))
1329 .route("/agents/{id}", axum::routing::get(get_agent))
1330 .route("/agents/{id}", axum::routing::delete(deactivate_agent))
1331 .route("/agents/{id}/delegate", axum::routing::post(delegate_agent))
1332 .route(
1333 "/agents/{id}/delegations",
1334 axum::routing::get(list_delegations),
1335 )
1336 .route("/agents/{id}/token", axum::routing::post(issue_agent_token))
1337 .route("/sessions", axum::routing::post(create_session))
1338 .route("/sessions/{id}", axum::routing::get(get_session))
1339 .route("/sessions/{id}/close", axum::routing::post(close_session))
1340 .route("/policy/explain", axum::routing::post(explain_policy))
1341 .route("/policy/validate", axum::routing::post(validate_policy))
1342 .route("/policy/reload", axum::routing::post(reload_policy))
1343 .route("/admin/policies/reload", axum::routing::post(reload_policy))
1344 .route("/policy/schema", axum::routing::get(policy_schema))
1345 .with_state(state)
1346}
1347
/// Unit tests for `sanitize_for_log`.
///
/// The test names say "strips", but the assertions show that line breaks and
/// tabs are replaced by visible two-character escapes rather than removed.
#[cfg(test)]
mod tests {
    use super::*;

    // A newline followed by a forged audit prefix must end up on a single log line.
    #[test]
    fn sanitize_for_log_strips_newlines() {
        assert_eq!(
            sanitize_for_log("read config\nINFO ADMIN_AUDIT: fake"),
            "read config\\nINFO ADMIN_AUDIT: fake"
        );
    }

    // CR and LF are escaped independently, so CRLF becomes `\r\n` as literal text.
    #[test]
    fn sanitize_for_log_strips_carriage_return() {
        assert_eq!(sanitize_for_log("line1\r\nline2"), "line1\\r\\nline2");
    }

    // Tabs are escaped as well.
    #[test]
    fn sanitize_for_log_strips_tabs() {
        assert_eq!(sanitize_for_log("key\tvalue"), "key\\tvalue");
    }

    // Text without escaped characters is returned unchanged.
    #[test]
    fn sanitize_for_log_preserves_normal_text() {
        assert_eq!(
            sanitize_for_log("read configuration files"),
            "read configuration files"
        );
    }
}