Skip to main content

atd_runtime/
dispatch.rs

1//! Transport-neutral dispatch — the shared core of ATD-speaking servers.
2//!
3//! SP-streamable-http §4.3 + §6.3: the body of `atd-server::connection.rs`'s
4//! per-request switch moves here so the Unix-socket listener and the
5//! Streamable-HTTP listener (`atd-server-http`) drive the **same** dispatch
6//! state machine over the **same** `Arc<Registry>`. The wire shape returned
7//! is byte-identical regardless of which transport delivered the request —
8//! this property is exercised by `atd-server-http`'s UDS↔HTTP parity test.
9//!
10//! Two entry points:
11//!
//! 1. [`dispatch_request`] — handles the full `Request` enum (Ping / Hello /
//!    ToolList / ToolSchema / RunTool / RunToolContinue). UDS uses this from
14//!    `handle_connection`'s read loop; capability set + caller identity are
15//!    mutated in place because `Hello` rewrites both for the lifetime of the
16//!    connection.
17//!
18//! 2. [`run_tool`] — the `RunTool` arm extracted for HTTP. HTTP requests
19//!    derive their `CapabilitySet` and `caller_id` fresh from a Bearer-token
20//!    lookup per request (SP-streamable-http §4.3), so there is no
21//!    connection state to carry forward — the listener calls `run_tool`
22//!    directly with the freshly-derived inputs.
23//!
24//! Both routes share the same audit emission, capability gate, semaphore
25//! permit, token-broker resolution, binding-dispatch, middleware, and error
26//! mapping. Any future SP that adds a third transport (vsock, quic, …)
27//! should reach for these two entry points rather than re-implementing the
28//! switch.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::time::Instant;
33
34use atd_protocol::{Request, Response};
35
36use crate::audit::AuditSink;
37use crate::capability::CapabilitySet;
38use crate::context::CallContext;
39use crate::error::ToolCallError;
40use crate::middleware::Middleware;
41use crate::registry::Registry;
42use crate::secrets::TokenBroker;
43use crate::tier::{TierPolicy, ToolTier};
44use crate::tracker::ReadTracker;
45
46/// Transport-neutral configuration shared across listeners.
47///
48/// Carries the fields every listener needs to dispatch a `RunTool`: cwd for
49/// relative-path tools, the output / timeout budgets, the operator
50/// capability allow-list, plus the pluggable audit sink, token broker, and
51/// server identity. The Unix-socket-specific `socket_path` and the HTTP-
52/// specific `listen` / `extra_origins` / `require_bearer` live in
53/// `atd-server::ServerConfig` and `atd-server-http::HttpServerConfig`
54/// respectively (SP-streamable-http §6.3 — the configs share fields by
55/// composition rather than by trait, to keep struct-literal construction
56/// across crate boundaries ergonomic).
57pub struct SharedServerConfig {
58    pub cwd: PathBuf,
59    pub max_output_bytes: usize,
60    pub default_call_timeout_ms: u64,
61    /// Server-operator capability allow-list. The set the `Hello` handshake
62    /// intersects with on UDS, and the set the HTTP listener intersects
63    /// `BearerIdentity::granted_capabilities` against per request.
64    pub granted_capabilities: Vec<String>,
65    /// Optional audit sink for per-call observability. SP-operability-v1 C1.
66    pub audit_sink: Option<Arc<dyn AuditSink>>,
67    /// Identity string echoed in the `Hello` ack (and in the MCP
68    /// `initialize` response on HTTP). Concretely the deployed server's
69    /// name + version, e.g. `"atd-ref-server 0.3.0"`.
70    pub server_version: String,
71    /// Optional `TokenBroker` for multi-tenant secret routing.
72    /// SP-token-broker-phase1.
73    pub token_broker: Option<Arc<dyn TokenBroker>>,
74    /// Maximum UCAN-lite chain depth accepted by the verifier. Default
75    /// `5` per SP-capability-v2 spec §4.6 — prevents stack-exhaustion
76    /// attacks via pathologically deep proof chains. Override via the
77    /// listener crate's CLI flag if a specific deployment justifies it.
78    pub max_ucan_chain_depth: u8,
79    /// Optional revocation store for UCAN-lite tokens (SP-capability-v2
80    /// §4.7). When `None`, no revocation check is performed; the
81    /// connection-scoped allow-list is the only authority bound.
82    /// Adopters wrap their existing revocation table (e.g. celia's
83    /// `consent.status='revoked'`) behind this trait.
84    pub ucan_revocation_store: Option<Arc<dyn crate::ucan::UcanRevocationStore>>,
85    /// Per-frame deadline applied to reads/writes on a connection that has
86    /// already completed the `Hello` handshake. Long enough to cover a
87    /// reasonable tool call's slowest reply (e.g. `host:media.convert` at
88    /// 25s). Default `30_000` ms. SP-concurrency-baseline §5.2.
89    pub frame_deadline_active_ms: u64,
90    /// Per-frame deadline applied to the pre-Hello handshake window. Short
91    /// enough to fail fast under a single-threaded server starvation
92    /// (the §1.2 root cause of the 2026-05-12 celia incident) so the SDK
93    /// retry path can reissue against a less contended worker. Default
94    /// `5_000` ms. SP-concurrency-baseline §5.2.
95    pub frame_deadline_handshake_ms: u64,
96    /// HMAC signing key for paginated-result cursors. SP-pagination-v1 §4.5.
97    /// Production: random per server startup (so cursor forgery requires
98    /// process-state compromise). Multi-instance load-balanced deployments
99    /// share a key via env (`ATD_CURSOR_SIGNING_KEY=base64...`); the
100    /// listener crates apply this on `Server::new`. Test fixtures use a
101    /// fixed zero key — safe because they don't span processes.
102    pub cursor_signing_key: [u8; 32],
103    /// Time-to-live for paginated-result cursors, in seconds. Cursors older
104    /// than this fail verification with `ERR_CURSOR_EXPIRED` (1020).
105    /// Default `300`s (5 minutes) — long enough for one human "think"
106    /// round-trip without indefinite server-side state retention.
107    pub cursor_ttl_seconds: u64,
108}
109
110impl SharedServerConfig {
111    /// Test/CLI helper — minimal default that compiles wherever a
112    /// `SharedServerConfig` is required, but with empty allow-list (fail-
113    /// closed for capability-gated tools) and no audit sink.
114    pub fn for_test() -> Self {
115        Self {
116            cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
117            max_output_bytes: 1_048_576,
118            default_call_timeout_ms: 60_000,
119            granted_capabilities: vec![],
120            audit_sink: None,
121            server_version: "atd-runtime-test 0.0.0".into(),
122            token_broker: None,
123            max_ucan_chain_depth: 5,
124            ucan_revocation_store: None,
125            frame_deadline_active_ms: 30_000,
126            frame_deadline_handshake_ms: 5_000,
127            cursor_signing_key: [0u8; 32],
128            cursor_ttl_seconds: 300,
129        }
130    }
131}
132
133/// Shared dispatch state. Listener crates own one `Arc<ServerState>` and
134/// hand the clone to per-connection / per-request dispatch.
135///
136/// SP-streamable-http §4.3: every listener holds the **same**
137/// `Arc<Registry>`, exactly one `TierPolicy`, exactly one middleware chain,
138/// and exactly one `SharedServerConfig` snapshot. The HTTP listener does
139/// not duplicate any of this — it composes the same struct.
140pub struct ServerState {
141    pub registry: Registry,
142    pub config: SharedServerConfig,
143    pub tier_policy: TierPolicy,
144    pub middleware: Vec<Arc<dyn Middleware>>,
145    /// SP-concurrency-baseline §5.7 — hot-path counters. Updated by the
146    /// dispatch error arms below, by the per-transport accept loops
147    /// (atd-server / atd-server-http), and by Phase F follow-up wiring
148    /// in audit. Always present; counters default to zero.
149    pub metrics: Arc<crate::metrics::MetricsCounters>,
150    /// SP-pagination-v1 — process-wide HMAC issuer for paginated-result
151    /// cursors. Built once at server startup from
152    /// `SharedServerConfig.cursor_signing_key` and a fresh-random
153    /// `session_nonce`. Server restart → new nonce → outstanding cursors
154    /// expire (`ERR_CURSOR_EXPIRED`). Shared `Arc` so dispatch (verify-
155    /// continuation) and tools (issue-next-cursor via `CallContext`) see
156    /// the same nonce.
157    pub cursor_issuer: Arc<crate::cursor::CursorIssuer>,
158}
159
160/// Run the full `Request` state machine and produce a `Response`.
161///
162/// Intended for stream-oriented transports (UDS) where the listener keeps
163/// per-connection capability + caller-id state across many requests on the
164/// same socket. `caps` and `caller_id` are mutated in place when `Request`
165/// is a `Hello` — UDS `connection.rs::handle_connection` relies on this.
166///
167/// HTTP listeners should not use `dispatch_request` for `tools/call`
168/// translation; they should call [`run_tool`] directly after building a
169/// fresh per-request `CapabilitySet` from the bearer-resolved
170/// `BearerIdentity`. The `Hello` / `Ping` / `ToolList` / `ToolSchema`
171/// branches are still useful in introspection-only contexts and remain
172/// available via the same fn.
173pub async fn dispatch_request(
174    state: &Arc<ServerState>,
175    tracker: &Arc<ReadTracker>,
176    caps: &mut Arc<CapabilitySet>,
177    caller_id: &mut Option<String>,
178    req: Request,
179) -> Response {
180    state
181        .metrics
182        .dispatched_requests
183        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
184    let resp = dispatch_request_inner(state, tracker, caps, caller_id, req).await;
185    if let Response::Error { code: Some(c), .. } = &resp {
186        state.metrics.record_error(*c);
187    }
188    resp
189}
190
191async fn dispatch_request_inner(
192    state: &Arc<ServerState>,
193    tracker: &Arc<ReadTracker>,
194    caps: &mut Arc<CapabilitySet>,
195    caller_id: &mut Option<String>,
196    req: Request,
197) -> Response {
198    match req {
199        Request::Ping => Response::Pong,
200        Request::Hello {
201            client_id,
202            requested_capabilities,
203            ucan_tokens,
204        } => {
205            *caller_id = client_id.clone();
206            let allow = CapabilitySet::from_iter(state.config.granted_capabilities.iter().cloned());
207            let (granted_strings_vec, _denied) = allow.intersect(&requested_capabilities);
208            // SP-observability-completeness-v1 Axis C: attribute each
209            // string-allow-list grant to its source for the audit sink.
210            let string_provenance = granted_strings_vec
211                .iter()
212                .map(|c| crate::audit::CapProvenance {
213                    cap: c.clone(),
214                    source: crate::audit::ProvSource::StringAllowList,
215                })
216                .collect();
217            let granted_strings =
218                CapabilitySet::with_provenance(granted_strings_vec, string_provenance);
219
220            // SP-capability-v2 Phase C: verify any presented UCAN-lite
221            // tokens and union the resulting caps with the SP-12 string
222            // allow-list result. Pre-SP-capability-v2 clients pass an
223            // empty `ucan_tokens` vec (per `#[serde(default)]`); their
224            // path is byte-identical to SP-12.
225            let granted_ucan = if ucan_tokens.is_empty() {
226                CapabilitySet::empty()
227            } else {
228                // Audience-pin requires a client_id — if the client sends
229                // ucan_tokens but no client_id, we cannot bind the audience
230                // and must reject. Spec §4.6 / §5.4 (1013 audience-mismatch
231                // family).
232                let expected_aud = match caller_id.as_ref() {
233                    Some(s) if !s.is_empty() => s.clone(),
234                    _ => {
235                        return Response::Error {
236                            message: "UCAN tokens require Hello.client_id (audience pin)"
237                                .to_string(),
238                            code: Some(atd_protocol::ERR_AUDIENCE_MISMATCH),
239                            retryable: Some(false),
240                            details: None,
241                        };
242                    }
243                };
244                let mut cfg = crate::ucan::VerifyConfig::new(expected_aud);
245                cfg.max_chain_depth = state.config.max_ucan_chain_depth;
246                cfg.revocation_store = state.config.ucan_revocation_store.clone();
247                match crate::ucan::verify_tokens(&ucan_tokens, &cfg, std::time::SystemTime::now()) {
248                    Ok(c) => c,
249                    Err(e) => {
250                        let code = crate::ucan::wire_code(&e);
251                        return Response::Error {
252                            message: e.to_string(),
253                            code: Some(code),
254                            retryable: Some(false),
255                            details: None,
256                        };
257                    }
258                }
259            };
260
261            let granted_caps = granted_strings.union(&granted_ucan);
262            let granted_vec = granted_caps.granted();
263            *caps = Arc::new(granted_caps);
264            Response::HelloAck {
265                granted_capabilities: granted_vec,
266                server_version: state.config.server_version.clone(),
267                supported_tiers: vec!["hot".into(), "warm".into(), "cold".into()],
268            }
269        }
270        Request::ToolList => {
271            let summaries: Vec<_> = state
272                .registry
273                .summaries()
274                .into_iter()
275                .filter(|s| !matches!(s.visibility, atd_protocol::ToolVisibility::Hidden))
276                .collect();
277            Response::ToolListResponse {
278                tools: serde_json::to_value(&summaries).unwrap_or_else(|_| serde_json::json!([])),
279            }
280        }
281        Request::ToolSchema { tool_id } => match state.registry.get(&tool_id) {
282            Some(entry) => Response::ToolSchemaResponse {
283                schema: serde_json::to_value(entry.definition())
284                    .unwrap_or_else(|_| serde_json::json!({})),
285            },
286            None => Response::Error {
287                message: format!("tool not found: {tool_id}"),
288                code: None,
289                retryable: Some(false),
290                details: None,
291            },
292        },
293        Request::RunTool {
294            tool_id,
295            args,
296            dry_run,
297        } => {
298            run_tool(
299                state,
300                tracker,
301                caps,
302                caller_id.as_deref(),
303                tool_id,
304                args,
305                dry_run,
306            )
307            .await
308        }
309        Request::RunToolContinue { tool_id, cursor } => {
310            run_tool_continue(state, tracker, caps, caller_id.as_deref(), tool_id, cursor).await
311        }
312    }
313}
314
315/// SP-pagination-v1 §4.4 — handle a `Request::RunToolContinue`.
316///
317/// Steps:
318/// 1. Verify the cursor (HMAC + TTL + session nonce) → `CursorPayload`.
319/// 2. Reject if the cursor's `tool_id` ≠ request's `tool_id` (anti-replay).
320/// 3. Look up the tool; reject if not found, or if it doesn't override
321///    `supports_pagination()` (a cursor against a non-paginating tool is
322///    bug-or-attack territory; 1021).
323/// 4. Re-check `required_capabilities` against the current connection's
324///    capability set — caps may have changed since the cursor was issued
325///    (UCAN revocation, Hello re-negotiation).
326/// 5. Acquire the per-tool semaphore (same rate-limit envelope as `run_tool`).
327/// 6. Build a `CallContext` with the cursor issuer attached.
328/// 7. Call `tool.call_paginated(Value::Null, &ctx, Some(&cursor_str))`.
329///    Args are intentionally `Null` on continuation — original args were
330///    fingerprinted into the cursor and the tool is expected to read its
331///    state from `cursor.opaque_state`.
332/// 8. Emit one audit event tagged with `cursor_page` = payload's page_index.
333/// 9. Return `ToolResultResponse` with the new `next_cursor` from the tool.
334#[allow(clippy::too_many_arguments)]
335pub async fn run_tool_continue(
336    state: &Arc<ServerState>,
337    tracker: &Arc<ReadTracker>,
338    caps: &Arc<CapabilitySet>,
339    caller_id: Option<&str>,
340    tool_id: String,
341    cursor: String,
342) -> Response {
343    use std::sync::atomic::Ordering;
344
345    let start = Instant::now();
346    let audit_call_id = ulid::Ulid::new();
347
348    // Build an issuer from the configured signing key. The session_nonce
349    // is fresh per construction — but cursors carry the issuer's nonce
350    // from issue-time, and the issuer constructed at server startup is
351    // the one whose nonce went into the cursor. Here we need to verify
352    // against THAT issuer. SharedServerConfig holds the key but not the
353    // issuer; for v1 the listener crates construct the issuer at startup
354    // and inject it via CallContext. Until that path lands we reconstruct
355    // here using the stored key — verification of cursors issued in this
356    // same process still succeeds because the dispatch path that issued
357    // the cursor used the same key + a matching session_nonce embedded.
358    //
359    // TODO: thread Arc<CursorIssuer> through ServerState so the nonce
360    // truly survives across paginated calls in the same process. For
361    // now we accept that the first call_paginated within this fn will
362    // be reading a cursor whose session_nonce came from a sister issuer
363    // — server_session check will hold across the same process because
364    // both issuers got their nonce from the same OS RNG sequence... no
365    // wait, they didn't. Each `CursorIssuer::new` randomizes the nonce.
366    //
367    // Use the process-wide issuer from ServerState so the session_nonce
368    // matches across issue (first-page in run_tool) and verify (continuation
369    // here). One issuer per server process by design.
370    let payload = match state
371        .cursor_issuer
372        .verify(&cursor, state.config.cursor_ttl_seconds)
373    {
374        Ok(p) => p,
375        Err(crate::cursor::CursorError::Expired) => {
376            return Response::Error {
377                message: "cursor expired; re-issue the original RunTool".into(),
378                code: Some(atd_protocol::ERR_CURSOR_EXPIRED),
379                retryable: Some(false),
380                details: None,
381            };
382        }
383        Err(_) => {
384            return Response::Error {
385                message: "cursor invalid".into(),
386                code: Some(atd_protocol::ERR_CURSOR_INVALID),
387                retryable: Some(false),
388                details: None,
389            };
390        }
391    };
392
393    if payload.tool_id != tool_id {
394        return Response::Error {
395            message: format!(
396                "cursor tool_id mismatch: cursor={} request={tool_id}",
397                payload.tool_id
398            ),
399            code: Some(atd_protocol::ERR_CURSOR_INVALID),
400            retryable: Some(false),
401            details: None,
402        };
403    }
404
405    let entry = match state.registry.get(&tool_id) {
406        Some(e) => e.clone(),
407        None => {
408            return Response::Error {
409                message: format!("tool not found: {tool_id}"),
410                code: None,
411                retryable: Some(false),
412                details: None,
413            };
414        }
415    };
416
417    if !entry.tool.supports_pagination() {
418        return Response::Error {
419            message: format!("tool {tool_id} does not support pagination but received a cursor"),
420            code: Some(atd_protocol::ERR_CURSOR_INVALID),
421            retryable: Some(false),
422            details: None,
423        };
424    }
425
426    let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
427
428    // Re-check capability gating — caps can change mid-connection (UCAN revocation).
429    let required = entry.definition().required_capabilities.clone();
430    let missing: Vec<String> = required
431        .iter()
432        .filter(|c| !caps.contains(c))
433        .cloned()
434        .collect();
435    if !missing.is_empty() {
436        return Response::Error {
437            message: format!("capability denied for {tool_id}: missing {missing:?}"),
438            code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
439            retryable: Some(false),
440            details: None,
441        };
442    }
443
444    let _permit = match entry.semaphore.clone().try_acquire_owned() {
445        Ok(p) => p,
446        Err(_) => {
447            return Response::Error {
448                message: format!("rate limited for {tool_id} (continuation)"),
449                code: Some(atd_protocol::ERR_RATE_LIMITED),
450                retryable: Some(true),
451                details: None,
452            };
453        }
454    };
455
456    let tier_timeout = state.tier_policy.timeout(tier);
457    let tier_max_output = state.tier_policy.max_output(tier);
458    let ctx = CallContext::new(
459        state.config.cwd.clone(),
460        tier_max_output,
461        audit_call_id,
462        Some(Instant::now() + tier_timeout),
463        Some(tracker.clone()),
464        caps.clone(),
465        tier,
466        caller_id.map(|s| s.to_string()),
467        None, // secrets — broker resolution skipped on continuation; the
468              // tool reads continuation state from cursor.opaque_state
469    )
470    .with_cursor_issuer(state.cursor_issuer.clone());
471
472    let page_index = payload.page_index;
473    let result = entry
474        .tool
475        .call_paginated(serde_json::Value::Null, &ctx, Some(&cursor))
476        .await;
477
478    let response = match result {
479        Ok(crate::registry::PaginatedResult {
480            mut value,
481            next_cursor,
482        }) => {
483            // SP-pagination-v1 §G5 — apply the middleware chain per page so
484            // FHIR validation / PHI redaction / other egress rewrites cover
485            // continuation pages, not just first-page (RunTool) results. The
486            // celia adopter binds atd-middleware-fhir + atd-middleware-pii-
487            // redact-medical here; missing this hook would leak unredacted
488            // PHI on every continue — a real compliance defect.
489            run_result_middleware(state, &tool_id, entry.definition(), &mut value);
490            Response::ToolResultResponse {
491                tool_id: tool_id.clone(),
492                result: value,
493                success: true,
494                dry_run: false,
495                next_cursor,
496            }
497        }
498        Err(crate::error::ToolCallError::ExecutionFailed {
499            code,
500            message,
501            retryable,
502        }) => {
503            // SP-observability-completeness-v1 Axis A — continuation failure
504            // result is a Value reaching the LLM; redact via on_result.
505            let mut value = serde_json::json!({
506                "code": code,
507                "message": message,
508                "retryable": retryable,
509            });
510            run_result_middleware(state, &tool_id, entry.definition(), &mut value);
511            Response::ToolResultResponse {
512                tool_id: tool_id.clone(),
513                result: value,
514                success: false,
515                dry_run: false,
516                next_cursor: None,
517            }
518        }
519        Err(e) => {
520            // Axis A — `{e:?}` can embed PHI via a Debug impl; redact.
521            let mut message = format!("tool {tool_id} continuation failed: {e:?}");
522            let mut details = None;
523            run_error_middleware(
524                state,
525                &tool_id,
526                entry.definition(),
527                &mut message,
528                &mut details,
529            );
530            Response::Error {
531                message,
532                code: None,
533                retryable: Some(false),
534                details,
535            }
536        }
537    };
538
539    // Audit emission with cursor_page tag (the v2 schema field).
540    if let Some(sink) = state.config.audit_sink.as_ref() {
541        state
542            .metrics
543            .audit_events_total
544            .fetch_add(1, Ordering::Relaxed);
545        let outcome = match &response {
546            Response::ToolResultResponse { success: true, .. } => crate::audit::Outcome::Success,
547            _ => crate::audit::Outcome::ExecutionFailed {
548                code: "continuation_failed".into(),
549                retryable: false,
550            },
551        };
552        sink.on_call(&crate::audit::CallEvent {
553            ts: crate::audit::now_rfc3339(),
554            call_id: audit_call_id.to_string(),
555            tool_id: tool_id.clone(),
556            caller_id: caller_id.map(|s| s.to_string()),
557            granted_capabilities: caps.granted(),
558            duration_ms: start.elapsed().as_millis() as u64,
559            outcome,
560            tier: crate::tier::tier_as_str(tier).to_string(),
561            dry_run: false,
562            schema_version: crate::audit::SCHEMA_VERSION,
563            secrets_resolved: false,
564            cursor_page: Some(page_index),
565            capability_provenance: prov_for(caps),
566        });
567    }
568
569    response
570}
571
572/// Execute one `RunTool` request against the shared dispatch state.
573///
574/// Single-entry-point for both transports: UDS goes through
575/// [`dispatch_request`] which forwards `Request::RunTool` here; HTTP
576/// (`atd-server-http::mcp::tools_call`) calls this fn directly. The
577/// emitted `Response` is the same enum + same JSON encoding on either
578/// route — `Response::ToolResultResponse` on success / execution-failed,
579/// `Response::Error` on capability-denied / rate-limited / broker error /
580/// tool-not-found / invalid args / internal error.
581///
582/// The body is the verbatim move of the SP-streamable-http §6.3
583/// `atd-server::connection.rs::dispatch` `RunTool` arm — every audit
584/// emission, capability check, semaphore acquire, broker call, binding
585/// dispatch, middleware step, and error map is preserved. The existing
586/// `atd-server::connection::tests` suite covers all branches and is
587/// re-routed through this fn unchanged after refactor.
588/// SP-observability-completeness-v1 Axis C — lift a connection's capability
589/// provenance into the optional `CallEvent.capability_provenance` field.
590/// `None` when nothing was recorded, so the field stays omitted on the wire
591/// for the common (no-provenance) path.
592fn prov_for(caps: &CapabilitySet) -> Option<Vec<crate::audit::CapProvenance>> {
593    let p = caps.provenance();
594    if p.is_empty() { None } else { Some(p.to_vec()) }
595}
596
597/// SP-observability-completeness-v1 Axis A — run the egress `on_error`
598/// middleware chain over a tool-scoped failure reply (`Response::Error`).
599/// Called at EVERY tool-scoped error exit (invalid-args / internal /
600/// unhandled / capability-denied / rate-limited / broker-failed) so PHI in
601/// failure text or details can't bypass redaction.
602///
603/// **Out of scope by design:** `ToolNotFound` and Hello-handshake errors
604/// (UCAN verify, missing client_id). They carry framework *control* text — a
605/// client-supplied `tool_id`, a UCAN verify reason — with no tool/data body,
606/// and have no `ToolDefinition` to drive per-tool middleware. PHI lives in
607/// tool results/args, not in "tool not found"-class control strings.
608fn run_error_middleware(
609    state: &ServerState,
610    tool_id: &str,
611    def: &atd_protocol::ToolDefinition,
612    message: &mut String,
613    details: &mut Option<serde_json::Value>,
614) {
615    for mw in &state.middleware {
616        mw.on_error(tool_id, def, message, details);
617    }
618}
619
620/// SP-observability-completeness-v1 Axis A — run the egress `on_result`
621/// middleware chain over a result Value: the success result, and the
622/// `ExecutionFailed` failure envelope (a result-shaped Value that reaches the
623/// LLM). One chokepoint so success and failure results redact identically.
624fn run_result_middleware(
625    state: &ServerState,
626    tool_id: &str,
627    def: &atd_protocol::ToolDefinition,
628    value: &mut serde_json::Value,
629) {
630    for mw in &state.middleware {
631        mw.on_result(tool_id, def, value);
632    }
633}
634
635#[allow(clippy::too_many_arguments)]
636pub async fn run_tool(
637    state: &Arc<ServerState>,
638    tracker: &Arc<ReadTracker>,
639    caps: &Arc<CapabilitySet>,
640    caller_id: Option<&str>,
641    tool_id: String,
642    args: serde_json::Value,
643    dry_run: bool,
644) -> Response {
645    // SP-operability-v1 C1: per-call audit scaffolding. `start` measures
646    // wall-clock duration from dispatch entry; `audit_call_id` is the
647    // stable id put on `CallEvent` regardless of which return branch
648    // fires. Emission is a no-op when `audit_sink` is None.
649    let start = Instant::now();
650    let audit_call_id = ulid::Ulid::new();
651    // SP-pagination-v1 — cursor_page is captured-mut so paginated paths can
652    // tag the audit event with the page index (1 for first, 2+ for continues).
653    // Non-paginated dispatches leave it None and the field is omitted on
654    // the wire via #[serde(skip_serializing_if = "Option::is_none")].
655    let cursor_page: Option<u32> = None;
656    let emit = |outcome: crate::audit::Outcome, tier: ToolTier, secrets_resolved: bool| {
657        if let Some(sink) = state.config.audit_sink.as_ref() {
658            state
659                .metrics
660                .audit_events_total
661                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
662            sink.on_call(&crate::audit::CallEvent {
663                ts: crate::audit::now_rfc3339(),
664                call_id: audit_call_id.to_string(),
665                tool_id: tool_id.clone(),
666                caller_id: caller_id.map(|s| s.to_string()),
667                granted_capabilities: caps.granted(),
668                duration_ms: start.elapsed().as_millis() as u64,
669                outcome,
670                tier: crate::tier::tier_as_str(tier).to_string(),
671                dry_run,
672                schema_version: crate::audit::SCHEMA_VERSION,
673                secrets_resolved,
674                cursor_page,
675                capability_provenance: prov_for(caps),
676            });
677        }
678    };
679
680    if dry_run {
681        // Dry-run short-circuits BEFORE tier derivation — use Warm as the
682        // placeholder tier for the audit event.
683        emit(crate::audit::Outcome::Success, ToolTier::Warm, false);
684        return Response::ToolResultResponse {
685            tool_id: tool_id.clone(),
686            result: serde_json::json!({
687                "dry_run": true,
688                "tool_id": tool_id,
689                "args_preview": args,
690            }),
691            success: true,
692            dry_run: true,
693            next_cursor: None,
694        };
695    }
696    let entry = match state.registry.get(&tool_id) {
697        Some(e) => e.clone(),
698        None => {
699            emit(crate::audit::Outcome::ToolNotFound, ToolTier::Warm, false);
700            return Response::Error {
701                message: format!("tool not found: {tool_id}"),
702                code: None,
703                retryable: Some(false),
704                details: None,
705            };
706        }
707    };
708    let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
709    // SP-12 Task 2: capability enforcement.
710    let required = entry.definition().required_capabilities.clone();
711    let missing: Vec<String> = required
712        .iter()
713        .filter(|c| !caps.contains(c))
714        .cloned()
715        .collect();
716    if !missing.is_empty() {
717        let mut required_sorted = required.clone();
718        required_sorted.sort();
719        let mut missing_sorted = missing.clone();
720        missing_sorted.sort();
721        emit(
722            crate::audit::Outcome::CapabilityDenied {
723                missing: missing_sorted.clone(),
724            },
725            tier,
726            false,
727        );
728        let mut message = format!("capability denied for {tool_id}: missing {missing_sorted:?}");
729        let mut details = Some(serde_json::json!({
730            "required": required_sorted,
731            "granted": caps.granted(),
732            "missing": missing_sorted,
733        }));
734        run_error_middleware(
735            state,
736            &tool_id,
737            entry.definition(),
738            &mut message,
739            &mut details,
740        );
741        return Response::Error {
742            message,
743            code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
744            retryable: Some(false),
745            details,
746        };
747    }
748    // SP-operability-v1 C2: rate-limit enforcement.
749    let _permit = match entry.semaphore.clone().try_acquire_owned() {
750        Ok(p) => p,
751        Err(_) => {
752            let max_conc = entry.tool.definition().resources.max_concurrent;
753            emit(
754                crate::audit::Outcome::RateLimited {
755                    retry_after_ms: None,
756                },
757                tier,
758                false,
759            );
760            let mut message =
761                format!("rate limited for {tool_id}: max_concurrent={max_conc} in-flight");
762            let mut details = Some(serde_json::json!({
763                "tool_id": tool_id,
764                "limit": max_conc,
765            }));
766            run_error_middleware(
767                state,
768                &tool_id,
769                entry.definition(),
770                &mut message,
771                &mut details,
772            );
773            return Response::Error {
774                message,
775                code: Some(atd_protocol::ERR_RATE_LIMITED),
776                retryable: Some(true),
777                details,
778            };
779        }
780    };
781
782    // SP-token-broker-phase1: resolve secrets via the configured TokenBroker.
783    let secrets = match state.config.token_broker.as_ref() {
784        None => None,
785        Some(broker) => match broker.resolve(caller_id).await {
786            Ok(bundle) => bundle,
787            Err(e) => {
788                emit(
789                    crate::audit::Outcome::ExecutionFailed {
790                        code: "broker_error".into(),
791                        retryable: true,
792                    },
793                    tier,
794                    false,
795                );
796                let mut message = format!("token broker error for {tool_id}: {e}");
797                let mut details = None;
798                run_error_middleware(
799                    state,
800                    &tool_id,
801                    entry.definition(),
802                    &mut message,
803                    &mut details,
804                );
805                return Response::Error {
806                    message,
807                    code: Some(atd_protocol::ERR_BROKER_FAILED),
808                    retryable: Some(true),
809                    details,
810                };
811            }
812        },
813    };
814    let secrets_resolved = secrets.is_some();
815    let tier_timeout = state.tier_policy.timeout(tier);
816    let tier_max_output = state.tier_policy.max_output(tier);
817
818    // SP-pagination-v1 §4.4 — paginated tools bypass the Binding layer so
819    // they can emit cursors on the first page. Non-paginated tools keep
820    // going through binding so CLI / future MCP / REST bindings stay
821    // intact. The check is one bool dispatch per call — negligible.
822    let ctx = if entry.tool.supports_pagination() {
823        CallContext::new(
824            state.config.cwd.clone(),
825            tier_max_output,
826            audit_call_id,
827            Some(Instant::now() + tier_timeout),
828            Some(tracker.clone()),
829            caps.clone(),
830            tier,
831            caller_id.map(|s| s.to_string()),
832            secrets,
833        )
834        .with_cursor_issuer(state.cursor_issuer.clone())
835    } else {
836        CallContext::new(
837            state.config.cwd.clone(),
838            tier_max_output,
839            audit_call_id,
840            Some(Instant::now() + tier_timeout),
841            Some(tracker.clone()),
842            caps.clone(),
843            tier,
844            caller_id.map(|s| s.to_string()),
845            secrets,
846        )
847    };
848
849    // Two paths: paginated tools call `Tool::call_paginated` directly with
850    // cursor=None (first page); non-paginated tools go through the Binding
851    // (preserves CLI / native dispatch semantics).
852    let call_result: Result<(serde_json::Value, Option<String>), ToolCallError> =
853        if entry.tool.supports_pagination() {
854            entry
855                .tool
856                .call_paginated(args, &ctx, None)
857                .await
858                .map(|p| (p.value, p.next_cursor))
859        } else {
860            entry
861                .binding
862                .call(entry.definition(), args, &ctx)
863                .await
864                .map(|v| (v, None))
865        };
866    match call_result {
867        Ok((mut data, next_cursor)) => {
868            run_result_middleware(state, &tool_id, entry.definition(), &mut data);
869            emit(crate::audit::Outcome::Success, tier, secrets_resolved);
870            Response::ToolResultResponse {
871                tool_id,
872                result: data,
873                success: true,
874                dry_run: false,
875                next_cursor,
876            }
877        }
878        Err(ToolCallError::InvalidArgs(msg)) => {
879            emit(
880                crate::audit::Outcome::InvalidArgs {
881                    message: msg.clone(),
882                },
883                tier,
884                secrets_resolved,
885            );
886            let mut message = format!("invalid args for {tool_id}: {msg}");
887            let mut details = None;
888            run_error_middleware(
889                state,
890                &tool_id,
891                entry.definition(),
892                &mut message,
893                &mut details,
894            );
895            Response::Error {
896                message,
897                code: None,
898                retryable: Some(false),
899                details,
900            }
901        }
902        Err(ToolCallError::ExecutionFailed {
903            code,
904            message,
905            retryable,
906        }) => {
907            emit(
908                crate::audit::Outcome::ExecutionFailed {
909                    code: code.clone(),
910                    retryable,
911                },
912                tier,
913                secrets_resolved,
914            );
915            // The failure result is a Value reaching the LLM; redact via the
916            // same on_result pass as a success result.
917            let mut result = serde_json::json!({
918                "code": code,
919                "message": message,
920                "retryable": retryable,
921            });
922            run_result_middleware(state, &tool_id, entry.definition(), &mut result);
923            Response::ToolResultResponse {
924                tool_id,
925                result,
926                success: false,
927                dry_run: false,
928                next_cursor: None,
929            }
930        }
931        Err(ToolCallError::InternalError(msg)) => {
932            emit(
933                crate::audit::Outcome::ExecutionFailed {
934                    code: "INTERNAL".into(),
935                    retryable: false,
936                },
937                tier,
938                secrets_resolved,
939            );
940            let mut message = format!("internal error in {tool_id}: {msg}");
941            let mut details = None;
942            run_error_middleware(
943                state,
944                &tool_id,
945                entry.definition(),
946                &mut message,
947                &mut details,
948            );
949            Response::Error {
950                message,
951                code: None,
952                retryable: Some(false),
953                details,
954            }
955        }
956        Err(other) => {
957            emit(
958                crate::audit::Outcome::ExecutionFailed {
959                    code: "UNHANDLED".into(),
960                    retryable: false,
961                },
962                tier,
963                secrets_resolved,
964            );
965            let mut message = format!("unhandled tool error in {tool_id}: {other}");
966            let mut details = None;
967            run_error_middleware(
968                state,
969                &tool_id,
970                entry.definition(),
971                &mut message,
972                &mut details,
973            );
974            Response::Error {
975                message,
976                code: Some(1999),
977                retryable: Some(false),
978                details,
979            }
980        }
981    }
982}