atd_runtime/dispatch.rs
1//! Transport-neutral dispatch — the shared core of ATD-speaking servers.
2//!
3//! SP-streamable-http §4.3 + §6.3: the body of `atd-server::connection.rs`'s
4//! per-request switch moves here so the Unix-socket listener and the
5//! Streamable-HTTP listener (`atd-server-http`) drive the **same** dispatch
6//! state machine over the **same** `Arc<Registry>`. The wire shape returned
7//! is byte-identical regardless of which transport delivered the request —
8//! this property is exercised by `atd-server-http`'s UDS↔HTTP parity test.
9//!
//! Three entry points:
//!
//! 1. [`dispatch_request`] — handles the full `Request` enum (Ping / Hello /
//!    ToolList / ToolSchema / RunTool / RunToolContinue). UDS uses this from
//!    `handle_connection`'s read loop; capability set + caller identity are
//!    mutated in place because `Hello` rewrites both for the lifetime of the
//!    connection.
//!
//! 2. [`run_tool`] — the `RunTool` arm extracted for HTTP. HTTP requests
//!    derive their `CapabilitySet` and `caller_id` fresh from a Bearer-token
//!    lookup per request (SP-streamable-http §4.3), so there is no
//!    connection state to carry forward — the listener calls `run_tool`
//!    directly with the freshly-derived inputs.
//!
//! 3. [`run_tool_continue`] — the `RunToolContinue` arm (SP-pagination-v1):
//!    verifies a paginated-result cursor and fetches the next page from the
//!    same tool.
//!
//! All of them share the same capability gate, semaphore permit, middleware
//! chain, and audit sink. `run_tool` additionally resolves secrets through
//! the token broker and dispatches non-paginated tools through their binding;
//! continuation pages skip both. Any future SP that adds a third transport
//! (vsock, quic, …) should reach for these entry points rather than
//! re-implementing the switch.
28//! switch.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::time::Instant;
33
34use atd_protocol::{Request, Response};
35
36use crate::audit::AuditSink;
37use crate::capability::CapabilitySet;
38use crate::context::CallContext;
39use crate::error::ToolCallError;
40use crate::middleware::Middleware;
41use crate::registry::Registry;
42use crate::secrets::TokenBroker;
43use crate::tier::{TierPolicy, ToolTier};
44use crate::tracker::ReadTracker;
45
46/// Transport-neutral configuration shared across listeners.
47///
48/// Carries the fields every listener needs to dispatch a `RunTool`: cwd for
49/// relative-path tools, the output / timeout budgets, the operator
50/// capability allow-list, plus the pluggable audit sink, token broker, and
51/// server identity. The Unix-socket-specific `socket_path` and the HTTP-
52/// specific `listen` / `extra_origins` / `require_bearer` live in
53/// `atd-server::ServerConfig` and `atd-server-http::HttpServerConfig`
54/// respectively (SP-streamable-http §6.3 — the configs share fields by
55/// composition rather than by trait, to keep struct-literal construction
56/// across crate boundaries ergonomic).
57pub struct SharedServerConfig {
58 pub cwd: PathBuf,
59 pub max_output_bytes: usize,
60 pub default_call_timeout_ms: u64,
61 /// Server-operator capability allow-list. The set the `Hello` handshake
62 /// intersects with on UDS, and the set the HTTP listener intersects
63 /// `BearerIdentity::granted_capabilities` against per request.
64 pub granted_capabilities: Vec<String>,
65 /// Optional audit sink for per-call observability. SP-operability-v1 C1.
66 pub audit_sink: Option<Arc<dyn AuditSink>>,
67 /// Identity string echoed in the `Hello` ack (and in the MCP
68 /// `initialize` response on HTTP). Concretely the deployed server's
69 /// name + version, e.g. `"atd-ref-server 0.3.0"`.
70 pub server_version: String,
71 /// Optional `TokenBroker` for multi-tenant secret routing.
72 /// SP-token-broker-phase1.
73 pub token_broker: Option<Arc<dyn TokenBroker>>,
74 /// Maximum UCAN-lite chain depth accepted by the verifier. Default
75 /// `5` per SP-capability-v2 spec §4.6 — prevents stack-exhaustion
76 /// attacks via pathologically deep proof chains. Override via the
77 /// listener crate's CLI flag if a specific deployment justifies it.
78 pub max_ucan_chain_depth: u8,
79 /// Optional revocation store for UCAN-lite tokens (SP-capability-v2
80 /// §4.7). When `None`, no revocation check is performed; the
81 /// connection-scoped allow-list is the only authority bound.
82 /// Adopters wrap their existing revocation table (e.g. celia's
83 /// `consent.status='revoked'`) behind this trait.
84 pub ucan_revocation_store: Option<Arc<dyn crate::ucan::UcanRevocationStore>>,
85 /// Per-frame deadline applied to reads/writes on a connection that has
86 /// already completed the `Hello` handshake. Long enough to cover a
87 /// reasonable tool call's slowest reply (e.g. `host:media.convert` at
88 /// 25s). Default `30_000` ms. SP-concurrency-baseline §5.2.
89 pub frame_deadline_active_ms: u64,
90 /// Per-frame deadline applied to the pre-Hello handshake window. Short
91 /// enough to fail fast under a single-threaded server starvation
92 /// (the §1.2 root cause of the 2026-05-12 celia incident) so the SDK
93 /// retry path can reissue against a less contended worker. Default
94 /// `5_000` ms. SP-concurrency-baseline §5.2.
95 pub frame_deadline_handshake_ms: u64,
96 /// HMAC signing key for paginated-result cursors. SP-pagination-v1 §4.5.
97 /// Production: random per server startup (so cursor forgery requires
98 /// process-state compromise). Multi-instance load-balanced deployments
99 /// share a key via env (`ATD_CURSOR_SIGNING_KEY=base64...`); the
100 /// listener crates apply this on `Server::new`. Test fixtures use a
101 /// fixed zero key — safe because they don't span processes.
102 pub cursor_signing_key: [u8; 32],
103 /// Time-to-live for paginated-result cursors, in seconds. Cursors older
104 /// than this fail verification with `ERR_CURSOR_EXPIRED` (1020).
105 /// Default `300`s (5 minutes) — long enough for one human "think"
106 /// round-trip without indefinite server-side state retention.
107 pub cursor_ttl_seconds: u64,
108}
109
110impl SharedServerConfig {
111 /// Test/CLI helper — minimal default that compiles wherever a
112 /// `SharedServerConfig` is required, but with empty allow-list (fail-
113 /// closed for capability-gated tools) and no audit sink.
114 pub fn for_test() -> Self {
115 Self {
116 cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
117 max_output_bytes: 1_048_576,
118 default_call_timeout_ms: 60_000,
119 granted_capabilities: vec![],
120 audit_sink: None,
121 server_version: "atd-runtime-test 0.0.0".into(),
122 token_broker: None,
123 max_ucan_chain_depth: 5,
124 ucan_revocation_store: None,
125 frame_deadline_active_ms: 30_000,
126 frame_deadline_handshake_ms: 5_000,
127 cursor_signing_key: [0u8; 32],
128 cursor_ttl_seconds: 300,
129 }
130 }
131}
132
133/// Shared dispatch state. Listener crates own one `Arc<ServerState>` and
134/// hand the clone to per-connection / per-request dispatch.
135///
136/// SP-streamable-http §4.3: every listener holds the **same**
137/// `Arc<Registry>`, exactly one `TierPolicy`, exactly one middleware chain,
138/// and exactly one `SharedServerConfig` snapshot. The HTTP listener does
139/// not duplicate any of this — it composes the same struct.
140pub struct ServerState {
141 pub registry: Registry,
142 pub config: SharedServerConfig,
143 pub tier_policy: TierPolicy,
144 pub middleware: Vec<Arc<dyn Middleware>>,
145 /// SP-concurrency-baseline §5.7 — hot-path counters. Updated by the
146 /// dispatch error arms below, by the per-transport accept loops
147 /// (atd-server / atd-server-http), and by Phase F follow-up wiring
148 /// in audit. Always present; counters default to zero.
149 pub metrics: Arc<crate::metrics::MetricsCounters>,
150 /// SP-pagination-v1 — process-wide HMAC issuer for paginated-result
151 /// cursors. Built once at server startup from
152 /// `SharedServerConfig.cursor_signing_key` and a fresh-random
153 /// `session_nonce`. Server restart → new nonce → outstanding cursors
154 /// expire (`ERR_CURSOR_EXPIRED`). Shared `Arc` so dispatch (verify-
155 /// continuation) and tools (issue-next-cursor via `CallContext`) see
156 /// the same nonce.
157 pub cursor_issuer: Arc<crate::cursor::CursorIssuer>,
158}
159
160/// Run the full `Request` state machine and produce a `Response`.
161///
162/// Intended for stream-oriented transports (UDS) where the listener keeps
163/// per-connection capability + caller-id state across many requests on the
164/// same socket. `caps` and `caller_id` are mutated in place when `Request`
165/// is a `Hello` — UDS `connection.rs::handle_connection` relies on this.
166///
167/// HTTP listeners should not use `dispatch_request` for `tools/call`
168/// translation; they should call [`run_tool`] directly after building a
169/// fresh per-request `CapabilitySet` from the bearer-resolved
170/// `BearerIdentity`. The `Hello` / `Ping` / `ToolList` / `ToolSchema`
171/// branches are still useful in introspection-only contexts and remain
172/// available via the same fn.
173pub async fn dispatch_request(
174 state: &Arc<ServerState>,
175 tracker: &Arc<ReadTracker>,
176 caps: &mut Arc<CapabilitySet>,
177 caller_id: &mut Option<String>,
178 req: Request,
179) -> Response {
180 state
181 .metrics
182 .dispatched_requests
183 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
184 let resp = dispatch_request_inner(state, tracker, caps, caller_id, req).await;
185 if let Response::Error { code: Some(c), .. } = &resp {
186 state.metrics.record_error(*c);
187 }
188 resp
189}
190
191async fn dispatch_request_inner(
192 state: &Arc<ServerState>,
193 tracker: &Arc<ReadTracker>,
194 caps: &mut Arc<CapabilitySet>,
195 caller_id: &mut Option<String>,
196 req: Request,
197) -> Response {
198 match req {
199 Request::Ping => Response::Pong,
200 Request::Hello {
201 client_id,
202 requested_capabilities,
203 ucan_tokens,
204 } => {
205 *caller_id = client_id.clone();
206 let allow = CapabilitySet::from_iter(state.config.granted_capabilities.iter().cloned());
207 let (granted_strings_vec, _denied) = allow.intersect(&requested_capabilities);
208 let granted_strings = CapabilitySet::from_iter(granted_strings_vec);
209
210 // SP-capability-v2 Phase C: verify any presented UCAN-lite
211 // tokens and union the resulting caps with the SP-12 string
212 // allow-list result. Pre-SP-capability-v2 clients pass an
213 // empty `ucan_tokens` vec (per `#[serde(default)]`); their
214 // path is byte-identical to SP-12.
215 let granted_ucan = if ucan_tokens.is_empty() {
216 CapabilitySet::empty()
217 } else {
218 // Audience-pin requires a client_id — if the client sends
219 // ucan_tokens but no client_id, we cannot bind the audience
220 // and must reject. Spec §4.6 / §5.4 (1013 audience-mismatch
221 // family).
222 let expected_aud = match caller_id.as_ref() {
223 Some(s) if !s.is_empty() => s.clone(),
224 _ => {
225 return Response::Error {
226 message: "UCAN tokens require Hello.client_id (audience pin)"
227 .to_string(),
228 code: Some(atd_protocol::ERR_AUDIENCE_MISMATCH),
229 retryable: Some(false),
230 details: None,
231 };
232 }
233 };
234 let mut cfg = crate::ucan::VerifyConfig::new(expected_aud);
235 cfg.max_chain_depth = state.config.max_ucan_chain_depth;
236 cfg.revocation_store = state.config.ucan_revocation_store.clone();
237 match crate::ucan::verify_tokens(&ucan_tokens, &cfg, std::time::SystemTime::now()) {
238 Ok(c) => c,
239 Err(e) => {
240 let code = crate::ucan::wire_code(&e);
241 return Response::Error {
242 message: e.to_string(),
243 code: Some(code),
244 retryable: Some(false),
245 details: None,
246 };
247 }
248 }
249 };
250
251 let granted_caps = granted_strings.union(&granted_ucan);
252 let granted_vec = granted_caps.granted();
253 *caps = Arc::new(granted_caps);
254 Response::HelloAck {
255 granted_capabilities: granted_vec,
256 server_version: state.config.server_version.clone(),
257 supported_tiers: vec!["hot".into(), "warm".into(), "cold".into()],
258 }
259 }
260 Request::ToolList => {
261 let summaries: Vec<_> = state
262 .registry
263 .summaries()
264 .into_iter()
265 .filter(|s| !matches!(s.visibility, atd_protocol::ToolVisibility::Hidden))
266 .collect();
267 Response::ToolListResponse {
268 tools: serde_json::to_value(&summaries).unwrap_or_else(|_| serde_json::json!([])),
269 }
270 }
271 Request::ToolSchema { tool_id } => match state.registry.get(&tool_id) {
272 Some(entry) => Response::ToolSchemaResponse {
273 schema: serde_json::to_value(entry.definition())
274 .unwrap_or_else(|_| serde_json::json!({})),
275 },
276 None => Response::Error {
277 message: format!("tool not found: {tool_id}"),
278 code: None,
279 retryable: Some(false),
280 details: None,
281 },
282 },
283 Request::RunTool {
284 tool_id,
285 args,
286 dry_run,
287 } => {
288 run_tool(
289 state,
290 tracker,
291 caps,
292 caller_id.as_deref(),
293 tool_id,
294 args,
295 dry_run,
296 )
297 .await
298 }
299 Request::RunToolContinue { tool_id, cursor } => {
300 run_tool_continue(state, tracker, caps, caller_id.as_deref(), tool_id, cursor).await
301 }
302 }
303}
304
305/// SP-pagination-v1 §4.4 — handle a `Request::RunToolContinue`.
306///
307/// Steps:
308/// 1. Verify the cursor (HMAC + TTL + session nonce) → `CursorPayload`.
309/// 2. Reject if the cursor's `tool_id` ≠ request's `tool_id` (anti-replay).
310/// 3. Look up the tool; reject if not found, or if it doesn't override
311/// `supports_pagination()` (a cursor against a non-paginating tool is
312/// bug-or-attack territory; 1021).
313/// 4. Re-check `required_capabilities` against the current connection's
314/// capability set — caps may have changed since the cursor was issued
315/// (UCAN revocation, Hello re-negotiation).
316/// 5. Acquire the per-tool semaphore (same rate-limit envelope as `run_tool`).
317/// 6. Build a `CallContext` with the cursor issuer attached.
318/// 7. Call `tool.call_paginated(Value::Null, &ctx, Some(&cursor_str))`.
319/// Args are intentionally `Null` on continuation — original args were
320/// fingerprinted into the cursor and the tool is expected to read its
321/// state from `cursor.opaque_state`.
322/// 8. Emit one audit event tagged with `cursor_page` = payload's page_index.
323/// 9. Return `ToolResultResponse` with the new `next_cursor` from the tool.
324#[allow(clippy::too_many_arguments)]
325pub async fn run_tool_continue(
326 state: &Arc<ServerState>,
327 tracker: &Arc<ReadTracker>,
328 caps: &Arc<CapabilitySet>,
329 caller_id: Option<&str>,
330 tool_id: String,
331 cursor: String,
332) -> Response {
333 use std::sync::atomic::Ordering;
334
335 let start = Instant::now();
336 let audit_call_id = ulid::Ulid::new();
337
338 // Build an issuer from the configured signing key. The session_nonce
339 // is fresh per construction — but cursors carry the issuer's nonce
340 // from issue-time, and the issuer constructed at server startup is
341 // the one whose nonce went into the cursor. Here we need to verify
342 // against THAT issuer. SharedServerConfig holds the key but not the
343 // issuer; for v1 the listener crates construct the issuer at startup
344 // and inject it via CallContext. Until that path lands we reconstruct
345 // here using the stored key — verification of cursors issued in this
346 // same process still succeeds because the dispatch path that issued
347 // the cursor used the same key + a matching session_nonce embedded.
348 //
349 // TODO: thread Arc<CursorIssuer> through ServerState so the nonce
350 // truly survives across paginated calls in the same process. For
351 // now we accept that the first call_paginated within this fn will
352 // be reading a cursor whose session_nonce came from a sister issuer
353 // — server_session check will hold across the same process because
354 // both issuers got their nonce from the same OS RNG sequence... no
355 // wait, they didn't. Each `CursorIssuer::new` randomizes the nonce.
356 //
357 // Use the process-wide issuer from ServerState so the session_nonce
358 // matches across issue (first-page in run_tool) and verify (continuation
359 // here). One issuer per server process by design.
360 let payload = match state
361 .cursor_issuer
362 .verify(&cursor, state.config.cursor_ttl_seconds)
363 {
364 Ok(p) => p,
365 Err(crate::cursor::CursorError::Expired) => {
366 return Response::Error {
367 message: "cursor expired; re-issue the original RunTool".into(),
368 code: Some(atd_protocol::ERR_CURSOR_EXPIRED),
369 retryable: Some(false),
370 details: None,
371 };
372 }
373 Err(_) => {
374 return Response::Error {
375 message: "cursor invalid".into(),
376 code: Some(atd_protocol::ERR_CURSOR_INVALID),
377 retryable: Some(false),
378 details: None,
379 };
380 }
381 };
382
383 if payload.tool_id != tool_id {
384 return Response::Error {
385 message: format!(
386 "cursor tool_id mismatch: cursor={} request={tool_id}",
387 payload.tool_id
388 ),
389 code: Some(atd_protocol::ERR_CURSOR_INVALID),
390 retryable: Some(false),
391 details: None,
392 };
393 }
394
395 let entry = match state.registry.get(&tool_id) {
396 Some(e) => e.clone(),
397 None => {
398 return Response::Error {
399 message: format!("tool not found: {tool_id}"),
400 code: None,
401 retryable: Some(false),
402 details: None,
403 };
404 }
405 };
406
407 if !entry.tool.supports_pagination() {
408 return Response::Error {
409 message: format!("tool {tool_id} does not support pagination but received a cursor"),
410 code: Some(atd_protocol::ERR_CURSOR_INVALID),
411 retryable: Some(false),
412 details: None,
413 };
414 }
415
416 let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
417
418 // Re-check capability gating — caps can change mid-connection (UCAN revocation).
419 let required = entry.definition().required_capabilities.clone();
420 let missing: Vec<String> = required
421 .iter()
422 .filter(|c| !caps.contains(c))
423 .cloned()
424 .collect();
425 if !missing.is_empty() {
426 return Response::Error {
427 message: format!("capability denied for {tool_id}: missing {missing:?}"),
428 code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
429 retryable: Some(false),
430 details: None,
431 };
432 }
433
434 let _permit = match entry.semaphore.clone().try_acquire_owned() {
435 Ok(p) => p,
436 Err(_) => {
437 return Response::Error {
438 message: format!("rate limited for {tool_id} (continuation)"),
439 code: Some(atd_protocol::ERR_RATE_LIMITED),
440 retryable: Some(true),
441 details: None,
442 };
443 }
444 };
445
446 let tier_timeout = state.tier_policy.timeout(tier);
447 let tier_max_output = state.tier_policy.max_output(tier);
448 let ctx = CallContext::new(
449 state.config.cwd.clone(),
450 tier_max_output,
451 audit_call_id,
452 Some(Instant::now() + tier_timeout),
453 Some(tracker.clone()),
454 caps.clone(),
455 tier,
456 caller_id.map(|s| s.to_string()),
457 None, // secrets — broker resolution skipped on continuation; the
458 // tool reads continuation state from cursor.opaque_state
459 )
460 .with_cursor_issuer(state.cursor_issuer.clone());
461
462 let page_index = payload.page_index;
463 let result = entry
464 .tool
465 .call_paginated(serde_json::Value::Null, &ctx, Some(&cursor))
466 .await;
467
468 let response = match result {
469 Ok(crate::registry::PaginatedResult {
470 mut value,
471 next_cursor,
472 }) => {
473 // SP-pagination-v1 §G5 — apply the middleware chain per page so
474 // FHIR validation / PHI redaction / other egress rewrites cover
475 // continuation pages, not just first-page (RunTool) results. The
476 // celia adopter binds atd-middleware-fhir + atd-middleware-pii-
477 // redact-medical here; missing this hook would leak unredacted
478 // PHI on every continue — a real compliance defect.
479 for mw in &state.middleware {
480 mw.on_result(&tool_id, entry.definition(), &mut value);
481 }
482 Response::ToolResultResponse {
483 tool_id: tool_id.clone(),
484 result: value,
485 success: true,
486 dry_run: false,
487 next_cursor,
488 }
489 }
490 Err(crate::error::ToolCallError::ExecutionFailed {
491 code,
492 message,
493 retryable,
494 }) => Response::ToolResultResponse {
495 tool_id: tool_id.clone(),
496 result: serde_json::json!({
497 "code": code,
498 "message": message,
499 "retryable": retryable,
500 }),
501 success: false,
502 dry_run: false,
503 next_cursor: None,
504 },
505 Err(e) => Response::Error {
506 message: format!("tool {tool_id} continuation failed: {e:?}"),
507 code: None,
508 retryable: Some(false),
509 details: None,
510 },
511 };
512
513 // Audit emission with cursor_page tag (the v2 schema field).
514 if let Some(sink) = state.config.audit_sink.as_ref() {
515 state
516 .metrics
517 .audit_events_total
518 .fetch_add(1, Ordering::Relaxed);
519 let outcome = match &response {
520 Response::ToolResultResponse { success: true, .. } => crate::audit::Outcome::Success,
521 _ => crate::audit::Outcome::ExecutionFailed {
522 code: "continuation_failed".into(),
523 retryable: false,
524 },
525 };
526 sink.on_call(&crate::audit::CallEvent {
527 ts: crate::audit::now_rfc3339(),
528 call_id: audit_call_id.to_string(),
529 tool_id: tool_id.clone(),
530 caller_id: caller_id.map(|s| s.to_string()),
531 granted_capabilities: caps.granted(),
532 duration_ms: start.elapsed().as_millis() as u64,
533 outcome,
534 tier: crate::tier::tier_as_str(tier).to_string(),
535 dry_run: false,
536 schema_version: crate::audit::SCHEMA_VERSION,
537 secrets_resolved: false,
538 cursor_page: Some(page_index),
539 });
540 }
541
542 response
543}
544
545/// Execute one `RunTool` request against the shared dispatch state.
546///
547/// Single-entry-point for both transports: UDS goes through
548/// [`dispatch_request`] which forwards `Request::RunTool` here; HTTP
549/// (`atd-server-http::mcp::tools_call`) calls this fn directly. The
550/// emitted `Response` is the same enum + same JSON encoding on either
551/// route — `Response::ToolResultResponse` on success / execution-failed,
552/// `Response::Error` on capability-denied / rate-limited / broker error /
553/// tool-not-found / invalid args / internal error.
554///
555/// The body is the verbatim move of the SP-streamable-http §6.3
556/// `atd-server::connection.rs::dispatch` `RunTool` arm — every audit
557/// emission, capability check, semaphore acquire, broker call, binding
558/// dispatch, middleware step, and error map is preserved. The existing
559/// `atd-server::connection::tests` suite covers all branches and is
560/// re-routed through this fn unchanged after refactor.
561#[allow(clippy::too_many_arguments)]
562pub async fn run_tool(
563 state: &Arc<ServerState>,
564 tracker: &Arc<ReadTracker>,
565 caps: &Arc<CapabilitySet>,
566 caller_id: Option<&str>,
567 tool_id: String,
568 args: serde_json::Value,
569 dry_run: bool,
570) -> Response {
571 // SP-operability-v1 C1: per-call audit scaffolding. `start` measures
572 // wall-clock duration from dispatch entry; `audit_call_id` is the
573 // stable id put on `CallEvent` regardless of which return branch
574 // fires. Emission is a no-op when `audit_sink` is None.
575 let start = Instant::now();
576 let audit_call_id = ulid::Ulid::new();
577 // SP-pagination-v1 — cursor_page is captured-mut so paginated paths can
578 // tag the audit event with the page index (1 for first, 2+ for continues).
579 // Non-paginated dispatches leave it None and the field is omitted on
580 // the wire via #[serde(skip_serializing_if = "Option::is_none")].
581 let cursor_page: Option<u32> = None;
582 let emit = |outcome: crate::audit::Outcome, tier: ToolTier, secrets_resolved: bool| {
583 if let Some(sink) = state.config.audit_sink.as_ref() {
584 state
585 .metrics
586 .audit_events_total
587 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
588 sink.on_call(&crate::audit::CallEvent {
589 ts: crate::audit::now_rfc3339(),
590 call_id: audit_call_id.to_string(),
591 tool_id: tool_id.clone(),
592 caller_id: caller_id.map(|s| s.to_string()),
593 granted_capabilities: caps.granted(),
594 duration_ms: start.elapsed().as_millis() as u64,
595 outcome,
596 tier: crate::tier::tier_as_str(tier).to_string(),
597 dry_run,
598 schema_version: crate::audit::SCHEMA_VERSION,
599 secrets_resolved,
600 cursor_page,
601 });
602 }
603 };
604
605 if dry_run {
606 // Dry-run short-circuits BEFORE tier derivation — use Warm as the
607 // placeholder tier for the audit event.
608 emit(crate::audit::Outcome::Success, ToolTier::Warm, false);
609 return Response::ToolResultResponse {
610 tool_id: tool_id.clone(),
611 result: serde_json::json!({
612 "dry_run": true,
613 "tool_id": tool_id,
614 "args_preview": args,
615 }),
616 success: true,
617 dry_run: true,
618 next_cursor: None,
619 };
620 }
621 let entry = match state.registry.get(&tool_id) {
622 Some(e) => e.clone(),
623 None => {
624 emit(crate::audit::Outcome::ToolNotFound, ToolTier::Warm, false);
625 return Response::Error {
626 message: format!("tool not found: {tool_id}"),
627 code: None,
628 retryable: Some(false),
629 details: None,
630 };
631 }
632 };
633 let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
634 // SP-12 Task 2: capability enforcement.
635 let required = entry.definition().required_capabilities.clone();
636 let missing: Vec<String> = required
637 .iter()
638 .filter(|c| !caps.contains(c))
639 .cloned()
640 .collect();
641 if !missing.is_empty() {
642 let mut required_sorted = required.clone();
643 required_sorted.sort();
644 let mut missing_sorted = missing.clone();
645 missing_sorted.sort();
646 emit(
647 crate::audit::Outcome::CapabilityDenied {
648 missing: missing_sorted.clone(),
649 },
650 tier,
651 false,
652 );
653 return Response::Error {
654 message: format!("capability denied for {tool_id}: missing {missing_sorted:?}"),
655 code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
656 retryable: Some(false),
657 details: Some(serde_json::json!({
658 "required": required_sorted,
659 "granted": caps.granted(),
660 "missing": missing_sorted,
661 })),
662 };
663 }
664 // SP-operability-v1 C2: rate-limit enforcement.
665 let _permit = match entry.semaphore.clone().try_acquire_owned() {
666 Ok(p) => p,
667 Err(_) => {
668 let max_conc = entry.tool.definition().resources.max_concurrent;
669 emit(
670 crate::audit::Outcome::RateLimited {
671 retry_after_ms: None,
672 },
673 tier,
674 false,
675 );
676 return Response::Error {
677 message: format!("rate limited for {tool_id}: max_concurrent={max_conc} in-flight"),
678 code: Some(atd_protocol::ERR_RATE_LIMITED),
679 retryable: Some(true),
680 details: Some(serde_json::json!({
681 "tool_id": tool_id,
682 "limit": max_conc,
683 })),
684 };
685 }
686 };
687
688 // SP-token-broker-phase1: resolve secrets via the configured TokenBroker.
689 let secrets = match state.config.token_broker.as_ref() {
690 None => None,
691 Some(broker) => match broker.resolve(caller_id).await {
692 Ok(bundle) => bundle,
693 Err(e) => {
694 emit(
695 crate::audit::Outcome::ExecutionFailed {
696 code: "broker_error".into(),
697 retryable: true,
698 },
699 tier,
700 false,
701 );
702 return Response::Error {
703 message: format!("token broker error for {tool_id}: {e}"),
704 code: Some(atd_protocol::ERR_BROKER_FAILED),
705 retryable: Some(true),
706 details: None,
707 };
708 }
709 },
710 };
711 let secrets_resolved = secrets.is_some();
712 let tier_timeout = state.tier_policy.timeout(tier);
713 let tier_max_output = state.tier_policy.max_output(tier);
714
715 // SP-pagination-v1 §4.4 — paginated tools bypass the Binding layer so
716 // they can emit cursors on the first page. Non-paginated tools keep
717 // going through binding so CLI / future MCP / REST bindings stay
718 // intact. The check is one bool dispatch per call — negligible.
719 let ctx = if entry.tool.supports_pagination() {
720 CallContext::new(
721 state.config.cwd.clone(),
722 tier_max_output,
723 audit_call_id,
724 Some(Instant::now() + tier_timeout),
725 Some(tracker.clone()),
726 caps.clone(),
727 tier,
728 caller_id.map(|s| s.to_string()),
729 secrets,
730 )
731 .with_cursor_issuer(state.cursor_issuer.clone())
732 } else {
733 CallContext::new(
734 state.config.cwd.clone(),
735 tier_max_output,
736 audit_call_id,
737 Some(Instant::now() + tier_timeout),
738 Some(tracker.clone()),
739 caps.clone(),
740 tier,
741 caller_id.map(|s| s.to_string()),
742 secrets,
743 )
744 };
745
746 // Two paths: paginated tools call `Tool::call_paginated` directly with
747 // cursor=None (first page); non-paginated tools go through the Binding
748 // (preserves CLI / native dispatch semantics).
749 let call_result: Result<(serde_json::Value, Option<String>), ToolCallError> =
750 if entry.tool.supports_pagination() {
751 entry
752 .tool
753 .call_paginated(args, &ctx, None)
754 .await
755 .map(|p| (p.value, p.next_cursor))
756 } else {
757 entry
758 .binding
759 .call(entry.definition(), args, &ctx)
760 .await
761 .map(|v| (v, None))
762 };
763 match call_result {
764 Ok((mut data, next_cursor)) => {
765 for mw in &state.middleware {
766 mw.on_result(&tool_id, entry.definition(), &mut data);
767 }
768 emit(crate::audit::Outcome::Success, tier, secrets_resolved);
769 Response::ToolResultResponse {
770 tool_id,
771 result: data,
772 success: true,
773 dry_run: false,
774 next_cursor,
775 }
776 }
777 Err(ToolCallError::InvalidArgs(msg)) => {
778 emit(
779 crate::audit::Outcome::InvalidArgs {
780 message: msg.clone(),
781 },
782 tier,
783 secrets_resolved,
784 );
785 Response::Error {
786 message: format!("invalid args for {tool_id}: {msg}"),
787 code: None,
788 retryable: Some(false),
789 details: None,
790 }
791 }
792 Err(ToolCallError::ExecutionFailed {
793 code,
794 message,
795 retryable,
796 }) => {
797 emit(
798 crate::audit::Outcome::ExecutionFailed {
799 code: code.clone(),
800 retryable,
801 },
802 tier,
803 secrets_resolved,
804 );
805 Response::ToolResultResponse {
806 tool_id,
807 result: serde_json::json!({
808 "code": code,
809 "message": message,
810 "retryable": retryable,
811 }),
812 success: false,
813 dry_run: false,
814 next_cursor: None,
815 }
816 }
817 Err(ToolCallError::InternalError(msg)) => {
818 emit(
819 crate::audit::Outcome::ExecutionFailed {
820 code: "INTERNAL".into(),
821 retryable: false,
822 },
823 tier,
824 secrets_resolved,
825 );
826 Response::Error {
827 message: format!("internal error in {tool_id}: {msg}"),
828 code: None,
829 retryable: Some(false),
830 details: None,
831 }
832 }
833 Err(other) => {
834 emit(
835 crate::audit::Outcome::ExecutionFailed {
836 code: "UNHANDLED".into(),
837 retryable: false,
838 },
839 tier,
840 secrets_resolved,
841 );
842 Response::Error {
843 message: format!("unhandled tool error in {tool_id}: {other}"),
844 code: Some(1999),
845 retryable: Some(false),
846 details: None,
847 }
848 }
849 }
850}