1use std::sync::Arc;
2
3use arbiter_identity::{AgentRegistry, TrustLevel};
4use arbiter_mcp::context::McpRequest;
5use arbiter_policy::{EvalContext, PolicyTrace};
6use arbiter_session::{CreateSessionRequest, DataSensitivity};
7use axum::Json;
8use axum::extract::{Path, State};
9use axum::http::{HeaderMap, StatusCode};
10use axum::response::IntoResponse;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::state::AppState;
16use crate::token::issue_token;
17
/// JSON request body accepted by [`register_agent`].
#[derive(Debug, Deserialize)]
pub struct RegisterAgentRequest {
    /// Owner of the agent; also passed to `issue_token` when the initial token is minted.
    pub owner: String,
    /// Model identifier, forwarded to the registry unchanged.
    pub model: String,
    /// Capabilities forwarded to the registry; an empty list when omitted.
    #[serde(default)]
    pub capabilities: Vec<String>,
    /// Trust level for the new agent; `TrustLevel::Untrusted` when omitted.
    #[serde(default = "default_trust_level")]
    pub trust_level: TrustLevel,
    /// Optional expiry timestamp forwarded to the registry; `None` when omitted.
    #[serde(default)]
    pub expires_at: Option<DateTime<Utc>>,
}
30
/// Serde default for `RegisterAgentRequest::trust_level`: agents start out untrusted
/// unless the request says otherwise.
fn default_trust_level() -> TrustLevel {
    TrustLevel::Untrusted
}
34
/// JSON response body returned by [`register_agent`] on success.
#[derive(Debug, Serialize)]
pub struct RegisterAgentResponse {
    /// Identifier of the newly registered agent.
    pub agent_id: Uuid,
    /// Token issued for the agent at registration time.
    pub token: String,
}
41
/// JSON request body accepted by [`delegate_agent`]; the delegating agent comes from
/// the URL path, not from this body.
#[derive(Debug, Deserialize)]
pub struct DelegateRequest {
    /// Agent receiving the delegation.
    pub to: Uuid,
    /// Scopes to delegate, forwarded to the registry unchanged.
    pub scopes: Vec<String>,
    /// Optional expiry of the delegation; `None` when omitted.
    #[serde(default)]
    pub expires_at: Option<DateTime<Utc>>,
}
50
/// Optional JSON request body accepted by [`issue_agent_token`].
#[derive(Debug, Deserialize)]
pub struct TokenRequest {
    /// Overrides the configured token lifetime (in seconds) when present.
    /// The handler rejects values that are zero or negative.
    #[serde(default)]
    pub expiry_seconds: Option<i64>,
}
57
/// JSON request body accepted by [`create_session`].
///
/// The numeric and length limits mentioned on the fields below are the ones enforced by
/// the handler, not by deserialization.
#[derive(Debug, Deserialize)]
pub struct CreateSessionApiRequest {
    /// Agent the session is created for; must exist, be active and not be expired.
    pub agent_id: Uuid,
    /// Delegation chain snapshot stored with the session; empty when omitted.
    #[serde(default)]
    pub delegation_chain_snapshot: Vec<String>,
    /// Declared intent of the session; must be non-blank and at most 512 bytes.
    pub declared_intent: String,
    /// Tool allowlist. When empty, the handler logs a warning stating that all tools
    /// will be permitted.
    #[serde(default)]
    pub authorized_tools: Vec<String>,
    /// Credential allowlist; a missing value is treated as an empty list.
    #[serde(default)]
    pub authorized_credentials: Option<Vec<String>>,
    /// Session lifetime in seconds; defaults to 3600, accepted range is 1..=86400.
    #[serde(default = "default_time_limit")]
    pub time_limit_secs: i64,
    /// Maximum number of calls; defaults to 1000, accepted range is 1..=1_000_000.
    #[serde(default = "default_call_budget")]
    pub call_budget: u64,
    /// Optional rate limit, forwarded to the session store unchanged.
    #[serde(default)]
    pub rate_limit_per_minute: Option<u64>,
    /// Optional rate-limit window in seconds (accepted range 10..=3600); when omitted the
    /// server-wide `default_rate_limit_window_secs` is used.
    #[serde(default)]
    pub rate_limit_window_secs: Option<u64>,
    /// Data sensitivity ceiling; defaults to `DataSensitivity::Internal`.
    #[serde(default = "default_data_sensitivity")]
    pub data_sensitivity_ceiling: DataSensitivity,
}
83
/// Serde default for `CreateSessionApiRequest::time_limit_secs`: one hour, in seconds.
fn default_time_limit() -> i64 {
    const SECONDS_PER_HOUR: i64 = 60 * 60;
    SECONDS_PER_HOUR
}
87
/// Serde default for `CreateSessionApiRequest::call_budget`: one thousand calls.
fn default_call_budget() -> u64 {
    const DEFAULT_CALLS: u64 = 1_000;
    DEFAULT_CALLS
}
91
/// Serde default for `CreateSessionApiRequest::data_sensitivity_ceiling`.
fn default_data_sensitivity() -> DataSensitivity {
    DataSensitivity::Internal
}
95
/// JSON response body returned by [`create_session`] on success.
#[derive(Debug, Serialize)]
pub struct CreateSessionResponse {
    /// Identifier of the newly created session.
    pub session_id: Uuid,
    /// Declared intent as stored on the session.
    pub declared_intent: String,
    /// Tool allowlist as stored on the session.
    pub authorized_tools: Vec<String>,
    /// Credential allowlist as stored on the session.
    pub authorized_credentials: Vec<String>,
    /// Call budget as stored on the session.
    pub call_budget: u64,
    /// Session lifetime in seconds, echoed from the request.
    pub time_limit_secs: i64,
}
106
/// JSON response body returned by [`get_session`].
#[derive(Debug, Serialize)]
pub struct SessionStatusResponse {
    /// Identifier of the session.
    pub session_id: Uuid,
    /// Agent the session belongs to.
    pub agent_id: Uuid,
    /// Lower-cased `Debug` rendering of the session's status.
    pub status: String,
    /// Declared intent stored on the session.
    pub declared_intent: String,
    /// Tool allowlist stored on the session.
    pub authorized_tools: Vec<String>,
    /// Credential allowlist stored on the session.
    pub authorized_credentials: Vec<String>,
    /// Number of calls made so far.
    pub calls_made: u64,
    /// Total call budget of the session.
    pub call_budget: u64,
    /// `call_budget - calls_made`, saturating at zero.
    pub calls_remaining: u64,
    /// Rate limit stored on the session, if any.
    pub rate_limit_per_minute: Option<u64>,
    /// Serialized data sensitivity ceiling, or `"unknown"` if it does not serialize to a string.
    pub data_sensitivity_ceiling: String,
    /// Creation timestamp of the session.
    pub created_at: DateTime<Utc>,
    /// `created_at` plus the session's time limit.
    pub expires_at: DateTime<Utc>,
    /// Whole seconds until `expires_at`, clamped to zero.
    pub seconds_remaining: i64,
    /// Human-readable warnings emitted when the remaining budget or time falls to or
    /// below the configured warning threshold percentage.
    pub warnings: Vec<String>,
}
126
/// JSON response body returned by [`issue_agent_token`] on success.
#[derive(Debug, Serialize)]
pub struct TokenResponse {
    /// The issued token.
    pub token: String,
    /// Token lifetime in seconds that was used for issuance.
    pub expires_in: i64,
}
133
/// JSON error body used by the admin handlers: `{"error": "<message>"}`.
#[derive(Debug, Serialize)]
pub struct ErrorResponse {
    /// Client-facing error message.
    pub error: String,
}
139
140fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
150 use subtle::ConstantTimeEq;
151 let max_len = std::cmp::max(a.len(), b.len());
155 let mut a_padded = vec![0u8; max_len];
156 let mut b_padded = vec![0u8; max_len];
157 a_padded[..a.len()].copy_from_slice(a);
158 b_padded[..b.len()].copy_from_slice(b);
159 let bytes_equal: bool = a_padded.ct_eq(&b_padded).into();
160 let len_equal: bool = (a.len() as u64).ct_eq(&(b.len() as u64)).into();
161 bytes_equal & len_equal
162}
163
/// Escapes characters in `input` that could forge or corrupt a log entry.
///
/// * `\n`, `\r` and `\t` become the two-character sequences `\n`, `\r`, `\t`
///   (unchanged from the previous behavior).
/// * Every other control character (for example NUL or the ESC that starts an ANSI
///   sequence) and the Unicode line/paragraph separators U+2028 / U+2029 are written
///   as `\u{..}` escapes, so they cannot break a line or drive a terminal.
/// * All other characters, including backslashes, are copied through unchanged.
///
/// The string is built in a single pass.
fn sanitize_for_log(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            c if c.is_control() || matches!(c, '\u{2028}' | '\u{2029}') => {
                escaped.extend(c.escape_unicode());
            }
            c => escaped.push(c),
        }
    }
    escaped
}
174
175fn validate_admin_key(
179 headers: &HeaderMap,
180 expected: &str,
181) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
182 let key = headers
183 .get("x-api-key")
184 .and_then(|v| v.to_str().ok())
185 .ok_or_else(|| {
186 (
187 StatusCode::UNAUTHORIZED,
188 Json(ErrorResponse {
189 error: "missing x-api-key header".into(),
190 }),
191 )
192 })?;
193
194 if !constant_time_eq(key.as_bytes(), expected.as_bytes()) {
198 return Err((
199 StatusCode::UNAUTHORIZED,
200 Json(ErrorResponse {
201 error: "invalid API key".into(),
202 }),
203 ));
204 }
205
206 Ok(())
207}
208
209fn check_admin_rate_limit(
215 state: &AppState,
216 headers: &axum::http::HeaderMap,
217) -> Result<(), (StatusCode, Json<ErrorResponse>)> {
218 let client_key = headers
220 .get("x-api-key")
221 .and_then(|v| v.to_str().ok())
222 .map(|k| {
223 use std::hash::{Hash, Hasher};
224 let mut hasher = std::collections::hash_map::DefaultHasher::new();
225 k.hash(&mut hasher);
226 format!("key:{:x}", hasher.finish())
227 })
228 .unwrap_or_else(|| "__unauth__".to_string());
229
230 if !state.admin_rate_limiter.check_rate_limit(&client_key) {
231 tracing::warn!(client = %client_key, "ADMIN_AUDIT: rate limit exceeded on admin API");
232 return Err((
233 StatusCode::TOO_MANY_REQUESTS,
234 Json(ErrorResponse {
235 error: "admin API rate limit exceeded, try again later".into(),
236 }),
237 ));
238 }
239 Ok(())
240}
241
242pub async fn register_agent(
244 State(state): State<AppState>,
245 headers: HeaderMap,
246 Json(req): Json<RegisterAgentRequest>,
247) -> impl IntoResponse {
248 if let Err(e) = check_admin_rate_limit(&state, &headers) {
249 return e.into_response();
250 }
251 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
252 return e.into_response();
253 }
254
255 match state
256 .registry
257 .register_agent(
258 req.owner.clone(),
259 req.model,
260 req.capabilities,
261 req.trust_level,
262 req.expires_at,
263 )
264 .await
265 {
266 Ok(agent) => {
267 let token = match issue_token(agent.id, &req.owner, &state.token_config) {
268 Ok(t) => t,
269 Err(e) => {
270 tracing::error!(error = %e, "failed to issue token");
271 return (
272 StatusCode::INTERNAL_SERVER_ERROR,
273 Json(ErrorResponse {
274 error: "token issuance failed".into(),
275 }),
276 )
277 .into_response();
278 }
279 };
280
281 state.metrics.registered_agents.inc();
282 state.admin_audit_log(
283 "register_agent",
284 Some(agent.id),
285 &format!("owner={}", sanitize_for_log(&req.owner)),
286 );
287
288 (
289 StatusCode::CREATED,
290 Json(RegisterAgentResponse {
291 agent_id: agent.id,
292 token,
293 }),
294 )
295 .into_response()
296 }
297 Err(e) => {
298 tracing::error!(error = %e, "failed to register agent");
299 (
300 StatusCode::INTERNAL_SERVER_ERROR,
301 Json(ErrorResponse {
302 error: "internal error".into(),
303 }),
304 )
305 .into_response()
306 }
307 }
308}
309
310pub async fn get_agent(
312 State(state): State<AppState>,
313 headers: HeaderMap,
314 Path(id): Path<Uuid>,
315) -> impl IntoResponse {
316 if let Err(e) = check_admin_rate_limit(&state, &headers) {
317 return e.into_response();
318 }
319 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
320 return e.into_response();
321 }
322
323 state.admin_audit_log("get_agent", Some(id), "");
324
325 match state.registry.get_agent(id).await {
326 Ok(agent) => match serde_json::to_value(&agent) {
327 Ok(val) => (StatusCode::OK, Json(val)).into_response(),
328 Err(e) => (
329 StatusCode::INTERNAL_SERVER_ERROR,
330 Json(serde_json::json!({"error": e.to_string()})),
331 )
332 .into_response(),
333 },
334 Err(e) => {
335 tracing::debug!(error = %e, agent_id = %id, "get_agent failed");
339 (
340 StatusCode::NOT_FOUND,
341 Json(ErrorResponse {
342 error: "agent not found".into(),
343 }),
344 )
345 .into_response()
346 }
347 }
348}
349
350pub async fn delegate_agent(
352 State(state): State<AppState>,
353 headers: HeaderMap,
354 Path(from_id): Path<Uuid>,
355 Json(req): Json<DelegateRequest>,
356) -> impl IntoResponse {
357 if let Err(e) = check_admin_rate_limit(&state, &headers) {
358 return e.into_response();
359 }
360 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
361 return e.into_response();
362 }
363
364 state.admin_audit_log("delegate_agent", Some(from_id), &format!("to={}", req.to));
365
366 match state
367 .registry
368 .create_delegation(from_id, req.to, req.scopes, req.expires_at)
369 .await
370 {
371 Ok(link) => match serde_json::to_value(&link) {
372 Ok(val) => (StatusCode::CREATED, Json(val)).into_response(),
373 Err(e) => (
374 StatusCode::INTERNAL_SERVER_ERROR,
375 Json(serde_json::json!({"error": e.to_string()})),
376 )
377 .into_response(),
378 },
379 Err(e) => {
380 let status = match &e {
381 arbiter_identity::IdentityError::DelegationSourceNotFound(_)
382 | arbiter_identity::IdentityError::DelegationTargetNotFound(_) => {
383 StatusCode::NOT_FOUND
384 }
385 arbiter_identity::IdentityError::ScopeNarrowingViolation { .. }
386 | arbiter_identity::IdentityError::DelegateFromDeactivated(_)
387 | arbiter_identity::IdentityError::CircularDelegation { .. } => {
388 StatusCode::BAD_REQUEST
389 }
390 _ => StatusCode::INTERNAL_SERVER_ERROR,
391 };
392 (
393 status,
394 Json(ErrorResponse {
395 error: "internal error".into(),
396 }),
397 )
398 .into_response()
399 }
400 }
401}
402
403pub async fn list_delegations(
405 State(state): State<AppState>,
406 headers: HeaderMap,
407 Path(id): Path<Uuid>,
408) -> impl IntoResponse {
409 if let Err(e) = check_admin_rate_limit(&state, &headers) {
410 return e.into_response();
411 }
412 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
413 return e.into_response();
414 }
415
416 state.admin_audit_log("list_delegations", Some(id), "");
417
418 if let Err(_e) = state.registry.get_agent(id).await {
420 return (
421 StatusCode::NOT_FOUND,
422 Json(ErrorResponse {
423 error: "internal error".into(),
424 }),
425 )
426 .into_response();
427 }
428
429 let outgoing = state.registry.list_delegations_from(id).await;
430 let incoming = state.registry.list_delegations_to(id).await;
431
432 (
433 StatusCode::OK,
434 Json(serde_json::json!({
435 "agent_id": id,
436 "outgoing": outgoing,
437 "incoming": incoming,
438 "outgoing_count": outgoing.len(),
439 "incoming_count": incoming.len(),
440 })),
441 )
442 .into_response()
443}
444
445pub async fn deactivate_agent(
447 State(state): State<AppState>,
448 headers: HeaderMap,
449 Path(id): Path<Uuid>,
450) -> impl IntoResponse {
451 if let Err(e) = check_admin_rate_limit(&state, &headers) {
452 return e.into_response();
453 }
454 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
455 return e.into_response();
456 }
457
458 state.admin_audit_log("deactivate_agent", Some(id), "cascade");
459
460 match state.registry.cascade_deactivate(id).await {
461 Ok(deactivated) => {
462 let mut total_sessions_closed = 0usize;
464 for &agent_id in &deactivated {
465 let closed = state.session_store.close_sessions_for_agent(agent_id).await;
466 total_sessions_closed += closed;
467 state.metrics.registered_agents.dec();
468 }
469 (
470 StatusCode::OK,
471 Json(serde_json::json!({
472 "deactivated": deactivated,
473 "count": deactivated.len(),
474 "sessions_closed": total_sessions_closed,
475 })),
476 )
477 .into_response()
478 }
479 Err(e) => {
480 let status = match &e {
481 arbiter_identity::IdentityError::AgentNotFound(_) => StatusCode::NOT_FOUND,
482 arbiter_identity::IdentityError::AgentDeactivated(_) => StatusCode::CONFLICT,
483 _ => StatusCode::INTERNAL_SERVER_ERROR,
484 };
485 (
486 status,
487 Json(ErrorResponse {
488 error: "internal error".into(),
489 }),
490 )
491 .into_response()
492 }
493 }
494}
495
496pub async fn issue_agent_token(
498 State(state): State<AppState>,
499 headers: HeaderMap,
500 Path(id): Path<Uuid>,
501 body: Option<Json<TokenRequest>>,
502) -> impl IntoResponse {
503 if let Err(e) = check_admin_rate_limit(&state, &headers) {
504 return e.into_response();
505 }
506 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
507 return e.into_response();
508 }
509
510 state.admin_audit_log("issue_agent_token", Some(id), "");
511
512 let agent = match state.registry.get_agent(id).await {
513 Ok(a) => a,
514 Err(_e) => {
515 return (
516 StatusCode::NOT_FOUND,
517 Json(ErrorResponse {
518 error: "internal error".into(),
519 }),
520 )
521 .into_response();
522 }
523 };
524
525 if !agent.active {
526 return (
527 StatusCode::BAD_REQUEST,
528 Json(ErrorResponse {
529 error: format!("agent {} is deactivated", id),
530 }),
531 )
532 .into_response();
533 }
534
535 let mut config = state.token_config.clone();
536 if let Some(Json(req)) = body
537 && let Some(expiry) = req.expiry_seconds
538 {
539 if expiry <= 0 {
542 return (
543 StatusCode::BAD_REQUEST,
544 Json(ErrorResponse {
545 error: "expiry_seconds must be positive".into(),
546 }),
547 )
548 .into_response();
549 }
550 config.expiry_seconds = expiry;
551 }
552
553 match issue_token(agent.id, &agent.owner, &config) {
554 Ok(token) => (
555 StatusCode::OK,
556 Json(TokenResponse {
557 token,
558 expires_in: config.expiry_seconds,
559 }),
560 )
561 .into_response(),
562 Err(e) => {
563 tracing::error!(error = %e, "failed to issue token");
564 (
565 StatusCode::INTERNAL_SERVER_ERROR,
566 Json(ErrorResponse {
567 error: "token issuance failed".into(),
568 }),
569 )
570 .into_response()
571 }
572 }
573}
574
575pub async fn list_agents(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
577 if let Err(e) = check_admin_rate_limit(&state, &headers) {
578 return e.into_response();
579 }
580 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
581 return e.into_response();
582 }
583
584 state.admin_audit_log("list_agents", None, "");
585
586 let agents = state.registry.list_agents().await;
587 match serde_json::to_value(&agents) {
588 Ok(val) => (StatusCode::OK, Json(val)).into_response(),
589 Err(e) => (
590 StatusCode::INTERNAL_SERVER_ERROR,
591 Json(serde_json::json!({"error": e.to_string()})),
592 )
593 .into_response(),
594 }
595}
596
597pub async fn create_session(
599 State(state): State<AppState>,
600 headers: HeaderMap,
601 Json(req): Json<CreateSessionApiRequest>,
602) -> impl IntoResponse {
603 if let Err(e) = check_admin_rate_limit(&state, &headers) {
604 return e.into_response();
605 }
606 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
607 return e.into_response();
608 }
609
610 state.admin_audit_log(
611 "create_session",
612 Some(req.agent_id),
613 &format!("intent={}", sanitize_for_log(&req.declared_intent)),
614 );
615
616 let agent = match state.registry.get_agent(req.agent_id).await {
620 Ok(a) => a,
621 Err(e) => {
622 tracing::warn!(
623 agent_id = %req.agent_id,
624 error = %e,
625 "session creation denied: agent not found"
626 );
627 return (
628 StatusCode::NOT_FOUND,
629 Json(ErrorResponse {
630 error: format!("agent {} not found", req.agent_id),
631 }),
632 )
633 .into_response();
634 }
635 };
636
637 if !agent.active {
638 tracing::warn!(
639 agent_id = %req.agent_id,
640 "session creation denied: agent is deactivated"
641 );
642 return (
643 StatusCode::BAD_REQUEST,
644 Json(ErrorResponse {
645 error: format!("agent {} is deactivated", req.agent_id),
646 }),
647 )
648 .into_response();
649 }
650
651 if let Some(expires_at) = agent.expires_at
652 && expires_at < Utc::now()
653 {
654 tracing::warn!(
655 agent_id = %req.agent_id,
656 expires_at = %expires_at,
657 "session creation denied: agent has expired"
658 );
659 return (
660 StatusCode::BAD_REQUEST,
661 Json(ErrorResponse {
662 error: format!("agent {} has expired", req.agent_id),
663 }),
664 )
665 .into_response();
666 }
667
668 if let Some(max_sessions) = state.max_concurrent_sessions_per_agent {
672 let active_count = state
673 .session_store
674 .count_active_for_agent(req.agent_id)
675 .await;
676 if active_count >= max_sessions {
677 tracing::warn!(
678 agent_id = %req.agent_id,
679 active = active_count,
680 max = max_sessions,
681 "session creation denied: per-agent concurrent session cap reached"
682 );
683 return (
684 StatusCode::TOO_MANY_REQUESTS,
685 Json(ErrorResponse {
686 error: format!(
687 "agent {} has too many concurrent sessions ({}/{})",
688 req.agent_id, active_count, max_sessions
689 ),
690 }),
691 )
692 .into_response();
693 }
694 }
695
696 if req.time_limit_secs <= 0 {
698 return (
699 StatusCode::BAD_REQUEST,
700 Json(ErrorResponse {
701 error: "time_limit_secs must be positive".into(),
702 }),
703 )
704 .into_response();
705 }
706 if req.time_limit_secs > 86400 {
708 return (
709 StatusCode::BAD_REQUEST,
710 Json(ErrorResponse {
711 error: "time_limit_secs cannot exceed 86400 (24 hours)".into(),
712 }),
713 )
714 .into_response();
715 }
716 if req.call_budget == 0 {
717 return (
718 StatusCode::BAD_REQUEST,
719 Json(ErrorResponse {
720 error: "call_budget must be positive".into(),
721 }),
722 )
723 .into_response();
724 }
725 if req.call_budget > 1_000_000 {
726 return (
727 StatusCode::BAD_REQUEST,
728 Json(ErrorResponse {
729 error: "call_budget cannot exceed 1000000".into(),
730 }),
731 )
732 .into_response();
733 }
734 if req.declared_intent.trim().is_empty() {
735 return (
736 StatusCode::BAD_REQUEST,
737 Json(ErrorResponse {
738 error: "declared_intent must not be empty".into(),
739 }),
740 )
741 .into_response();
742 }
743
744 const MAX_INTENT_LEN: usize = 512;
748 if req.declared_intent.len() > MAX_INTENT_LEN {
749 return (
750 StatusCode::BAD_REQUEST,
751 Json(ErrorResponse {
752 error: format!("declared_intent exceeds maximum length of {MAX_INTENT_LEN} bytes"),
753 }),
754 )
755 .into_response();
756 }
757
758 if let Some(window) = req.rate_limit_window_secs {
763 if window == 0 {
764 return (
765 StatusCode::BAD_REQUEST,
766 Json(ErrorResponse {
767 error: "rate_limit_window_secs must be positive".into(),
768 }),
769 )
770 .into_response();
771 }
772 if window < 10 {
773 return (
774 StatusCode::BAD_REQUEST,
775 Json(ErrorResponse {
776 error: "rate_limit_window_secs must be at least 10 seconds".into(),
777 }),
778 )
779 .into_response();
780 }
781 if window > 3600 {
782 return (
783 StatusCode::BAD_REQUEST,
784 Json(ErrorResponse {
785 error: "rate_limit_window_secs cannot exceed 3600 (1 hour)".into(),
786 }),
787 )
788 .into_response();
789 }
790 }
791
792 if req.authorized_tools.is_empty() {
793 tracing::warn!(
794 agent_id = %req.agent_id,
795 "session created with empty authorized_tools; all tools will be permitted. \
796 Specify an explicit tool allowlist for least-privilege enforcement."
797 );
798 }
799
800 let create_req = CreateSessionRequest {
801 agent_id: req.agent_id,
802 delegation_chain_snapshot: req.delegation_chain_snapshot,
803 declared_intent: req.declared_intent,
804 authorized_tools: req.authorized_tools,
805 authorized_credentials: req.authorized_credentials.unwrap_or_default(),
806 time_limit: chrono::Duration::seconds(req.time_limit_secs),
807 call_budget: req.call_budget,
808 rate_limit_per_minute: req.rate_limit_per_minute,
809 rate_limit_window_secs: req
810 .rate_limit_window_secs
811 .unwrap_or(state.default_rate_limit_window_secs),
812 data_sensitivity_ceiling: req.data_sensitivity_ceiling,
813 };
814
815 let session = state.session_store.create(create_req).await;
816
817 state.metrics.active_sessions.inc();
818
819 (
820 StatusCode::CREATED,
821 Json(CreateSessionResponse {
822 session_id: session.session_id,
823 declared_intent: session.declared_intent,
824 authorized_tools: session.authorized_tools.clone(),
825 authorized_credentials: session.authorized_credentials,
826 call_budget: session.call_budget,
827 time_limit_secs: req.time_limit_secs,
828 }),
829 )
830 .into_response()
831}
832
833pub async fn get_session(
835 State(state): State<AppState>,
836 headers: HeaderMap,
837 Path(id): Path<Uuid>,
838) -> impl IntoResponse {
839 if let Err(e) = check_admin_rate_limit(&state, &headers) {
840 return e.into_response();
841 }
842 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
843 return e.into_response();
844 }
845
846 state.admin_audit_log("get_session", None, &format!("session_id={}", id));
847
848 let session = match state.session_store.get(id).await {
849 Ok(s) => s,
850 Err(_e) => {
851 return (
852 StatusCode::NOT_FOUND,
853 Json(ErrorResponse {
854 error: "internal error".into(),
855 }),
856 )
857 .into_response();
858 }
859 };
860
861 let now = Utc::now();
862 let expires_at = session.created_at + session.time_limit;
863 let seconds_remaining = (expires_at - now).num_seconds().max(0);
864 let calls_remaining = session.call_budget.saturating_sub(session.calls_made);
865
866 let mut warnings = Vec::new();
867 let budget_pct_remaining = if session.call_budget > 0 {
868 (calls_remaining as f64 / session.call_budget as f64) * 100.0
869 } else {
870 0.0
871 };
872 let time_pct_remaining = if session.time_limit.num_seconds() > 0 {
873 (seconds_remaining as f64 / session.time_limit.num_seconds() as f64) * 100.0
874 } else {
875 0.0
876 };
877 if budget_pct_remaining <= state.warning_threshold_pct && session.call_budget > 0 {
878 warnings.push(format!(
879 "budget low: {} of {} calls remaining",
880 calls_remaining, session.call_budget
881 ));
882 }
883 if time_pct_remaining <= state.warning_threshold_pct && seconds_remaining > 0 {
884 warnings.push(format!("time low: {}s remaining", seconds_remaining));
885 }
886
887 let status_str = format!("{:?}", session.status).to_lowercase();
888 let sensitivity_str = serde_json::to_value(session.data_sensitivity_ceiling)
889 .unwrap_or_default()
890 .as_str()
891 .unwrap_or("unknown")
892 .to_string();
893
894 (
895 StatusCode::OK,
896 Json(SessionStatusResponse {
897 session_id: session.session_id,
898 agent_id: session.agent_id,
899 status: status_str,
900 declared_intent: session.declared_intent,
901 authorized_tools: session.authorized_tools.clone(),
902 authorized_credentials: session.authorized_credentials,
903 calls_made: session.calls_made,
904 call_budget: session.call_budget,
905 calls_remaining,
906 rate_limit_per_minute: session.rate_limit_per_minute,
907 data_sensitivity_ceiling: sensitivity_str,
908 created_at: session.created_at,
909 expires_at,
910 seconds_remaining,
911 warnings,
912 }),
913 )
914 .into_response()
915}
916
/// JSON summary returned by [`close_session`] after a session has been closed.
#[derive(Debug, Serialize)]
pub struct SessionCloseResponse {
    /// Identifier of the closed session.
    pub session_id: Uuid,
    /// Always `"closed"` in responses built by the handler.
    pub status: String,
    /// Declared intent stored on the session.
    pub declared_intent: String,
    /// Number of calls the session made.
    pub total_calls: u64,
    /// Call budget the session was created with.
    pub call_budget: u64,
    /// `total_calls / call_budget` as a percentage; `0.0` when the budget is zero.
    pub budget_utilization_pct: f64,
    /// Whole seconds between session creation and the close request.
    pub time_used_secs: i64,
    /// Session time limit in whole seconds.
    pub time_limit_secs: i64,
    /// Denied-call count taken from the audit sink's per-session stats; `0` without a sink.
    pub denied_attempts: u64,
    /// Anomaly count taken from the audit sink's per-session stats; `0` without a sink.
    pub anomalies_detected: u64,
}
933
934pub async fn close_session(
936 State(state): State<AppState>,
937 headers: HeaderMap,
938 Path(id): Path<Uuid>,
939) -> impl IntoResponse {
940 if let Err(e) = check_admin_rate_limit(&state, &headers) {
941 return e.into_response();
942 }
943 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
944 return e.into_response();
945 }
946
947 state.admin_audit_log("close_session", None, &format!("session_id={}", id));
948
949 let session = match state.session_store.close(id).await {
950 Ok(s) => {
951 state.metrics.active_sessions.dec();
952 s
953 }
954 Err(e) => {
955 let status = match &e {
956 arbiter_session::SessionError::NotFound(_) => StatusCode::NOT_FOUND,
957 arbiter_session::SessionError::AlreadyClosed(_) => StatusCode::CONFLICT,
958 _ => StatusCode::INTERNAL_SERVER_ERROR,
959 };
960 return (
961 status,
962 Json(ErrorResponse {
963 error: "internal error".into(),
964 }),
965 )
966 .into_response();
967 }
968 };
969
970 let now = Utc::now();
971 let time_used_secs = (now - session.created_at).num_seconds();
972 let budget_utilization_pct = if session.call_budget > 0 {
973 (session.calls_made as f64 / session.call_budget as f64) * 100.0
974 } else {
975 0.0
976 };
977
978 let session_id_str = id.to_string();
980 let (denied_attempts, anomalies_detected) = if let Some(ref sink) = state.audit_sink {
981 let stats = sink.stats().stats_for_session(&session_id_str).await;
982 sink.stats().remove_session(&session_id_str).await;
984 (stats.denied_count, stats.anomaly_count)
985 } else {
986 (0, 0)
987 };
988
989 (
990 StatusCode::OK,
991 Json(SessionCloseResponse {
992 session_id: session.session_id,
993 status: "closed".into(),
994 declared_intent: session.declared_intent,
995 total_calls: session.calls_made,
996 call_budget: session.call_budget,
997 budget_utilization_pct,
998 time_used_secs,
999 time_limit_secs: session.time_limit.num_seconds(),
1000 denied_attempts,
1001 anomalies_detected,
1002 }),
1003 )
1004 .into_response()
1005}
1006
/// JSON request body accepted by [`explain_policy`]: a hypothetical tool call to
/// evaluate against the current policy configuration.
#[derive(Debug, Deserialize)]
pub struct PolicyExplainRequest {
    /// Agent to evaluate as; must exist in the registry when policies are configured.
    pub agent_id: Uuid,
    /// Declared intent placed into the evaluation context.
    pub declared_intent: String,
    /// Tool name used for the simulated `tools/call` request.
    pub tool: String,
    /// Optional tool arguments for the simulated request.
    #[serde(default)]
    pub arguments: Option<serde_json::Value>,
    /// Optional principal subject; the agent's owner is used when omitted.
    #[serde(default)]
    pub principal: Option<String>,
}
1018
/// JSON response body returned by [`explain_policy`].
#[derive(Debug, Serialize)]
pub struct PolicyExplainResponse {
    /// One of `"allow"`, `"deny"`, `"escalate"` or `"annotate"`.
    pub decision: String,
    /// Policy id carried by an allow or annotate decision; `None` otherwise.
    pub matched_policy: Option<String>,
    /// Reason carried by a deny, escalate or annotate decision, or the note that no
    /// policies are configured; `None` for a plain allow.
    pub reason: Option<String>,
    /// Evaluation trace produced by the policy engine; empty when no policies are configured.
    pub trace: Vec<PolicyTrace>,
}
1027
1028pub async fn explain_policy(
1030 State(state): State<AppState>,
1031 headers: HeaderMap,
1032 Json(req): Json<PolicyExplainRequest>,
1033) -> impl IntoResponse {
1034 if let Err(e) = check_admin_rate_limit(&state, &headers) {
1035 return e.into_response();
1036 }
1037 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1038 return e.into_response();
1039 }
1040
1041 state.admin_audit_log(
1042 "explain_policy",
1043 Some(req.agent_id),
1044 &format!("tool={}", sanitize_for_log(&req.tool)),
1045 );
1046
1047 let policy_snapshot = tokio::sync::watch::Sender::borrow(&state.policy_config).clone();
1048 let policy_config = match policy_snapshot.as_ref() {
1049 Some(pc) => pc,
1050 None => {
1051 return (
1052 StatusCode::OK,
1053 Json(PolicyExplainResponse {
1054 decision: "allow".into(),
1055 matched_policy: None,
1056 reason: Some("no policies configured".into()),
1057 trace: vec![],
1058 }),
1059 )
1060 .into_response();
1061 }
1062 };
1063
1064 let agent = match state.registry.get_agent(req.agent_id).await {
1065 Ok(a) => a,
1066 Err(e) => {
1067 return (
1068 StatusCode::NOT_FOUND,
1069 Json(ErrorResponse {
1070 error: format!("agent not found: {e}"),
1071 }),
1072 )
1073 .into_response();
1074 }
1075 };
1076
1077 let principal = req.principal.unwrap_or_else(|| agent.owner.clone());
1078
1079 let eval_ctx = EvalContext {
1080 agent,
1081 delegation_chain: vec![],
1082 declared_intent: req.declared_intent,
1083 principal_sub: principal,
1084 principal_groups: vec![],
1085 };
1086
1087 let mcp_req = McpRequest {
1088 id: None,
1089 method: "tools/call".into(),
1090 tool_name: Some(req.tool.clone()),
1091 arguments: req.arguments,
1092 resource_uri: None,
1093 };
1094
1095 let result = arbiter_policy::evaluate_explained(policy_config, &eval_ctx, &mcp_req);
1096
1097 let (decision, matched_policy, reason) = match result.decision {
1098 arbiter_policy::Decision::Allow { policy_id } => ("allow".into(), Some(policy_id), None),
1099 arbiter_policy::Decision::Deny { reason } => ("deny".into(), None, Some(reason)),
1100 arbiter_policy::Decision::Escalate { reason } => ("escalate".into(), None, Some(reason)),
1101 arbiter_policy::Decision::Annotate { policy_id, reason } => {
1102 ("annotate".into(), Some(policy_id), Some(reason))
1103 }
1104 };
1105
1106 (
1107 StatusCode::OK,
1108 Json(PolicyExplainResponse {
1109 decision,
1110 matched_policy,
1111 reason,
1112 trace: result.trace,
1113 }),
1114 )
1115 .into_response()
1116}
1117
/// JSON request body accepted by [`validate_policy`].
#[derive(Debug, Deserialize)]
pub struct PolicyValidateRequest {
    /// Policy configuration as TOML text, passed to `PolicyConfig::validate_toml`.
    pub policy_toml: String,
}
1123
1124pub async fn validate_policy(
1126 State(state): State<AppState>,
1127 headers: HeaderMap,
1128 Json(req): Json<PolicyValidateRequest>,
1129) -> impl IntoResponse {
1130 if let Err(e) = check_admin_rate_limit(&state, &headers) {
1131 return e.into_response();
1132 }
1133 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1134 return e.into_response();
1135 }
1136
1137 state.admin_audit_log("validate_policy", None, "");
1138
1139 let result = arbiter_policy::PolicyConfig::validate_toml(&req.policy_toml);
1140 (StatusCode::OK, Json(result)).into_response()
1141}
1142
1143pub async fn reload_policy(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1145 if let Err(e) = check_admin_rate_limit(&state, &headers) {
1146 return e.into_response();
1147 }
1148 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1149 return e.into_response();
1150 }
1151
1152 state.admin_audit_log("reload_policy", None, "");
1153
1154 let path = match &state.policy_file_path {
1155 Some(p) => p.clone(),
1156 None => {
1157 return (
1158 StatusCode::BAD_REQUEST,
1159 Json(ErrorResponse {
1160 error: "no policy file configured; policies are inline or absent".into(),
1161 }),
1162 )
1163 .into_response();
1164 }
1165 };
1166
1167 let contents = match std::fs::read_to_string(&path) {
1168 Ok(c) => c,
1169 Err(e) => {
1170 tracing::error!(path = %path, error = %e, "failed to read policy file");
1172 return (
1173 StatusCode::INTERNAL_SERVER_ERROR,
1174 Json(ErrorResponse {
1175 error: "failed to read policy file".into(),
1176 }),
1177 )
1178 .into_response();
1179 }
1180 };
1181
1182 let new_config = match arbiter_policy::PolicyConfig::from_toml(&contents) {
1183 Ok(pc) => pc,
1184 Err(e) => {
1185 tracing::error!(error = %e, "failed to parse policy file");
1187 return (
1188 StatusCode::BAD_REQUEST,
1189 Json(ErrorResponse {
1190 error: "failed to parse policy file".into(),
1191 }),
1192 )
1193 .into_response();
1194 }
1195 };
1196
1197 let policy_count = new_config.policies.len();
1198 let _ = state.policy_config.send_replace(Arc::new(Some(new_config)));
1199
1200 tracing::warn!(
1202 path,
1203 policy_count,
1204 "AUDIT: policy configuration reloaded via admin API"
1205 );
1206 (
1207 StatusCode::OK,
1208 Json(serde_json::json!({
1209 "status": "ok",
1210 "reloaded": true,
1211 "policies_loaded": policy_count,
1212 "policy_count": policy_count,
1213 })),
1214 )
1215 .into_response()
1216}
1217
1218pub async fn policy_schema(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
1220 if let Err(e) = check_admin_rate_limit(&state, &headers) {
1221 return e.into_response();
1222 }
1223 if let Err(e) = validate_admin_key(&headers, &state.admin_api_key) {
1224 return e.into_response();
1225 }
1226
1227 state.admin_audit_log("policy_schema", None, "");
1228
1229 let schema = serde_json::json!({
1230 "$schema": "https://json-schema.org/draft/2020-12/schema",
1231 "title": "Arbiter Policy Configuration",
1232 "description": "Defines authorization policies for the Arbiter MCP gateway. Policies are evaluated top-to-bottom; the most specific match wins. If no policy matches, the request is denied (deny-by-default).",
1233 "type": "object",
1234 "properties": {
1235 "policies": {
1236 "type": "array",
1237 "description": "Ordered list of authorization policies. Evaluated by specificity score; the most specific matching policy's effect applies.",
1238 "items": {
1239 "$ref": "#/$defs/Policy"
1240 }
1241 }
1242 },
1243 "$defs": {
1244 "Policy": {
1245 "type": "object",
1246 "description": "A single authorization policy rule.",
1247 "required": ["id", "effect"],
1248 "properties": {
1249 "id": {
1250 "type": "string",
1251 "description": "Unique identifier for this policy. Used in audit logs and policy traces."
1252 },
1253 "effect": {
1254 "$ref": "#/$defs/Effect"
1255 },
1256 "agent_match": {
1257 "$ref": "#/$defs/AgentMatch"
1258 },
1259 "principal_match": {
1260 "$ref": "#/$defs/PrincipalMatch"
1261 },
1262 "intent_match": {
1263 "$ref": "#/$defs/IntentMatch"
1264 },
1265 "allowed_tools": {
1266 "type": "array",
1267 "items": { "type": "string" },
1268 "description": "Tools this policy applies to. Empty array means 'all tools'."
1269 },
1270 "parameter_constraints": {
1271 "type": "array",
1272 "items": { "$ref": "#/$defs/ParameterConstraint" },
1273 "description": "Per-parameter bounds on tool arguments."
1274 },
1275 "priority": {
1276 "type": "integer",
1277 "default": 0,
1278 "description": "Manual priority override. 0 = auto-computed from match specificity. Higher wins ties."
1279 }
1280 }
1281 },
1282 "Effect": {
1283 "type": "string",
1284 "enum": ["allow", "deny", "escalate"],
1285 "description": "allow = permit the request. deny = block it. escalate = require human-in-the-loop approval."
1286 },
1287 "AgentMatch": {
1288 "type": "object",
1289 "description": "Criteria for matching the requesting agent. All specified fields must match.",
1290 "properties": {
1291 "agent_id": {
1292 "type": "string",
1293 "format": "uuid",
1294 "description": "Match a specific agent by UUID. Specificity: +100."
1295 },
1296 "trust_level": {
1297 "$ref": "#/$defs/TrustLevel"
1298 },
1299 "capabilities": {
1300 "type": "array",
1301 "items": { "type": "string" },
1302 "description": "Agent must have all listed capabilities. Specificity: +25 each."
1303 }
1304 }
1305 },
1306 "TrustLevel": {
1307 "type": "string",
1308 "enum": ["untrusted", "basic", "verified", "trusted"],
1309 "description": "Agent trust tier. untrusted < basic < verified < trusted. Matches agents at or above the specified level. Specificity: +50."
1310 },
1311 "PrincipalMatch": {
1312 "type": "object",
1313 "description": "Criteria for matching the human principal on whose behalf the agent acts.",
1314 "properties": {
1315 "sub": {
1316 "type": "string",
1317 "description": "Exact principal subject identifier (e.g., 'user:alice'). Specificity: +40."
1318 },
1319 "groups": {
1320 "type": "array",
1321 "items": { "type": "string" },
1322 "description": "Principal must belong to at least one of these groups. Specificity: +20 each."
1323 }
1324 }
1325 },
1326 "IntentMatch": {
1327 "type": "object",
1328 "description": "Criteria for matching the session's declared intent.",
1329 "properties": {
1330 "keywords": {
1331 "type": "array",
1332 "items": { "type": "string" },
1333 "description": "Case-insensitive substrings that must appear in the declared intent. Specificity: +10 each."
1334 },
1335 "regex": {
1336 "type": "string",
1337 "description": "Regex pattern the declared intent must match. Compiled at config load time. Specificity: +30."
1338 }
1339 }
1340 },
1341 "ParameterConstraint": {
1342 "type": "object",
1343 "description": "A constraint on a tool call parameter.",
1344 "required": ["key"],
1345 "properties": {
1346 "key": {
1347 "type": "string",
1348 "description": "Dotted path to the parameter (e.g., 'arguments.max_tokens')."
1349 },
1350 "max_value": {
1351 "type": "number",
1352 "description": "Maximum numeric value allowed."
1353 },
1354 "min_value": {
1355 "type": "number",
1356 "description": "Minimum numeric value allowed."
1357 },
1358 "allowed_values": {
1359 "type": "array",
1360 "items": { "type": "string" },
1361 "description": "Whitelist of allowed string values."
1362 }
1363 }
1364 },
1365 "DataSensitivity": {
1366 "type": "string",
1367 "enum": ["public", "internal", "confidential", "restricted"],
1368 "description": "Data sensitivity ceiling for sessions. public < internal < confidential < restricted."
1369 }
1370 }
1371 });
1372
1373 (StatusCode::OK, Json(schema)).into_response()
1374}
1375
1376pub fn router(state: AppState) -> axum::Router {
1378 axum::Router::new()
1379 .route("/agents", axum::routing::post(register_agent))
1380 .route("/agents", axum::routing::get(list_agents))
1381 .route("/agents/{id}", axum::routing::get(get_agent))
1382 .route("/agents/{id}", axum::routing::delete(deactivate_agent))
1383 .route("/agents/{id}/delegate", axum::routing::post(delegate_agent))
1384 .route(
1385 "/agents/{id}/delegations",
1386 axum::routing::get(list_delegations),
1387 )
1388 .route("/agents/{id}/token", axum::routing::post(issue_agent_token))
1389 .route("/sessions", axum::routing::post(create_session))
1390 .route("/sessions/{id}", axum::routing::get(get_session))
1391 .route("/sessions/{id}/close", axum::routing::post(close_session))
1392 .route("/policy/explain", axum::routing::post(explain_policy))
1393 .route("/policy/validate", axum::routing::post(validate_policy))
1394 .route("/policy/reload", axum::routing::post(reload_policy))
1395 .route("/admin/policies/reload", axum::routing::post(reload_policy))
1396 .route("/policy/schema", axum::routing::get(policy_schema))
1397 .with_state(state)
1398}
1399
#[cfg(test)]
mod tests {
    use super::*;

    /// A line feed in the input is expected to come back as the two
    /// characters `\` and `n` rather than as a real line break.
    #[test]
    fn sanitize_for_log_strips_newlines() {
        let forged = "read config\nINFO ADMIN_AUDIT: fake";
        let escaped = sanitize_for_log(forged);
        assert_eq!(escaped, "read config\\nINFO ADMIN_AUDIT: fake");
    }

    /// A CRLF pair is expected to come back as the literal text `\r\n`.
    #[test]
    fn sanitize_for_log_strips_carriage_return() {
        let crlf = "line1\r\nline2";
        let escaped = sanitize_for_log(crlf);
        assert_eq!(escaped, "line1\\r\\nline2");
    }

    /// A tab character is expected to come back as the literal text `\t`.
    #[test]
    fn sanitize_for_log_strips_tabs() {
        let tabbed = "key\tvalue";
        let escaped = sanitize_for_log(tabbed);
        assert_eq!(escaped, "key\\tvalue");
    }

    /// Text without control characters is expected to pass through unchanged.
    #[test]
    fn sanitize_for_log_preserves_normal_text() {
        let plain = "read configuration files";
        let escaped = sanitize_for_log(plain);
        assert_eq!(escaped, "read configuration files");
    }
}