atd_runtime/dispatch.rs
1//! Transport-neutral dispatch — the shared core of ATD-speaking servers.
2//!
3//! SP-streamable-http §4.3 + §6.3: the body of `atd-server::connection.rs`'s
4//! per-request switch moves here so the Unix-socket listener and the
5//! Streamable-HTTP listener (`atd-server-http`) drive the **same** dispatch
6//! state machine over the **same** `Arc<Registry>`. The wire shape returned
7//! is byte-identical regardless of which transport delivered the request —
8//! this property is exercised by `atd-server-http`'s UDS↔HTTP parity test.
9//!
//! Three entry points:
//!
//! 1. [`dispatch_request`] — handles the full `Request` enum (Ping / Hello /
//!    ToolList / ToolSchema / RunTool / RunToolContinue). UDS uses this from
//!    `handle_connection`'s read loop; capability set + caller identity are
//!    mutated in place because `Hello` rewrites both for the lifetime of the
//!    connection.
//!
//! 2. [`run_tool`] — the `RunTool` arm extracted for HTTP. HTTP requests
//!    derive their `CapabilitySet` and `caller_id` fresh from a Bearer-token
//!    lookup per request (SP-streamable-http §4.3), so there is no
//!    connection state to carry forward — the listener calls `run_tool`
//!    directly with the freshly-derived inputs.
//!
//! 3. [`run_tool_continue`] — the `RunToolContinue` arm (SP-pagination-v1):
//!    verifies a paginated-result cursor and fetches the next page. It takes
//!    the same caller-scoped inputs as `run_tool`.
//!
//! Both transports share the same audit emission, capability gate, semaphore
//! permit, token-broker resolution, binding-dispatch, middleware, and error
//! mapping. Any future SP that adds a third transport (vsock, quic, …)
//! should reach for these entry points rather than re-implementing the
//! switch.
28//! switch.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::time::Instant;
33
34use atd_protocol::{Request, Response};
35
36use crate::audit::AuditSink;
37use crate::capability::CapabilitySet;
38use crate::context::CallContext;
39use crate::error::ToolCallError;
40use crate::middleware::Middleware;
41use crate::registry::Registry;
42use crate::secrets::TokenBroker;
43use crate::tier::{TierPolicy, ToolTier};
44use crate::tracker::ReadTracker;
45
46/// Transport-neutral configuration shared across listeners.
47///
48/// Carries the fields every listener needs to dispatch a `RunTool`: cwd for
49/// relative-path tools, the output / timeout budgets, the operator
50/// capability allow-list, plus the pluggable audit sink, token broker, and
51/// server identity. The Unix-socket-specific `socket_path` and the HTTP-
52/// specific `listen` / `extra_origins` / `require_bearer` live in
53/// `atd-server::ServerConfig` and `atd-server-http::HttpServerConfig`
54/// respectively (SP-streamable-http §6.3 — the configs share fields by
55/// composition rather than by trait, to keep struct-literal construction
56/// across crate boundaries ergonomic).
57pub struct SharedServerConfig {
58 pub cwd: PathBuf,
59 pub max_output_bytes: usize,
60 pub default_call_timeout_ms: u64,
61 /// Server-operator capability allow-list. The set the `Hello` handshake
62 /// intersects with on UDS, and the set the HTTP listener intersects
63 /// `BearerIdentity::granted_capabilities` against per request.
64 pub granted_capabilities: Vec<String>,
65 /// Optional audit sink for per-call observability. SP-operability-v1 C1.
66 pub audit_sink: Option<Arc<dyn AuditSink>>,
67 /// Identity string echoed in the `Hello` ack (and in the MCP
68 /// `initialize` response on HTTP). Concretely the deployed server's
69 /// name + version, e.g. `"atd-ref-server 0.3.0"`.
70 pub server_version: String,
71 /// Optional `TokenBroker` for multi-tenant secret routing.
72 /// SP-token-broker-phase1.
73 pub token_broker: Option<Arc<dyn TokenBroker>>,
74 /// Maximum UCAN-lite chain depth accepted by the verifier. Default
75 /// `5` per SP-capability-v2 spec §4.6 — prevents stack-exhaustion
76 /// attacks via pathologically deep proof chains. Override via the
77 /// listener crate's CLI flag if a specific deployment justifies it.
78 pub max_ucan_chain_depth: u8,
79 /// Optional revocation store for UCAN-lite tokens (SP-capability-v2
80 /// §4.7). When `None`, no revocation check is performed; the
81 /// connection-scoped allow-list is the only authority bound.
82 /// Adopters wrap their existing revocation table (e.g. celia's
83 /// `consent.status='revoked'`) behind this trait.
84 pub ucan_revocation_store: Option<Arc<dyn crate::ucan::UcanRevocationStore>>,
85 /// Per-frame deadline applied to reads/writes on a connection that has
86 /// already completed the `Hello` handshake. Long enough to cover a
87 /// reasonable tool call's slowest reply (e.g. `host:media.convert` at
88 /// 25s). Default `30_000` ms. SP-concurrency-baseline §5.2.
89 pub frame_deadline_active_ms: u64,
90 /// Per-frame deadline applied to the pre-Hello handshake window. Short
91 /// enough to fail fast under a single-threaded server starvation
92 /// (the §1.2 root cause of the 2026-05-12 celia incident) so the SDK
93 /// retry path can reissue against a less contended worker. Default
94 /// `5_000` ms. SP-concurrency-baseline §5.2.
95 pub frame_deadline_handshake_ms: u64,
96 /// HMAC signing key for paginated-result cursors. SP-pagination-v1 §4.5.
97 /// Production: random per server startup (so cursor forgery requires
98 /// process-state compromise). Multi-instance load-balanced deployments
99 /// share a key via env (`ATD_CURSOR_SIGNING_KEY=base64...`); the
100 /// listener crates apply this on `Server::new`. Test fixtures use a
101 /// fixed zero key — safe because they don't span processes.
102 pub cursor_signing_key: [u8; 32],
103 /// Time-to-live for paginated-result cursors, in seconds. Cursors older
104 /// than this fail verification with `ERR_CURSOR_EXPIRED` (1020).
105 /// Default `300`s (5 minutes) — long enough for one human "think"
106 /// round-trip without indefinite server-side state retention.
107 pub cursor_ttl_seconds: u64,
108}
109
110impl SharedServerConfig {
111 /// Test/CLI helper — minimal default that compiles wherever a
112 /// `SharedServerConfig` is required, but with empty allow-list (fail-
113 /// closed for capability-gated tools) and no audit sink.
114 pub fn for_test() -> Self {
115 Self {
116 cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
117 max_output_bytes: 1_048_576,
118 default_call_timeout_ms: 60_000,
119 granted_capabilities: vec![],
120 audit_sink: None,
121 server_version: "atd-runtime-test 0.0.0".into(),
122 token_broker: None,
123 max_ucan_chain_depth: 5,
124 ucan_revocation_store: None,
125 frame_deadline_active_ms: 30_000,
126 frame_deadline_handshake_ms: 5_000,
127 cursor_signing_key: [0u8; 32],
128 cursor_ttl_seconds: 300,
129 }
130 }
131}
132
133/// Shared dispatch state. Listener crates own one `Arc<ServerState>` and
134/// hand the clone to per-connection / per-request dispatch.
135///
136/// SP-streamable-http §4.3: every listener holds the **same**
137/// `Arc<Registry>`, exactly one `TierPolicy`, exactly one middleware chain,
138/// and exactly one `SharedServerConfig` snapshot. The HTTP listener does
139/// not duplicate any of this — it composes the same struct.
140pub struct ServerState {
141 pub registry: Registry,
142 pub config: SharedServerConfig,
143 pub tier_policy: TierPolicy,
144 pub middleware: Vec<Arc<dyn Middleware>>,
145 /// SP-concurrency-baseline §5.7 — hot-path counters. Updated by the
146 /// dispatch error arms below, by the per-transport accept loops
147 /// (atd-server / atd-server-http), and by Phase F follow-up wiring
148 /// in audit. Always present; counters default to zero.
149 pub metrics: Arc<crate::metrics::MetricsCounters>,
150 /// SP-pagination-v1 — process-wide HMAC issuer for paginated-result
151 /// cursors. Built once at server startup from
152 /// `SharedServerConfig.cursor_signing_key` and a fresh-random
153 /// `session_nonce`. Server restart → new nonce → outstanding cursors
154 /// expire (`ERR_CURSOR_EXPIRED`). Shared `Arc` so dispatch (verify-
155 /// continuation) and tools (issue-next-cursor via `CallContext`) see
156 /// the same nonce.
157 pub cursor_issuer: Arc<crate::cursor::CursorIssuer>,
158}
159
160/// Run the full `Request` state machine and produce a `Response`.
161///
162/// Intended for stream-oriented transports (UDS) where the listener keeps
163/// per-connection capability + caller-id state across many requests on the
164/// same socket. `caps` and `caller_id` are mutated in place when `Request`
165/// is a `Hello` — UDS `connection.rs::handle_connection` relies on this.
166///
167/// HTTP listeners should not use `dispatch_request` for `tools/call`
168/// translation; they should call [`run_tool`] directly after building a
169/// fresh per-request `CapabilitySet` from the bearer-resolved
170/// `BearerIdentity`. The `Hello` / `Ping` / `ToolList` / `ToolSchema`
171/// branches are still useful in introspection-only contexts and remain
172/// available via the same fn.
173pub async fn dispatch_request(
174 state: &Arc<ServerState>,
175 tracker: &Arc<ReadTracker>,
176 caps: &mut Arc<CapabilitySet>,
177 caller_id: &mut Option<String>,
178 req: Request,
179) -> Response {
180 state
181 .metrics
182 .dispatched_requests
183 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
184 let resp = dispatch_request_inner(state, tracker, caps, caller_id, req).await;
185 if let Response::Error { code: Some(c), .. } = &resp {
186 state.metrics.record_error(*c);
187 }
188 resp
189}
190
191async fn dispatch_request_inner(
192 state: &Arc<ServerState>,
193 tracker: &Arc<ReadTracker>,
194 caps: &mut Arc<CapabilitySet>,
195 caller_id: &mut Option<String>,
196 req: Request,
197) -> Response {
198 match req {
199 Request::Ping => Response::Pong,
200 Request::Hello {
201 client_id,
202 requested_capabilities,
203 ucan_tokens,
204 } => {
205 *caller_id = client_id.clone();
206 let allow = CapabilitySet::from_iter(state.config.granted_capabilities.iter().cloned());
207 let (granted_strings_vec, _denied) = allow.intersect(&requested_capabilities);
208 // SP-observability-completeness-v1 Axis C: attribute each
209 // string-allow-list grant to its source for the audit sink.
210 let string_provenance = granted_strings_vec
211 .iter()
212 .map(|c| crate::audit::CapProvenance {
213 cap: c.clone(),
214 source: crate::audit::ProvSource::StringAllowList,
215 })
216 .collect();
217 let granted_strings =
218 CapabilitySet::with_provenance(granted_strings_vec, string_provenance);
219
220 // SP-capability-v2 Phase C: verify any presented UCAN-lite
221 // tokens and union the resulting caps with the SP-12 string
222 // allow-list result. Pre-SP-capability-v2 clients pass an
223 // empty `ucan_tokens` vec (per `#[serde(default)]`); their
224 // path is byte-identical to SP-12.
225 let granted_ucan = if ucan_tokens.is_empty() {
226 CapabilitySet::empty()
227 } else {
228 // Audience-pin requires a client_id — if the client sends
229 // ucan_tokens but no client_id, we cannot bind the audience
230 // and must reject. Spec §4.6 / §5.4 (1013 audience-mismatch
231 // family).
232 let expected_aud = match caller_id.as_ref() {
233 Some(s) if !s.is_empty() => s.clone(),
234 _ => {
235 return Response::Error {
236 message: "UCAN tokens require Hello.client_id (audience pin)"
237 .to_string(),
238 code: Some(atd_protocol::ERR_AUDIENCE_MISMATCH),
239 retryable: Some(false),
240 details: None,
241 };
242 }
243 };
244 let mut cfg = crate::ucan::VerifyConfig::new(expected_aud);
245 cfg.max_chain_depth = state.config.max_ucan_chain_depth;
246 cfg.revocation_store = state.config.ucan_revocation_store.clone();
247 match crate::ucan::verify_tokens(&ucan_tokens, &cfg, std::time::SystemTime::now()) {
248 Ok(c) => c,
249 Err(e) => {
250 let code = crate::ucan::wire_code(&e);
251 return Response::Error {
252 message: e.to_string(),
253 code: Some(code),
254 retryable: Some(false),
255 details: None,
256 };
257 }
258 }
259 };
260
261 let granted_caps = granted_strings.union(&granted_ucan);
262 let granted_vec = granted_caps.granted();
263 *caps = Arc::new(granted_caps);
264 Response::HelloAck {
265 granted_capabilities: granted_vec,
266 server_version: state.config.server_version.clone(),
267 supported_tiers: vec!["hot".into(), "warm".into(), "cold".into()],
268 }
269 }
270 Request::ToolList => {
271 let summaries: Vec<_> = state
272 .registry
273 .summaries()
274 .into_iter()
275 .filter(|s| !matches!(s.visibility, atd_protocol::ToolVisibility::Hidden))
276 .collect();
277 Response::ToolListResponse {
278 tools: serde_json::to_value(&summaries).unwrap_or_else(|_| serde_json::json!([])),
279 }
280 }
281 Request::ToolSchema { tool_id } => match state.registry.get(&tool_id) {
282 Some(entry) => Response::ToolSchemaResponse {
283 schema: serde_json::to_value(entry.definition())
284 .unwrap_or_else(|_| serde_json::json!({})),
285 },
286 None => Response::Error {
287 message: format!("tool not found: {tool_id}"),
288 code: None,
289 retryable: Some(false),
290 details: None,
291 },
292 },
293 Request::RunTool {
294 tool_id,
295 args,
296 dry_run,
297 } => {
298 run_tool(
299 state,
300 tracker,
301 caps,
302 caller_id.as_deref(),
303 tool_id,
304 args,
305 dry_run,
306 )
307 .await
308 }
309 Request::RunToolContinue { tool_id, cursor } => {
310 run_tool_continue(state, tracker, caps, caller_id.as_deref(), tool_id, cursor).await
311 }
312 }
313}
314
315/// SP-pagination-v1 §4.4 — handle a `Request::RunToolContinue`.
316///
317/// Steps:
318/// 1. Verify the cursor (HMAC + TTL + session nonce) → `CursorPayload`.
319/// 2. Reject if the cursor's `tool_id` ≠ request's `tool_id` (anti-replay).
320/// 3. Look up the tool; reject if not found, or if it doesn't override
321/// `supports_pagination()` (a cursor against a non-paginating tool is
322/// bug-or-attack territory; 1021).
323/// 4. Re-check `required_capabilities` against the current connection's
324/// capability set — caps may have changed since the cursor was issued
325/// (UCAN revocation, Hello re-negotiation).
326/// 5. Acquire the per-tool semaphore (same rate-limit envelope as `run_tool`).
327/// 6. Build a `CallContext` with the cursor issuer attached.
328/// 7. Call `tool.call_paginated(Value::Null, &ctx, Some(&cursor_str))`.
329/// Args are intentionally `Null` on continuation — original args were
330/// fingerprinted into the cursor and the tool is expected to read its
331/// state from `cursor.opaque_state`.
332/// 8. Emit one audit event tagged with `cursor_page` = payload's page_index.
333/// 9. Return `ToolResultResponse` with the new `next_cursor` from the tool.
334#[allow(clippy::too_many_arguments)]
335pub async fn run_tool_continue(
336 state: &Arc<ServerState>,
337 tracker: &Arc<ReadTracker>,
338 caps: &Arc<CapabilitySet>,
339 caller_id: Option<&str>,
340 tool_id: String,
341 cursor: String,
342) -> Response {
343 use std::sync::atomic::Ordering;
344
345 let start = Instant::now();
346 let audit_call_id = ulid::Ulid::new();
347
348 // Build an issuer from the configured signing key. The session_nonce
349 // is fresh per construction — but cursors carry the issuer's nonce
350 // from issue-time, and the issuer constructed at server startup is
351 // the one whose nonce went into the cursor. Here we need to verify
352 // against THAT issuer. SharedServerConfig holds the key but not the
353 // issuer; for v1 the listener crates construct the issuer at startup
354 // and inject it via CallContext. Until that path lands we reconstruct
355 // here using the stored key — verification of cursors issued in this
356 // same process still succeeds because the dispatch path that issued
357 // the cursor used the same key + a matching session_nonce embedded.
358 //
359 // TODO: thread Arc<CursorIssuer> through ServerState so the nonce
360 // truly survives across paginated calls in the same process. For
361 // now we accept that the first call_paginated within this fn will
362 // be reading a cursor whose session_nonce came from a sister issuer
363 // — server_session check will hold across the same process because
364 // both issuers got their nonce from the same OS RNG sequence... no
365 // wait, they didn't. Each `CursorIssuer::new` randomizes the nonce.
366 //
367 // Use the process-wide issuer from ServerState so the session_nonce
368 // matches across issue (first-page in run_tool) and verify (continuation
369 // here). One issuer per server process by design.
370 let payload = match state
371 .cursor_issuer
372 .verify(&cursor, state.config.cursor_ttl_seconds)
373 {
374 Ok(p) => p,
375 Err(crate::cursor::CursorError::Expired) => {
376 return Response::Error {
377 message: "cursor expired; re-issue the original RunTool".into(),
378 code: Some(atd_protocol::ERR_CURSOR_EXPIRED),
379 retryable: Some(false),
380 details: None,
381 };
382 }
383 Err(_) => {
384 return Response::Error {
385 message: "cursor invalid".into(),
386 code: Some(atd_protocol::ERR_CURSOR_INVALID),
387 retryable: Some(false),
388 details: None,
389 };
390 }
391 };
392
393 if payload.tool_id != tool_id {
394 return Response::Error {
395 message: format!(
396 "cursor tool_id mismatch: cursor={} request={tool_id}",
397 payload.tool_id
398 ),
399 code: Some(atd_protocol::ERR_CURSOR_INVALID),
400 retryable: Some(false),
401 details: None,
402 };
403 }
404
405 let entry = match state.registry.get(&tool_id) {
406 Some(e) => e.clone(),
407 None => {
408 return Response::Error {
409 message: format!("tool not found: {tool_id}"),
410 code: None,
411 retryable: Some(false),
412 details: None,
413 };
414 }
415 };
416
417 if !entry.tool.supports_pagination() {
418 return Response::Error {
419 message: format!("tool {tool_id} does not support pagination but received a cursor"),
420 code: Some(atd_protocol::ERR_CURSOR_INVALID),
421 retryable: Some(false),
422 details: None,
423 };
424 }
425
426 let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
427
428 // Re-check capability gating — caps can change mid-connection (UCAN revocation).
429 let required = entry.definition().required_capabilities.clone();
430 let missing: Vec<String> = required
431 .iter()
432 .filter(|c| !caps.contains(c))
433 .cloned()
434 .collect();
435 if !missing.is_empty() {
436 return Response::Error {
437 message: format!("capability denied for {tool_id}: missing {missing:?}"),
438 code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
439 retryable: Some(false),
440 details: None,
441 };
442 }
443
444 let _permit = match entry.semaphore.clone().try_acquire_owned() {
445 Ok(p) => p,
446 Err(_) => {
447 return Response::Error {
448 message: format!("rate limited for {tool_id} (continuation)"),
449 code: Some(atd_protocol::ERR_RATE_LIMITED),
450 retryable: Some(true),
451 details: None,
452 };
453 }
454 };
455
456 let tier_timeout = state.tier_policy.timeout(tier);
457 let tier_max_output = state.tier_policy.max_output(tier);
458 let ctx = CallContext::new(
459 state.config.cwd.clone(),
460 tier_max_output,
461 audit_call_id,
462 Some(Instant::now() + tier_timeout),
463 Some(tracker.clone()),
464 caps.clone(),
465 tier,
466 caller_id.map(|s| s.to_string()),
467 None, // secrets — broker resolution skipped on continuation; the
468 // tool reads continuation state from cursor.opaque_state
469 )
470 .with_cursor_issuer(state.cursor_issuer.clone());
471
472 let page_index = payload.page_index;
473 let result = entry
474 .tool
475 .call_paginated(serde_json::Value::Null, &ctx, Some(&cursor))
476 .await;
477
478 let response = match result {
479 Ok(crate::registry::PaginatedResult {
480 mut value,
481 next_cursor,
482 }) => {
483 // SP-pagination-v1 §G5 — apply the middleware chain per page so
484 // FHIR validation / PHI redaction / other egress rewrites cover
485 // continuation pages, not just first-page (RunTool) results. The
486 // celia adopter binds atd-middleware-fhir + atd-middleware-pii-
487 // redact-medical here; missing this hook would leak unredacted
488 // PHI on every continue — a real compliance defect.
489 run_result_middleware(state, &tool_id, entry.definition(), &mut value);
490 Response::ToolResultResponse {
491 tool_id: tool_id.clone(),
492 result: value,
493 success: true,
494 dry_run: false,
495 next_cursor,
496 }
497 }
498 Err(crate::error::ToolCallError::ExecutionFailed {
499 code,
500 message,
501 retryable,
502 }) => {
503 // SP-observability-completeness-v1 Axis A — continuation failure
504 // result is a Value reaching the LLM; redact via on_result.
505 let mut value = serde_json::json!({
506 "code": code,
507 "message": message,
508 "retryable": retryable,
509 });
510 run_result_middleware(state, &tool_id, entry.definition(), &mut value);
511 Response::ToolResultResponse {
512 tool_id: tool_id.clone(),
513 result: value,
514 success: false,
515 dry_run: false,
516 next_cursor: None,
517 }
518 }
519 Err(e) => {
520 // Axis A — `{e:?}` can embed PHI via a Debug impl; redact.
521 let mut message = format!("tool {tool_id} continuation failed: {e:?}");
522 let mut details = None;
523 run_error_middleware(
524 state,
525 &tool_id,
526 entry.definition(),
527 &mut message,
528 &mut details,
529 );
530 Response::Error {
531 message,
532 code: None,
533 retryable: Some(false),
534 details,
535 }
536 }
537 };
538
539 // Audit emission with cursor_page tag (the v2 schema field).
540 if let Some(sink) = state.config.audit_sink.as_ref() {
541 state
542 .metrics
543 .audit_events_total
544 .fetch_add(1, Ordering::Relaxed);
545 let outcome = match &response {
546 Response::ToolResultResponse { success: true, .. } => crate::audit::Outcome::Success,
547 _ => crate::audit::Outcome::ExecutionFailed {
548 code: "continuation_failed".into(),
549 retryable: false,
550 },
551 };
552 sink.on_call(&crate::audit::CallEvent {
553 ts: crate::audit::now_rfc3339(),
554 call_id: audit_call_id.to_string(),
555 tool_id: tool_id.clone(),
556 caller_id: caller_id.map(|s| s.to_string()),
557 granted_capabilities: caps.granted(),
558 duration_ms: start.elapsed().as_millis() as u64,
559 outcome,
560 tier: crate::tier::tier_as_str(tier).to_string(),
561 dry_run: false,
562 schema_version: crate::audit::SCHEMA_VERSION,
563 secrets_resolved: false,
564 cursor_page: Some(page_index),
565 capability_provenance: prov_for(caps),
566 });
567 }
568
569 response
570}
571
572/// Execute one `RunTool` request against the shared dispatch state.
573///
574/// Single-entry-point for both transports: UDS goes through
575/// [`dispatch_request`] which forwards `Request::RunTool` here; HTTP
576/// (`atd-server-http::mcp::tools_call`) calls this fn directly. The
577/// emitted `Response` is the same enum + same JSON encoding on either
578/// route — `Response::ToolResultResponse` on success / execution-failed,
579/// `Response::Error` on capability-denied / rate-limited / broker error /
580/// tool-not-found / invalid args / internal error.
581///
582/// The body is the verbatim move of the SP-streamable-http §6.3
583/// `atd-server::connection.rs::dispatch` `RunTool` arm — every audit
584/// emission, capability check, semaphore acquire, broker call, binding
585/// dispatch, middleware step, and error map is preserved. The existing
586/// `atd-server::connection::tests` suite covers all branches and is
587/// re-routed through this fn unchanged after refactor.
588/// SP-observability-completeness-v1 Axis C — lift a connection's capability
589/// provenance into the optional `CallEvent.capability_provenance` field.
590/// `None` when nothing was recorded, so the field stays omitted on the wire
591/// for the common (no-provenance) path.
592fn prov_for(caps: &CapabilitySet) -> Option<Vec<crate::audit::CapProvenance>> {
593 let p = caps.provenance();
594 if p.is_empty() { None } else { Some(p.to_vec()) }
595}
596
597/// SP-observability-completeness-v1 Axis A — run the egress `on_error`
598/// middleware chain over a tool-scoped failure reply (`Response::Error`).
599/// Called at EVERY tool-scoped error exit (invalid-args / internal /
600/// unhandled / capability-denied / rate-limited / broker-failed) so PHI in
601/// failure text or details can't bypass redaction.
602///
603/// **Out of scope by design:** `ToolNotFound` and Hello-handshake errors
604/// (UCAN verify, missing client_id). They carry framework *control* text — a
605/// client-supplied `tool_id`, a UCAN verify reason — with no tool/data body,
606/// and have no `ToolDefinition` to drive per-tool middleware. PHI lives in
607/// tool results/args, not in "tool not found"-class control strings.
608fn run_error_middleware(
609 state: &ServerState,
610 tool_id: &str,
611 def: &atd_protocol::ToolDefinition,
612 message: &mut String,
613 details: &mut Option<serde_json::Value>,
614) {
615 for mw in &state.middleware {
616 mw.on_error(tool_id, def, message, details);
617 }
618}
619
620/// SP-observability-completeness-v1 Axis A — run the egress `on_result`
621/// middleware chain over a result Value: the success result, and the
622/// `ExecutionFailed` failure envelope (a result-shaped Value that reaches the
623/// LLM). One chokepoint so success and failure results redact identically.
624fn run_result_middleware(
625 state: &ServerState,
626 tool_id: &str,
627 def: &atd_protocol::ToolDefinition,
628 value: &mut serde_json::Value,
629) {
630 for mw in &state.middleware {
631 mw.on_result(tool_id, def, value);
632 }
633}
634
635#[allow(clippy::too_many_arguments)]
636pub async fn run_tool(
637 state: &Arc<ServerState>,
638 tracker: &Arc<ReadTracker>,
639 caps: &Arc<CapabilitySet>,
640 caller_id: Option<&str>,
641 tool_id: String,
642 args: serde_json::Value,
643 dry_run: bool,
644) -> Response {
645 // SP-operability-v1 C1: per-call audit scaffolding. `start` measures
646 // wall-clock duration from dispatch entry; `audit_call_id` is the
647 // stable id put on `CallEvent` regardless of which return branch
648 // fires. Emission is a no-op when `audit_sink` is None.
649 let start = Instant::now();
650 let audit_call_id = ulid::Ulid::new();
651 // SP-pagination-v1 — cursor_page is captured-mut so paginated paths can
652 // tag the audit event with the page index (1 for first, 2+ for continues).
653 // Non-paginated dispatches leave it None and the field is omitted on
654 // the wire via #[serde(skip_serializing_if = "Option::is_none")].
655 let cursor_page: Option<u32> = None;
656 let emit = |outcome: crate::audit::Outcome, tier: ToolTier, secrets_resolved: bool| {
657 if let Some(sink) = state.config.audit_sink.as_ref() {
658 state
659 .metrics
660 .audit_events_total
661 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
662 sink.on_call(&crate::audit::CallEvent {
663 ts: crate::audit::now_rfc3339(),
664 call_id: audit_call_id.to_string(),
665 tool_id: tool_id.clone(),
666 caller_id: caller_id.map(|s| s.to_string()),
667 granted_capabilities: caps.granted(),
668 duration_ms: start.elapsed().as_millis() as u64,
669 outcome,
670 tier: crate::tier::tier_as_str(tier).to_string(),
671 dry_run,
672 schema_version: crate::audit::SCHEMA_VERSION,
673 secrets_resolved,
674 cursor_page,
675 capability_provenance: prov_for(caps),
676 });
677 }
678 };
679
680 if dry_run {
681 // Dry-run short-circuits BEFORE tier derivation — use Warm as the
682 // placeholder tier for the audit event.
683 emit(crate::audit::Outcome::Success, ToolTier::Warm, false);
684 return Response::ToolResultResponse {
685 tool_id: tool_id.clone(),
686 result: serde_json::json!({
687 "dry_run": true,
688 "tool_id": tool_id,
689 "args_preview": args,
690 }),
691 success: true,
692 dry_run: true,
693 next_cursor: None,
694 };
695 }
696 let entry = match state.registry.get(&tool_id) {
697 Some(e) => e.clone(),
698 None => {
699 emit(crate::audit::Outcome::ToolNotFound, ToolTier::Warm, false);
700 return Response::Error {
701 message: format!("tool not found: {tool_id}"),
702 code: None,
703 retryable: Some(false),
704 details: None,
705 };
706 }
707 };
708 let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
709 // SP-12 Task 2: capability enforcement.
710 let required = entry.definition().required_capabilities.clone();
711 let missing: Vec<String> = required
712 .iter()
713 .filter(|c| !caps.contains(c))
714 .cloned()
715 .collect();
716 if !missing.is_empty() {
717 let mut required_sorted = required.clone();
718 required_sorted.sort();
719 let mut missing_sorted = missing.clone();
720 missing_sorted.sort();
721 emit(
722 crate::audit::Outcome::CapabilityDenied {
723 missing: missing_sorted.clone(),
724 },
725 tier,
726 false,
727 );
728 let mut message = format!("capability denied for {tool_id}: missing {missing_sorted:?}");
729 let mut details = Some(serde_json::json!({
730 "required": required_sorted,
731 "granted": caps.granted(),
732 "missing": missing_sorted,
733 }));
734 run_error_middleware(
735 state,
736 &tool_id,
737 entry.definition(),
738 &mut message,
739 &mut details,
740 );
741 return Response::Error {
742 message,
743 code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
744 retryable: Some(false),
745 details,
746 };
747 }
748 // SP-operability-v1 C2: rate-limit enforcement.
749 let _permit = match entry.semaphore.clone().try_acquire_owned() {
750 Ok(p) => p,
751 Err(_) => {
752 let max_conc = entry.tool.definition().resources.max_concurrent;
753 emit(
754 crate::audit::Outcome::RateLimited {
755 retry_after_ms: None,
756 },
757 tier,
758 false,
759 );
760 let mut message =
761 format!("rate limited for {tool_id}: max_concurrent={max_conc} in-flight");
762 let mut details = Some(serde_json::json!({
763 "tool_id": tool_id,
764 "limit": max_conc,
765 }));
766 run_error_middleware(
767 state,
768 &tool_id,
769 entry.definition(),
770 &mut message,
771 &mut details,
772 );
773 return Response::Error {
774 message,
775 code: Some(atd_protocol::ERR_RATE_LIMITED),
776 retryable: Some(true),
777 details,
778 };
779 }
780 };
781
782 // SP-token-broker-phase1: resolve secrets via the configured TokenBroker.
783 let secrets = match state.config.token_broker.as_ref() {
784 None => None,
785 Some(broker) => match broker.resolve(caller_id).await {
786 Ok(bundle) => bundle,
787 Err(e) => {
788 emit(
789 crate::audit::Outcome::ExecutionFailed {
790 code: "broker_error".into(),
791 retryable: true,
792 },
793 tier,
794 false,
795 );
796 let mut message = format!("token broker error for {tool_id}: {e}");
797 let mut details = None;
798 run_error_middleware(
799 state,
800 &tool_id,
801 entry.definition(),
802 &mut message,
803 &mut details,
804 );
805 return Response::Error {
806 message,
807 code: Some(atd_protocol::ERR_BROKER_FAILED),
808 retryable: Some(true),
809 details,
810 };
811 }
812 },
813 };
814 let secrets_resolved = secrets.is_some();
815 let tier_timeout = state.tier_policy.timeout(tier);
816 let tier_max_output = state.tier_policy.max_output(tier);
817
818 // SP-pagination-v1 §4.4 — paginated tools bypass the Binding layer so
819 // they can emit cursors on the first page. Non-paginated tools keep
820 // going through binding so CLI / future MCP / REST bindings stay
821 // intact. The check is one bool dispatch per call — negligible.
822 let ctx = if entry.tool.supports_pagination() {
823 CallContext::new(
824 state.config.cwd.clone(),
825 tier_max_output,
826 audit_call_id,
827 Some(Instant::now() + tier_timeout),
828 Some(tracker.clone()),
829 caps.clone(),
830 tier,
831 caller_id.map(|s| s.to_string()),
832 secrets,
833 )
834 .with_cursor_issuer(state.cursor_issuer.clone())
835 } else {
836 CallContext::new(
837 state.config.cwd.clone(),
838 tier_max_output,
839 audit_call_id,
840 Some(Instant::now() + tier_timeout),
841 Some(tracker.clone()),
842 caps.clone(),
843 tier,
844 caller_id.map(|s| s.to_string()),
845 secrets,
846 )
847 };
848
849 // Two paths: paginated tools call `Tool::call_paginated` directly with
850 // cursor=None (first page); non-paginated tools go through the Binding
851 // (preserves CLI / native dispatch semantics).
852 let call_result: Result<(serde_json::Value, Option<String>), ToolCallError> =
853 if entry.tool.supports_pagination() {
854 entry
855 .tool
856 .call_paginated(args, &ctx, None)
857 .await
858 .map(|p| (p.value, p.next_cursor))
859 } else {
860 entry
861 .binding
862 .call(entry.definition(), args, &ctx)
863 .await
864 .map(|v| (v, None))
865 };
866 match call_result {
867 Ok((mut data, next_cursor)) => {
868 run_result_middleware(state, &tool_id, entry.definition(), &mut data);
869 emit(crate::audit::Outcome::Success, tier, secrets_resolved);
870 Response::ToolResultResponse {
871 tool_id,
872 result: data,
873 success: true,
874 dry_run: false,
875 next_cursor,
876 }
877 }
878 Err(ToolCallError::InvalidArgs(msg)) => {
879 emit(
880 crate::audit::Outcome::InvalidArgs {
881 message: msg.clone(),
882 },
883 tier,
884 secrets_resolved,
885 );
886 let mut message = format!("invalid args for {tool_id}: {msg}");
887 let mut details = None;
888 run_error_middleware(
889 state,
890 &tool_id,
891 entry.definition(),
892 &mut message,
893 &mut details,
894 );
895 Response::Error {
896 message,
897 code: None,
898 retryable: Some(false),
899 details,
900 }
901 }
902 Err(ToolCallError::ExecutionFailed {
903 code,
904 message,
905 retryable,
906 }) => {
907 emit(
908 crate::audit::Outcome::ExecutionFailed {
909 code: code.clone(),
910 retryable,
911 },
912 tier,
913 secrets_resolved,
914 );
915 // The failure result is a Value reaching the LLM; redact via the
916 // same on_result pass as a success result.
917 let mut result = serde_json::json!({
918 "code": code,
919 "message": message,
920 "retryable": retryable,
921 });
922 run_result_middleware(state, &tool_id, entry.definition(), &mut result);
923 Response::ToolResultResponse {
924 tool_id,
925 result,
926 success: false,
927 dry_run: false,
928 next_cursor: None,
929 }
930 }
931 Err(ToolCallError::InternalError(msg)) => {
932 emit(
933 crate::audit::Outcome::ExecutionFailed {
934 code: "INTERNAL".into(),
935 retryable: false,
936 },
937 tier,
938 secrets_resolved,
939 );
940 let mut message = format!("internal error in {tool_id}: {msg}");
941 let mut details = None;
942 run_error_middleware(
943 state,
944 &tool_id,
945 entry.definition(),
946 &mut message,
947 &mut details,
948 );
949 Response::Error {
950 message,
951 code: None,
952 retryable: Some(false),
953 details,
954 }
955 }
956 Err(other) => {
957 emit(
958 crate::audit::Outcome::ExecutionFailed {
959 code: "UNHANDLED".into(),
960 retryable: false,
961 },
962 tier,
963 secrets_resolved,
964 );
965 let mut message = format!("unhandled tool error in {tool_id}: {other}");
966 let mut details = None;
967 run_error_middleware(
968 state,
969 &tool_id,
970 entry.definition(),
971 &mut message,
972 &mut details,
973 );
974 Response::Error {
975 message,
976 code: Some(1999),
977 retryable: Some(false),
978 details,
979 }
980 }
981 }
982}