1use std::sync::Arc;
2
3use arbiter_identity::{AgentRegistry, TrustLevel};
4use arbiter_mcp::context::McpRequest;
5use arbiter_policy::{EvalContext, PolicyTrace};
6use arbiter_session::{CreateSessionRequest, DataSensitivity};
7use axum::Json;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, StatusCode};
10use axum::response::IntoResponse;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::state::AppState;
16use crate::token::issue_token;
17
18#[derive(Debug, Deserialize)]
20pub struct RegisterAgentRequest {
21 pub owner: String,
22 pub model: String,
23 #[serde(default)]
24 pub capabilities: Vec<String>,
25 #[serde(default = "default_trust_level")]
26 pub trust_level: TrustLevel,
27 #[serde(default)]
28 pub expires_at: Option<DateTime<Utc>>,
29}
30
31fn default_trust_level() -> TrustLevel {
32 TrustLevel::Untrusted
33}
34
35#[derive(Debug, Serialize)]
37pub struct RegisterAgentResponse {
38 pub agent_id: Uuid,
39 pub token: String,
40}
41
42#[derive(Debug, Deserialize)]
44pub struct DelegateRequest {
45 pub to: Uuid,
46 pub scopes: Vec<String>,
47 #[serde(default)]
48 pub expires_at: Option<DateTime<Utc>>,
49}
50
51#[derive(Debug, Deserialize)]
53pub struct TokenRequest {
54 #[serde(default)]
55 pub expiry_seconds: Option<i64>,
56}
57
58#[derive(Debug, Deserialize)]
60pub struct CreateSessionApiRequest {
61 pub agent_id: Uuid,
62 #[serde(default)]
63 pub delegation_chain_snapshot: Vec<String>,
64 pub declared_intent: String,
65 #[serde(default)]
66 pub authorized_tools: Vec<String>,
67 #[serde(default = "default_time_limit")]
68 pub time_limit_secs: i64,
69 #[serde(default = "default_call_budget")]
70 pub call_budget: u64,
71 #[serde(default)]
72 pub rate_limit_per_minute: Option<u64>,
73 #[serde(default)]
76 pub rate_limit_window_secs: Option<u64>,
77 #[serde(default = "default_data_sensitivity")]
78 pub data_sensitivity_ceiling: DataSensitivity,
79}
80
81fn default_time_limit() -> i64 {
82 3600
83}
84
85fn default_call_budget() -> u64 {
86 1000
87}
88
89fn default_data_sensitivity() -> DataSensitivity {
90 DataSensitivity::Internal
91}
92
93#[derive(Debug, Serialize)]
95pub struct CreateSessionResponse {
96 pub session_id: Uuid,
97 pub declared_intent: String,
98 pub authorized_tools: Vec<String>,
99 pub call_budget: u64,
100 pub time_limit_secs: i64,
101}
102
103#[derive(Debug, Serialize)]
105pub struct SessionStatusResponse {
106 pub session_id: Uuid,
107 pub agent_id: Uuid,
108 pub status: String,
109 pub declared_intent: String,
110 pub authorized_tools: Vec<String>,
111 pub calls_made: u64,
112 pub call_budget: u64,
113 pub calls_remaining: u64,
114 pub rate_limit_per_minute: Option<u64>,
115 pub data_sensitivity_ceiling: String,
116 pub created_at: DateTime<Utc>,
117 pub expires_at: DateTime<Utc>,
118 pub seconds_remaining: i64,
119 pub warnings: Vec<String>,
120}
121
122#[derive(Debug, Serialize)]
124pub struct TokenResponse {
125 pub token: String,
126 pub expires_in: i64,
127}
128
129#[derive(Debug, Serialize)]
131pub struct ErrorResponse {
132 pub error: String,
133}
134
135fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
145 use subtle::ConstantTimeEq;
146 let max_len = std::cmp::max(a.len(), b.len());
150 let mut a_padded = vec![0u8; max_len];
151 let mut b_padded = vec![0u8; max_len];
152 a_padded[..a.len()].copy_from_slice(a);
153 b_padded[..b.len()].copy_from_slice(b);
154 let bytes_equal: bool = a_padded.ct_eq(&b_padded).into();
155 let len_equal: bool = (a.len() as u64).ct_eq(&(b.len() as u64)).into();
156 bytes_equal & len_equal
157}
158
159fn sanitize_for_log(input: &str) -> String {
164 input
165 .replace('\n', "\\n")
166 .replace('\r', "\\r")
167 .replace('\t', "\\t")
168}
169
170fn validate_admin_key(
174 headers: &HeaderMap,
175 expected: &str,
176) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
177 let key = headers
178 .get("x-api-key")
179 .and_then(|v| v.to_str().ok())
180 .ok_or_else(|| {
181 (
182 StatusCode::UNAUTHORIZED,
183 Json(ErrorResponse {
184 error: "missing x-api-key header".into(),
185 }),
186 )
187 })?;
188
189 if !constant_time_eq(key.as_bytes(), expected.as_bytes()) {
193 return Err((
194 StatusCode::UNAUTHORIZED,
195 Json(ErrorResponse {
196 error: "invalid API key".into(),
197 }),
198 ));
199 }
200
201 Ok(())
202}
203
204fn check_admin_rate_limit(state: &AppState) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
210 if !state.admin_rate_limiter.check_rate_limit() {
211 tracing::warn!("ADMIN_AUDIT: rate limit exceeded on admin API");
212 return Err((
213 StatusCode::TOO_MANY_REQUESTS,
214 Json(ErrorResponse {
215 error: "admin API rate limit exceeded, try again later".into(),
216 }),
217 ));
218 }
219 Ok(())
220}
221
222pub async fn register_agent(
224 State(state): State<AppState>,
225 headers: HeaderMap,
226 Json(req): Json<RegisterAgentRequest>,
227) -> impl IntoResponse {
228 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
229 return e.into_response();
230 }
231 if let Err(e) = check_admin_rate_limit(&state) {
232 return e.into_response();
233 }
234
235 match state
236 .registry
237 .register_agent(
238 req.owner.clone(),
239 req.model,
240 req.capabilities,
241 req.trust_level,
242 req.expires_at,
243 )
244 .await
245 {
246 Ok(agent) => {
247 let token = match issue_token(agent.id, &req.owner, &state.token_config) {
248 Ok(t) => t,
249 Err(e) => {
250 tracing::error!(error = %e, "failed to issue token");
251 return (
252 StatusCode::INTERNAL_SERVER_ERROR,
253 Json(ErrorResponse {
254 error: "token issuance failed".into(),
255 }),
256 )
257 .into_response();
258 }
259 };
260
261 state.metrics.registered_agents.inc();
262 state.admin_audit_log(
263 "register_agent",
264 Some(agent.id),
265 &format!("owner={}", sanitize_for_log(&req.owner)),
266 );
267
268 (
269 StatusCode::CREATED,
270 Json(RegisterAgentResponse {
271 agent_id: agent.id,
272 token,
273 }),
274 )
275 .into_response()
276 }
277 Err(e) => {
278 tracing::error!(error = %e, "failed to register agent");
279 (
280 StatusCode::INTERNAL_SERVER_ERROR,
281 Json(ErrorResponse {
282 error: e.to_string(),
283 }),
284 )
285 .into_response()
286 }
287 }
288}
289
290pub async fn get_agent(
292 State(state): State<AppState>,
293 headers: HeaderMap,
294 Path(id): Path<Uuid>,
295) -> impl IntoResponse {
296 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
297 return e.into_response();
298 }
299 if let Err(e) = check_admin_rate_limit(&state) {
300 return e.into_response();
301 }
302
303 state.admin_audit_log("get_agent", Some(id), "");
304
305 match state.registry.get_agent(id).await {
306 Ok(agent) => match serde_json::to_value(&agent) {
307 Ok(val) => (StatusCode::OK, Json(val)).into_response(),
308 Err(e) => (
309 StatusCode::INTERNAL_SERVER_ERROR,
310 Json(serde_json::json!({"error": e.to_string()})),
311 )
312 .into_response(),
313 },
314 Err(e) => (
315 StatusCode::NOT_FOUND,
316 Json(ErrorResponse {
317 error: e.to_string(),
318 }),
319 )
320 .into_response(),
321 }
322}
323
324pub async fn delegate_agent(
326 State(state): State<AppState>,
327 headers: HeaderMap,
328 Path(from_id): Path<Uuid>,
329 Json(req): Json<DelegateRequest>,
330) -> impl IntoResponse {
331 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
332 return e.into_response();
333 }
334 if let Err(e) = check_admin_rate_limit(&state) {
335 return e.into_response();
336 }
337
338 state.admin_audit_log("delegate_agent", Some(from_id), &format!("to={}", req.to));
339
340 match state
341 .registry
342 .create_delegation(from_id, req.to, req.scopes, req.expires_at)
343 .await
344 {
345 Ok(link) => match serde_json::to_value(&link) {
346 Ok(val) => (StatusCode::CREATED, Json(val)).into_response(),
347 Err(e) => (
348 StatusCode::INTERNAL_SERVER_ERROR,
349 Json(serde_json::json!({"error": e.to_string()})),
350 )
351 .into_response(),
352 },
353 Err(e) => {
354 let status = match &e {
355 arbiter_identity::IdentityError::DelegationSourceNotFound(_)
356 | arbiter_identity::IdentityError::DelegationTargetNotFound(_) => {
357 StatusCode::NOT_FOUND
358 }
359 arbiter_identity::IdentityError::ScopeNarrowingViolation { .. }
360 | arbiter_identity::IdentityError::DelegateFromDeactivated(_)
361 | arbiter_identity::IdentityError::CircularDelegation { .. } => {
362 StatusCode::BAD_REQUEST
363 }
364 _ => StatusCode::INTERNAL_SERVER_ERROR,
365 };
366 (
367 status,
368 Json(ErrorResponse {
369 error: e.to_string(),
370 }),
371 )
372 .into_response()
373 }
374 }
375}
376
377pub async fn list_delegations(
379 State(state): State<AppState>,
380 headers: HeaderMap,
381 Path(id): Path<Uuid>,
382) -> impl IntoResponse {
383 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
384 return e.into_response();
385 }
386 if let Err(e) = check_admin_rate_limit(&state) {
387 return e.into_response();
388 }
389
390 state.admin_audit_log("list_delegations", Some(id), "");
391
392 if let Err(e) = state.registry.get_agent(id).await {
394 return (
395 StatusCode::NOT_FOUND,
396 Json(ErrorResponse {
397 error: e.to_string(),
398 }),
399 )
400 .into_response();
401 }
402
403 let outgoing = state.registry.list_delegations_from(id).await;
404 let incoming = state.registry.list_delegations_to(id).await;
405
406 (
407 StatusCode::OK,
408 Json(serde_json::json!({
409 "agent_id": id,
410 "outgoing": outgoing,
411 "incoming": incoming,
412 "outgoing_count": outgoing.len(),
413 "incoming_count": incoming.len(),
414 })),
415 )
416 .into_response()
417}
418
419pub async fn deactivate_agent(
421 State(state): State<AppState>,
422 headers: HeaderMap,
423 Path(id): Path<Uuid>,
424) -> impl IntoResponse {
425 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
426 return e.into_response();
427 }
428 if let Err(e) = check_admin_rate_limit(&state) {
429 return e.into_response();
430 }
431
432 state.admin_audit_log("deactivate_agent", Some(id), "cascade");
433
434 match state.registry.cascade_deactivate(id).await {
435 Ok(deactivated) => {
436 let mut total_sessions_closed = 0usize;
438 for &agent_id in &deactivated {
439 let closed = state.session_store.close_sessions_for_agent(agent_id).await;
440 total_sessions_closed += closed;
441 state.metrics.registered_agents.dec();
442 }
443 (
444 StatusCode::OK,
445 Json(serde_json::json!({
446 "deactivated": deactivated,
447 "count": deactivated.len(),
448 "sessions_closed": total_sessions_closed,
449 })),
450 )
451 .into_response()
452 }
453 Err(e) => {
454 let status = match &e {
455 arbiter_identity::IdentityError::AgentNotFound(_) => StatusCode::NOT_FOUND,
456 arbiter_identity::IdentityError::AgentDeactivated(_) => StatusCode::CONFLICT,
457 _ => StatusCode::INTERNAL_SERVER_ERROR,
458 };
459 (
460 status,
461 Json(ErrorResponse {
462 error: e.to_string(),
463 }),
464 )
465 .into_response()
466 }
467 }
468}
469
470pub async fn issue_agent_token(
472 State(state): State<AppState>,
473 headers: HeaderMap,
474 Path(id): Path<Uuid>,
475 body: Option<Json<TokenRequest>>,
476) -> impl IntoResponse {
477 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
478 return e.into_response();
479 }
480 if let Err(e) = check_admin_rate_limit(&state) {
481 return e.into_response();
482 }
483
484 state.admin_audit_log("issue_agent_token", Some(id), "");
485
486 let agent = match state.registry.get_agent(id).await {
487 Ok(a) => a,
488 Err(e) => {
489 return (
490 StatusCode::NOT_FOUND,
491 Json(ErrorResponse {
492 error: e.to_string(),
493 }),
494 )
495 .into_response();
496 }
497 };
498
499 if !agent.active {
500 return (
501 StatusCode::BAD_REQUEST,
502 Json(ErrorResponse {
503 error: format!("agent {} is deactivated", id),
504 }),
505 )
506 .into_response();
507 }
508
509 let mut config = state.token_config.clone();
510 if let Some(Json(req)) = body
511 && let Some(expiry) = req.expiry_seconds
512 {
513 if expiry <= 0 {
516 return (
517 StatusCode::BAD_REQUEST,
518 Json(ErrorResponse {
519 error: "expiry_seconds must be positive".into(),
520 }),
521 )
522 .into_response();
523 }
524 config.expiry_seconds = expiry;
525 }
526
527 match issue_token(agent.id, &agent.owner, &config) {
528 Ok(token) => (
529 StatusCode::OK,
530 Json(TokenResponse {
531 token,
532 expires_in: config.expiry_seconds,
533 }),
534 )
535 .into_response(),
536 Err(e) => {
537 tracing::error!(error = %e, "failed to issue token");
538 (
539 StatusCode::INTERNAL_SERVER_ERROR,
540 Json(ErrorResponse {
541 error: "token issuance failed".into(),
542 }),
543 )
544 .into_response()
545 }
546 }
547}
548
549pub async fn list_agents(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
551 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
552 return e.into_response();
553 }
554 if let Err(e) = check_admin_rate_limit(&state) {
555 return e.into_response();
556 }
557
558 state.admin_audit_log("list_agents", None, "");
559
560 let agents = state.registry.list_agents().await;
561 match serde_json::to_value(&agents) {
562 Ok(val) => (StatusCode::OK, Json(val)).into_response(),
563 Err(e) => (
564 StatusCode::INTERNAL_SERVER_ERROR,
565 Json(serde_json::json!({"error": e.to_string()})),
566 )
567 .into_response(),
568 }
569}
570
571pub async fn create_session(
573 State(state): State<AppState>,
574 headers: HeaderMap,
575 Json(req): Json<CreateSessionApiRequest>,
576) -> impl IntoResponse {
577 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
578 return e.into_response();
579 }
580 if let Err(e) = check_admin_rate_limit(&state) {
581 return e.into_response();
582 }
583
584 state.admin_audit_log(
585 "create_session",
586 Some(req.agent_id),
587 &format!("intent={}", sanitize_for_log(&req.declared_intent)),
588 );
589
590 let agent = match state.registry.get_agent(req.agent_id).await {
594 Ok(a) => a,
595 Err(e) => {
596 tracing::warn!(
597 agent_id = %req.agent_id,
598 error = %e,
599 "session creation denied: agent not found"
600 );
601 return (
602 StatusCode::NOT_FOUND,
603 Json(ErrorResponse {
604 error: format!("agent {} not found", req.agent_id),
605 }),
606 )
607 .into_response();
608 }
609 };
610
611 if !agent.active {
612 tracing::warn!(
613 agent_id = %req.agent_id,
614 "session creation denied: agent is deactivated"
615 );
616 return (
617 StatusCode::BAD_REQUEST,
618 Json(ErrorResponse {
619 error: format!("agent {} is deactivated", req.agent_id),
620 }),
621 )
622 .into_response();
623 }
624
625 if let Some(expires_at) = agent.expires_at
626 && expires_at < Utc::now()
627 {
628 tracing::warn!(
629 agent_id = %req.agent_id,
630 expires_at = %expires_at,
631 "session creation denied: agent has expired"
632 );
633 return (
634 StatusCode::BAD_REQUEST,
635 Json(ErrorResponse {
636 error: format!("agent {} has expired", req.agent_id),
637 }),
638 )
639 .into_response();
640 }
641
642 if let Some(max_sessions) = state.max_concurrent_sessions_per_agent {
646 let active_count = state
647 .session_store
648 .count_active_for_agent(req.agent_id)
649 .await;
650 if active_count >= max_sessions {
651 tracing::warn!(
652 agent_id = %req.agent_id,
653 active = active_count,
654 max = max_sessions,
655 "session creation denied: per-agent concurrent session cap reached"
656 );
657 return (
658 StatusCode::TOO_MANY_REQUESTS,
659 Json(ErrorResponse {
660 error: format!(
661 "agent {} has too many concurrent sessions ({}/{})",
662 req.agent_id, active_count, max_sessions
663 ),
664 }),
665 )
666 .into_response();
667 }
668 }
669
670 if req.time_limit_secs <= 0 {
672 return (
673 StatusCode::BAD_REQUEST,
674 Json(ErrorResponse {
675 error: "time_limit_secs must be positive".into(),
676 }),
677 )
678 .into_response();
679 }
680 if req.time_limit_secs > 86400 {
682 return (
683 StatusCode::BAD_REQUEST,
684 Json(ErrorResponse {
685 error: "time_limit_secs cannot exceed 86400 (24 hours)".into(),
686 }),
687 )
688 .into_response();
689 }
690 if req.call_budget == 0 {
691 return (
692 StatusCode::BAD_REQUEST,
693 Json(ErrorResponse {
694 error: "call_budget must be positive".into(),
695 }),
696 )
697 .into_response();
698 }
699 if req.call_budget > 1_000_000 {
700 return (
701 StatusCode::BAD_REQUEST,
702 Json(ErrorResponse {
703 error: "call_budget cannot exceed 1000000".into(),
704 }),
705 )
706 .into_response();
707 }
708 if req.declared_intent.trim().is_empty() {
709 return (
710 StatusCode::BAD_REQUEST,
711 Json(ErrorResponse {
712 error: "declared_intent must not be empty".into(),
713 }),
714 )
715 .into_response();
716 }
717
718 if let Some(window) = req.rate_limit_window_secs {
723 if window == 0 {
724 return (
725 StatusCode::BAD_REQUEST,
726 Json(ErrorResponse {
727 error: "rate_limit_window_secs must be positive".into(),
728 }),
729 )
730 .into_response();
731 }
732 if window < 10 {
733 return (
734 StatusCode::BAD_REQUEST,
735 Json(ErrorResponse {
736 error: "rate_limit_window_secs must be at least 10 seconds".into(),
737 }),
738 )
739 .into_response();
740 }
741 if window > 3600 {
742 return (
743 StatusCode::BAD_REQUEST,
744 Json(ErrorResponse {
745 error: "rate_limit_window_secs cannot exceed 3600 (1 hour)".into(),
746 }),
747 )
748 .into_response();
749 }
750 }
751
752 if req.authorized_tools.is_empty() {
753 tracing::warn!(
754 agent_id = %req.agent_id,
755 "session created with empty authorized_tools; all tools will be permitted. \
756 Specify an explicit tool allowlist for least-privilege enforcement."
757 );
758 }
759
760 let create_req = CreateSessionRequest {
761 agent_id: req.agent_id,
762 delegation_chain_snapshot: req.delegation_chain_snapshot,
763 declared_intent: req.declared_intent,
764 authorized_tools: req.authorized_tools,
765 time_limit: chrono::Duration::seconds(req.time_limit_secs),
766 call_budget: req.call_budget,
767 rate_limit_per_minute: req.rate_limit_per_minute,
768 rate_limit_window_secs: req
769 .rate_limit_window_secs
770 .unwrap_or(state.default_rate_limit_window_secs),
771 data_sensitivity_ceiling: req.data_sensitivity_ceiling,
772 };
773
774 let session = state.session_store.create(create_req).await;
775
776 state.metrics.active_sessions.inc();
777
778 (
779 StatusCode::CREATED,
780 Json(CreateSessionResponse {
781 session_id: session.session_id,
782 declared_intent: session.declared_intent,
783 authorized_tools: session.authorized_tools,
784 call_budget: session.call_budget,
785 time_limit_secs: req.time_limit_secs,
786 }),
787 )
788 .into_response()
789}
790
791pub async fn get_session(
793 State(state): State<AppState>,
794 headers: HeaderMap,
795 Path(id): Path<Uuid>,
796) -> impl IntoResponse {
797 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
798 return e.into_response();
799 }
800 if let Err(e) = check_admin_rate_limit(&state) {
801 return e.into_response();
802 }
803
804 state.admin_audit_log("get_session", None, &format!("session_id={}", id));
805
806 let session = match state.session_store.get(id).await {
807 Ok(s) => s,
808 Err(e) => {
809 return (
810 StatusCode::NOT_FOUND,
811 Json(ErrorResponse {
812 error: e.to_string(),
813 }),
814 )
815 .into_response();
816 }
817 };
818
819 let now = Utc::now();
820 let expires_at = session.created_at + session.time_limit;
821 let seconds_remaining = (expires_at - now).num_seconds().max(0);
822 let calls_remaining = session.call_budget.saturating_sub(session.calls_made);
823
824 let mut warnings = Vec::new();
825 let budget_pct_remaining = if session.call_budget > 0 {
826 (calls_remaining as f64 / session.call_budget as f64) * 100.0
827 } else {
828 0.0
829 };
830 let time_pct_remaining = if session.time_limit.num_seconds() > 0 {
831 (seconds_remaining as f64 / session.time_limit.num_seconds() as f64) * 100.0
832 } else {
833 0.0
834 };
835 if budget_pct_remaining <= state.warning_threshold_pct && session.call_budget > 0 {
836 warnings.push(format!(
837 "budget low: {} of {} calls remaining",
838 calls_remaining, session.call_budget
839 ));
840 }
841 if time_pct_remaining <= state.warning_threshold_pct && seconds_remaining > 0 {
842 warnings.push(format!("time low: {}s remaining", seconds_remaining));
843 }
844
845 let status_str = format!("{:?}", session.status).to_lowercase();
846 let sensitivity_str = serde_json::to_value(session.data_sensitivity_ceiling)
847 .unwrap_or_default()
848 .as_str()
849 .unwrap_or("unknown")
850 .to_string();
851
852 (
853 StatusCode::OK,
854 Json(SessionStatusResponse {
855 session_id: session.session_id,
856 agent_id: session.agent_id,
857 status: status_str,
858 declared_intent: session.declared_intent,
859 authorized_tools: session.authorized_tools,
860 calls_made: session.calls_made,
861 call_budget: session.call_budget,
862 calls_remaining,
863 rate_limit_per_minute: session.rate_limit_per_minute,
864 data_sensitivity_ceiling: sensitivity_str,
865 created_at: session.created_at,
866 expires_at,
867 seconds_remaining,
868 warnings,
869 }),
870 )
871 .into_response()
872}
873
874#[derive(Debug, Serialize)]
876pub struct SessionCloseResponse {
877 pub session_id: Uuid,
878 pub status: String,
879 pub declared_intent: String,
880 pub total_calls: u64,
881 pub call_budget: u64,
882 pub budget_utilization_pct: f64,
883 pub time_used_secs: i64,
884 pub time_limit_secs: i64,
885 pub denied_attempts: u64,
887 pub anomalies_detected: u64,
889}
890
891pub async fn close_session(
893 State(state): State<AppState>,
894 headers: HeaderMap,
895 Path(id): Path<Uuid>,
896) -> impl IntoResponse {
897 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
898 return e.into_response();
899 }
900 if let Err(e) = check_admin_rate_limit(&state) {
901 return e.into_response();
902 }
903
904 state.admin_audit_log("close_session", None, &format!("session_id={}", id));
905
906 let session = match state.session_store.close(id).await {
907 Ok(s) => {
908 state.metrics.active_sessions.dec();
909 s
910 }
911 Err(e) => {
912 let status = match &e {
913 arbiter_session::SessionError::NotFound(_) => StatusCode::NOT_FOUND,
914 arbiter_session::SessionError::AlreadyClosed(_) => StatusCode::CONFLICT,
915 _ => StatusCode::INTERNAL_SERVER_ERROR,
916 };
917 return (
918 status,
919 Json(ErrorResponse {
920 error: e.to_string(),
921 }),
922 )
923 .into_response();
924 }
925 };
926
927 let now = Utc::now();
928 let time_used_secs = (now - session.created_at).num_seconds();
929 let budget_utilization_pct = if session.call_budget > 0 {
930 (session.calls_made as f64 / session.call_budget as f64) * 100.0
931 } else {
932 0.0
933 };
934
935 let session_id_str = id.to_string();
937 let (denied_attempts, anomalies_detected) = if let Some(ref sink) = state.audit_sink {
938 let stats = sink.stats().stats_for_session(&session_id_str).await;
939 sink.stats().remove_session(&session_id_str).await;
941 (stats.denied_count, stats.anomaly_count)
942 } else {
943 (0, 0)
944 };
945
946 (
947 StatusCode::OK,
948 Json(SessionCloseResponse {
949 session_id: session.session_id,
950 status: "closed".into(),
951 declared_intent: session.declared_intent,
952 total_calls: session.calls_made,
953 call_budget: session.call_budget,
954 budget_utilization_pct,
955 time_used_secs,
956 time_limit_secs: session.time_limit.num_seconds(),
957 denied_attempts,
958 anomalies_detected,
959 }),
960 )
961 .into_response()
962}
963
964#[derive(Debug, Deserialize)]
966pub struct PolicyExplainRequest {
967 pub agent_id: Uuid,
968 pub declared_intent: String,
969 pub tool: String,
970 #[serde(default)]
971 pub arguments: Option<serde_json::Value>,
972 #[serde(default)]
973 pub principal: Option<String>,
974}
975
976#[derive(Debug, Serialize)]
978pub struct PolicyExplainResponse {
979 pub decision: String,
980 pub matched_policy: Option<String>,
981 pub reason: Option<String>,
982 pub trace: Vec<PolicyTrace>,
983}
984
985pub async fn explain_policy(
987 State(state): State<AppState>,
988 headers: HeaderMap,
989 Json(req): Json<PolicyExplainRequest>,
990) -> impl IntoResponse {
991 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
992 return e.into_response();
993 }
994 if let Err(e) = check_admin_rate_limit(&state) {
995 return e.into_response();
996 }
997
998 state.admin_audit_log(
999 "explain_policy",
1000 Some(req.agent_id),
1001 &format!("tool={}", sanitize_for_log(&req.tool)),
1002 );
1003
1004 let policy_snapshot = tokio::sync::watch::Sender::borrow(&state.policy_config).clone();
1005 let policy_config = match policy_snapshot.as_ref() {
1006 Some(pc) => pc,
1007 None => {
1008 return (
1009 StatusCode::OK,
1010 Json(PolicyExplainResponse {
1011 decision: "allow".into(),
1012 matched_policy: None,
1013 reason: Some("no policies configured".into()),
1014 trace: vec![],
1015 }),
1016 )
1017 .into_response();
1018 }
1019 };
1020
1021 let agent = match state.registry.get_agent(req.agent_id).await {
1022 Ok(a) => a,
1023 Err(e) => {
1024 return (
1025 StatusCode::NOT_FOUND,
1026 Json(ErrorResponse {
1027 error: format!("agent not found: {e}"),
1028 }),
1029 )
1030 .into_response();
1031 }
1032 };
1033
1034 let principal = req.principal.unwrap_or_else(|| agent.owner.clone());
1035
1036 let eval_ctx = EvalContext {
1037 agent,
1038 delegation_chain: vec![],
1039 declared_intent: req.declared_intent,
1040 principal_sub: principal,
1041 principal_groups: vec![],
1042 };
1043
1044 let mcp_req = McpRequest {
1045 id: None,
1046 method: "tools/call".into(),
1047 tool_name: Some(req.tool.clone()),
1048 arguments: req.arguments,
1049 resource_uri: None,
1050 };
1051
1052 let result = arbiter_policy::evaluate_explained(policy_config, &eval_ctx, &mcp_req);
1053
1054 let (decision, matched_policy, reason) = match result.decision {
1055 arbiter_policy::Decision::Allow { policy_id } => ("allow".into(), Some(policy_id), None),
1056 arbiter_policy::Decision::Deny { reason } => ("deny".into(), None, Some(reason)),
1057 arbiter_policy::Decision::Escalate { reason } => ("escalate".into(), None, Some(reason)),
1058 arbiter_policy::Decision::Annotate { policy_id, reason } => {
1059 ("annotate".into(), Some(policy_id), Some(reason))
1060 }
1061 };
1062
1063 (
1064 StatusCode::OK,
1065 Json(PolicyExplainResponse {
1066 decision,
1067 matched_policy,
1068 reason,
1069 trace: result.trace,
1070 }),
1071 )
1072 .into_response()
1073}
1074
1075#[derive(Debug, Deserialize)]
1077pub struct PolicyValidateRequest {
1078 pub policy_toml: String,
1079}
1080
1081pub async fn validate_policy(
1083 State(state): State<AppState>,
1084 headers: HeaderMap,
1085 Json(req): Json<PolicyValidateRequest>,
1086) -> impl IntoResponse {
1087 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1088 return e.into_response();
1089 }
1090 if let Err(e) = check_admin_rate_limit(&state) {
1091 return e.into_response();
1092 }
1093
1094 state.admin_audit_log("validate_policy", None, "");
1095
1096 let result = arbiter_policy::PolicyConfig::validate_toml(&req.policy_toml);
1097 (StatusCode::OK, Json(result)).into_response()
1098}
1099
1100pub async fn reload_policy(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1102 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1103 return e.into_response();
1104 }
1105 if let Err(e) = check_admin_rate_limit(&state) {
1106 return e.into_response();
1107 }
1108
1109 state.admin_audit_log("reload_policy", None, "");
1110
1111 let path = match &state.policy_file_path {
1112 Some(p) => p.clone(),
1113 None => {
1114 return (
1115 StatusCode::BAD_REQUEST,
1116 Json(ErrorResponse {
1117 error: "no policy file configured; policies are inline or absent".into(),
1118 }),
1119 )
1120 .into_response();
1121 }
1122 };
1123
1124 let contents = match std::fs::read_to_string(&path) {
1125 Ok(c) => c,
1126 Err(e) => {
1127 tracing::error!(path = %path, error = %e, "failed to read policy file");
1129 return (
1130 StatusCode::INTERNAL_SERVER_ERROR,
1131 Json(ErrorResponse {
1132 error: "failed to read policy file".into(),
1133 }),
1134 )
1135 .into_response();
1136 }
1137 };
1138
1139 let new_config = match arbiter_policy::PolicyConfig::from_toml(&contents) {
1140 Ok(pc) => pc,
1141 Err(e) => {
1142 tracing::error!(error = %e, "failed to parse policy file");
1144 return (
1145 StatusCode::BAD_REQUEST,
1146 Json(ErrorResponse {
1147 error: "failed to parse policy file".into(),
1148 }),
1149 )
1150 .into_response();
1151 }
1152 };
1153
1154 let policy_count = new_config.policies.len();
1155 let _ = state.policy_config.send_replace(Arc::new(Some(new_config)));
1156
1157 tracing::warn!(
1159 path,
1160 policy_count,
1161 "AUDIT: policy configuration reloaded via admin API"
1162 );
1163 (
1164 StatusCode::OK,
1165 Json(serde_json::json!({
1166 "status": "ok",
1167 "reloaded": true,
1168 "policies_loaded": policy_count,
1169 "policy_count": policy_count,
1170 "file": path,
1171 })),
1172 )
1173 .into_response()
1174}
1175
1176pub async fn policy_schema(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1178 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1179 return e.into_response();
1180 }
1181 if let Err(e) = check_admin_rate_limit(&state) {
1182 return e.into_response();
1183 }
1184
1185 state.admin_audit_log("policy_schema", None, "");
1186
1187 let schema = serde_json::json!({
1188 "$schema": "https://json-schema.org/draft/2020-12/schema",
1189 "title": "Arbiter Policy Configuration",
1190 "description": "Defines authorization policies for the Arbiter MCP gateway. Policies are evaluated top-to-bottom; the most specific match wins. If no policy matches, the request is denied (deny-by-default).",
1191 "type": "object",
1192 "properties": {
1193 "policies": {
1194 "type": "array",
1195 "description": "Ordered list of authorization policies. Evaluated by specificity score; the most specific matching policy's effect applies.",
1196 "items": {
1197 "$ref": "#/$defs/Policy"
1198 }
1199 }
1200 },
1201 "$defs": {
1202 "Policy": {
1203 "type": "object",
1204 "description": "A single authorization policy rule.",
1205 "required": ["id", "effect"],
1206 "properties": {
1207 "id": {
1208 "type": "string",
1209 "description": "Unique identifier for this policy. Used in audit logs and policy traces."
1210 },
1211 "effect": {
1212 "$ref": "#/$defs/Effect"
1213 },
1214 "agent_match": {
1215 "$ref": "#/$defs/AgentMatch"
1216 },
1217 "principal_match": {
1218 "$ref": "#/$defs/PrincipalMatch"
1219 },
1220 "intent_match": {
1221 "$ref": "#/$defs/IntentMatch"
1222 },
1223 "allowed_tools": {
1224 "type": "array",
1225 "items": { "type": "string" },
1226 "description": "Tools this policy applies to. Empty array means 'all tools'."
1227 },
1228 "parameter_constraints": {
1229 "type": "array",
1230 "items": { "$ref": "#/$defs/ParameterConstraint" },
1231 "description": "Per-parameter bounds on tool arguments."
1232 },
1233 "priority": {
1234 "type": "integer",
1235 "default": 0,
1236 "description": "Manual priority override. 0 = auto-computed from match specificity. Higher wins ties."
1237 }
1238 }
1239 },
1240 "Effect": {
1241 "type": "string",
1242 "enum": ["allow", "deny", "escalate"],
1243 "description": "allow = permit the request. deny = block it. escalate = require human-in-the-loop approval."
1244 },
1245 "AgentMatch": {
1246 "type": "object",
1247 "description": "Criteria for matching the requesting agent. All specified fields must match.",
1248 "properties": {
1249 "agent_id": {
1250 "type": "string",
1251 "format": "uuid",
1252 "description": "Match a specific agent by UUID. Specificity: +100."
1253 },
1254 "trust_level": {
1255 "$ref": "#/$defs/TrustLevel"
1256 },
1257 "capabilities": {
1258 "type": "array",
1259 "items": { "type": "string" },
1260 "description": "Agent must have all listed capabilities. Specificity: +25 each."
1261 }
1262 }
1263 },
1264 "TrustLevel": {
1265 "type": "string",
1266 "enum": ["untrusted", "basic", "verified", "trusted"],
1267 "description": "Agent trust tier. untrusted < basic < verified < trusted. Matches agents at or above the specified level. Specificity: +50."
1268 },
1269 "PrincipalMatch": {
1270 "type": "object",
1271 "description": "Criteria for matching the human principal on whose behalf the agent acts.",
1272 "properties": {
1273 "sub": {
1274 "type": "string",
1275 "description": "Exact principal subject identifier (e.g., 'user:alice'). Specificity: +40."
1276 },
1277 "groups": {
1278 "type": "array",
1279 "items": { "type": "string" },
1280 "description": "Principal must belong to at least one of these groups. Specificity: +20 each."
1281 }
1282 }
1283 },
1284 "IntentMatch": {
1285 "type": "object",
1286 "description": "Criteria for matching the session's declared intent.",
1287 "properties": {
1288 "keywords": {
1289 "type": "array",
1290 "items": { "type": "string" },
1291 "description": "Case-insensitive substrings that must appear in the declared intent. Specificity: +10 each."
1292 },
1293 "regex": {
1294 "type": "string",
1295 "description": "Regex pattern the declared intent must match. Compiled at config load time. Specificity: +30."
1296 }
1297 }
1298 },
1299 "ParameterConstraint": {
1300 "type": "object",
1301 "description": "A constraint on a tool call parameter.",
1302 "required": ["key"],
1303 "properties": {
1304 "key": {
1305 "type": "string",
1306 "description": "Dotted path to the parameter (e.g., 'arguments.max_tokens')."
1307 },
1308 "max_value": {
1309 "type": "number",
1310 "description": "Maximum numeric value allowed."
1311 },
1312 "min_value": {
1313 "type": "number",
1314 "description": "Minimum numeric value allowed."
1315 },
1316 "allowed_values": {
1317 "type": "array",
1318 "items": { "type": "string" },
1319 "description": "Whitelist of allowed string values."
1320 }
1321 }
1322 },
1323 "DataSensitivity": {
1324 "type": "string",
1325 "enum": ["public", "internal", "confidential", "restricted"],
1326 "description": "Data sensitivity ceiling for sessions. public < internal < confidential < restricted."
1327 }
1328 }
1329 });
1330
1331 (StatusCode::OK, Json(schema)).into_response()
1332}
1333
1334pub fn router(state: AppState) -> axum::Router {
1336 axum::Router::new()
1337 .route("/agents", axum::routing::post(register_agent))
1338 .route("/agents", axum::routing::get(list_agents))
1339 .route("/agents/{id}", axum::routing::get(get_agent))
1340 .route("/agents/{id}", axum::routing::delete(deactivate_agent))
1341 .route("/agents/{id}/delegate", axum::routing::post(delegate_agent))
1342 .route(
1343 "/agents/{id}/delegations",
1344 axum::routing::get(list_delegations),
1345 )
1346 .route("/agents/{id}/token", axum::routing::post(issue_agent_token))
1347 .route("/sessions", axum::routing::post(create_session))
1348 .route("/sessions/{id}", axum::routing::get(get_session))
1349 .route("/sessions/{id}/close", axum::routing::post(close_session))
1350 .route("/policy/explain", axum::routing::post(explain_policy))
1351 .route("/policy/validate", axum::routing::post(validate_policy))
1352 .route("/policy/reload", axum::routing::post(reload_policy))
1353 .route("/admin/policies/reload", axum::routing::post(reload_policy))
1354 .route("/policy/schema", axum::routing::get(policy_schema))
1355 .with_state(state)
1356}
1357
1358#[cfg(test)]
1359mod tests {
1360 use super::*;
1361
1362 #[test]
1364 fn sanitize_for_log_strips_newlines() {
1365 assert_eq!(
1366 sanitize_for_log("read config\nINFO ADMIN_AUDIT: fake"),
1367 "read config\\nINFO ADMIN_AUDIT: fake"
1368 );
1369 }
1370
1371 #[test]
1372 fn sanitize_for_log_strips_carriage_return() {
1373 assert_eq!(sanitize_for_log("line1\r\nline2"), "line1\\r\\nline2");
1374 }
1375
1376 #[test]
1377 fn sanitize_for_log_strips_tabs() {
1378 assert_eq!(sanitize_for_log("key\tvalue"), "key\\tvalue");
1379 }
1380
1381 #[test]
1382 fn sanitize_for_log_preserves_normal_text() {
1383 assert_eq!(
1384 sanitize_for_log("read configuration files"),
1385 "read configuration files"
1386 );
1387 }
1388}