Skip to main content

atd_runtime/
dispatch.rs

1//! Transport-neutral dispatch — the shared core of ATD-speaking servers.
2//!
3//! SP-streamable-http §4.3 + §6.3: the body of `atd-server::connection.rs`'s
4//! per-request switch moves here so the Unix-socket listener and the
5//! Streamable-HTTP listener (`atd-server-http`) drive the **same** dispatch
6//! state machine over the **same** `Arc<Registry>`. The wire shape returned
7//! is byte-identical regardless of which transport delivered the request —
8//! this property is exercised by `atd-server-http`'s UDS↔HTTP parity test.
9//!
10//! Two entry points:
11//!
//! 1. [`dispatch_request`] — handles the full `Request` enum (Ping / Hello /
//!    ToolList / ToolSchema / RunTool / RunToolContinue). UDS uses this from
//!    `handle_connection`'s read loop; capability set + caller identity are
//!    mutated in place because `Hello` rewrites both for the lifetime of the
//!    connection.
17//!
18//! 2. [`run_tool`] — the `RunTool` arm extracted for HTTP. HTTP requests
19//!    derive their `CapabilitySet` and `caller_id` fresh from a Bearer-token
20//!    lookup per request (SP-streamable-http §4.3), so there is no
21//!    connection state to carry forward — the listener calls `run_tool`
22//!    directly with the freshly-derived inputs.
23//!
24//! Both routes share the same audit emission, capability gate, semaphore
25//! permit, token-broker resolution, binding-dispatch, middleware, and error
26//! mapping. Any future SP that adds a third transport (vsock, quic, …)
27//! should reach for these two entry points rather than re-implementing the
28//! switch.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::time::Instant;
33
34use atd_protocol::{Request, Response};
35
36use crate::audit::AuditSink;
37use crate::capability::CapabilitySet;
38use crate::context::CallContext;
39use crate::error::ToolCallError;
40use crate::middleware::Middleware;
41use crate::registry::Registry;
42use crate::secrets::TokenBroker;
43use crate::tier::{TierPolicy, ToolTier};
44use crate::tracker::ReadTracker;
45
46/// Transport-neutral configuration shared across listeners.
47///
48/// Carries the fields every listener needs to dispatch a `RunTool`: cwd for
49/// relative-path tools, the output / timeout budgets, the operator
50/// capability allow-list, plus the pluggable audit sink, token broker, and
51/// server identity. The Unix-socket-specific `socket_path` and the HTTP-
52/// specific `listen` / `extra_origins` / `require_bearer` live in
53/// `atd-server::ServerConfig` and `atd-server-http::HttpServerConfig`
54/// respectively (SP-streamable-http §6.3 — the configs share fields by
55/// composition rather than by trait, to keep struct-literal construction
56/// across crate boundaries ergonomic).
57pub struct SharedServerConfig {
58    pub cwd: PathBuf,
59    pub max_output_bytes: usize,
60    pub default_call_timeout_ms: u64,
61    /// Server-operator capability allow-list. The set the `Hello` handshake
62    /// intersects with on UDS, and the set the HTTP listener intersects
63    /// `BearerIdentity::granted_capabilities` against per request.
64    pub granted_capabilities: Vec<String>,
65    /// Optional audit sink for per-call observability. SP-operability-v1 C1.
66    pub audit_sink: Option<Arc<dyn AuditSink>>,
67    /// Identity string echoed in the `Hello` ack (and in the MCP
68    /// `initialize` response on HTTP). Concretely the deployed server's
69    /// name + version, e.g. `"atd-ref-server 0.3.0"`.
70    pub server_version: String,
71    /// Optional `TokenBroker` for multi-tenant secret routing.
72    /// SP-token-broker-phase1.
73    pub token_broker: Option<Arc<dyn TokenBroker>>,
74    /// Maximum UCAN-lite chain depth accepted by the verifier. Default
75    /// `5` per SP-capability-v2 spec §4.6 — prevents stack-exhaustion
76    /// attacks via pathologically deep proof chains. Override via the
77    /// listener crate's CLI flag if a specific deployment justifies it.
78    pub max_ucan_chain_depth: u8,
79    /// Optional revocation store for UCAN-lite tokens (SP-capability-v2
80    /// §4.7). When `None`, no revocation check is performed; the
81    /// connection-scoped allow-list is the only authority bound.
82    /// Adopters wrap their existing revocation table (e.g. celia's
83    /// `consent.status='revoked'`) behind this trait.
84    pub ucan_revocation_store: Option<Arc<dyn crate::ucan::UcanRevocationStore>>,
85    /// Per-frame deadline applied to reads/writes on a connection that has
86    /// already completed the `Hello` handshake. Long enough to cover a
87    /// reasonable tool call's slowest reply (e.g. `host:media.convert` at
88    /// 25s). Default `30_000` ms. SP-concurrency-baseline §5.2.
89    pub frame_deadline_active_ms: u64,
90    /// Per-frame deadline applied to the pre-Hello handshake window. Short
91    /// enough to fail fast under a single-threaded server starvation
92    /// (the §1.2 root cause of the 2026-05-12 celia incident) so the SDK
93    /// retry path can reissue against a less contended worker. Default
94    /// `5_000` ms. SP-concurrency-baseline §5.2.
95    pub frame_deadline_handshake_ms: u64,
96    /// HMAC signing key for paginated-result cursors. SP-pagination-v1 §4.5.
97    /// Production: random per server startup (so cursor forgery requires
98    /// process-state compromise). Multi-instance load-balanced deployments
99    /// share a key via env (`ATD_CURSOR_SIGNING_KEY=base64...`); the
100    /// listener crates apply this on `Server::new`. Test fixtures use a
101    /// fixed zero key — safe because they don't span processes.
102    pub cursor_signing_key: [u8; 32],
103    /// Time-to-live for paginated-result cursors, in seconds. Cursors older
104    /// than this fail verification with `ERR_CURSOR_EXPIRED` (1020).
105    /// Default `300`s (5 minutes) — long enough for one human "think"
106    /// round-trip without indefinite server-side state retention.
107    pub cursor_ttl_seconds: u64,
108}
109
110impl SharedServerConfig {
111    /// Test/CLI helper — minimal default that compiles wherever a
112    /// `SharedServerConfig` is required, but with empty allow-list (fail-
113    /// closed for capability-gated tools) and no audit sink.
114    pub fn for_test() -> Self {
115        Self {
116            cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
117            max_output_bytes: 1_048_576,
118            default_call_timeout_ms: 60_000,
119            granted_capabilities: vec![],
120            audit_sink: None,
121            server_version: "atd-runtime-test 0.0.0".into(),
122            token_broker: None,
123            max_ucan_chain_depth: 5,
124            ucan_revocation_store: None,
125            frame_deadline_active_ms: 30_000,
126            frame_deadline_handshake_ms: 5_000,
127            cursor_signing_key: [0u8; 32],
128            cursor_ttl_seconds: 300,
129        }
130    }
131}
132
133/// Shared dispatch state. Listener crates own one `Arc<ServerState>` and
134/// hand the clone to per-connection / per-request dispatch.
135///
136/// SP-streamable-http §4.3: every listener holds the **same**
137/// `Arc<Registry>`, exactly one `TierPolicy`, exactly one middleware chain,
138/// and exactly one `SharedServerConfig` snapshot. The HTTP listener does
139/// not duplicate any of this — it composes the same struct.
140pub struct ServerState {
141    pub registry: Registry,
142    pub config: SharedServerConfig,
143    pub tier_policy: TierPolicy,
144    pub middleware: Vec<Arc<dyn Middleware>>,
145    /// SP-concurrency-baseline §5.7 — hot-path counters. Updated by the
146    /// dispatch error arms below, by the per-transport accept loops
147    /// (atd-server / atd-server-http), and by Phase F follow-up wiring
148    /// in audit. Always present; counters default to zero.
149    pub metrics: Arc<crate::metrics::MetricsCounters>,
150    /// SP-pagination-v1 — process-wide HMAC issuer for paginated-result
151    /// cursors. Built once at server startup from
152    /// `SharedServerConfig.cursor_signing_key` and a fresh-random
153    /// `session_nonce`. Server restart → new nonce → outstanding cursors
154    /// expire (`ERR_CURSOR_EXPIRED`). Shared `Arc` so dispatch (verify-
155    /// continuation) and tools (issue-next-cursor via `CallContext`) see
156    /// the same nonce.
157    pub cursor_issuer: Arc<crate::cursor::CursorIssuer>,
158}
159
160/// Run the full `Request` state machine and produce a `Response`.
161///
162/// Intended for stream-oriented transports (UDS) where the listener keeps
163/// per-connection capability + caller-id state across many requests on the
164/// same socket. `caps` and `caller_id` are mutated in place when `Request`
165/// is a `Hello` — UDS `connection.rs::handle_connection` relies on this.
166///
167/// HTTP listeners should not use `dispatch_request` for `tools/call`
168/// translation; they should call [`run_tool`] directly after building a
169/// fresh per-request `CapabilitySet` from the bearer-resolved
170/// `BearerIdentity`. The `Hello` / `Ping` / `ToolList` / `ToolSchema`
171/// branches are still useful in introspection-only contexts and remain
172/// available via the same fn.
173pub async fn dispatch_request(
174    state: &Arc<ServerState>,
175    tracker: &Arc<ReadTracker>,
176    caps: &mut Arc<CapabilitySet>,
177    caller_id: &mut Option<String>,
178    req: Request,
179) -> Response {
180    state
181        .metrics
182        .dispatched_requests
183        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
184    let resp = dispatch_request_inner(state, tracker, caps, caller_id, req).await;
185    if let Response::Error { code: Some(c), .. } = &resp {
186        state.metrics.record_error(*c);
187    }
188    resp
189}
190
191async fn dispatch_request_inner(
192    state: &Arc<ServerState>,
193    tracker: &Arc<ReadTracker>,
194    caps: &mut Arc<CapabilitySet>,
195    caller_id: &mut Option<String>,
196    req: Request,
197) -> Response {
198    match req {
199        Request::Ping => Response::Pong,
200        Request::Hello {
201            client_id,
202            requested_capabilities,
203            ucan_tokens,
204        } => {
205            *caller_id = client_id.clone();
206            let allow = CapabilitySet::from_iter(state.config.granted_capabilities.iter().cloned());
207            let (granted_strings_vec, _denied) = allow.intersect(&requested_capabilities);
208            let granted_strings = CapabilitySet::from_iter(granted_strings_vec);
209
210            // SP-capability-v2 Phase C: verify any presented UCAN-lite
211            // tokens and union the resulting caps with the SP-12 string
212            // allow-list result. Pre-SP-capability-v2 clients pass an
213            // empty `ucan_tokens` vec (per `#[serde(default)]`); their
214            // path is byte-identical to SP-12.
215            let granted_ucan = if ucan_tokens.is_empty() {
216                CapabilitySet::empty()
217            } else {
218                // Audience-pin requires a client_id — if the client sends
219                // ucan_tokens but no client_id, we cannot bind the audience
220                // and must reject. Spec §4.6 / §5.4 (1013 audience-mismatch
221                // family).
222                let expected_aud = match caller_id.as_ref() {
223                    Some(s) if !s.is_empty() => s.clone(),
224                    _ => {
225                        return Response::Error {
226                            message: "UCAN tokens require Hello.client_id (audience pin)"
227                                .to_string(),
228                            code: Some(atd_protocol::ERR_AUDIENCE_MISMATCH),
229                            retryable: Some(false),
230                            details: None,
231                        };
232                    }
233                };
234                let mut cfg = crate::ucan::VerifyConfig::new(expected_aud);
235                cfg.max_chain_depth = state.config.max_ucan_chain_depth;
236                cfg.revocation_store = state.config.ucan_revocation_store.clone();
237                match crate::ucan::verify_tokens(&ucan_tokens, &cfg, std::time::SystemTime::now()) {
238                    Ok(c) => c,
239                    Err(e) => {
240                        let code = crate::ucan::wire_code(&e);
241                        return Response::Error {
242                            message: e.to_string(),
243                            code: Some(code),
244                            retryable: Some(false),
245                            details: None,
246                        };
247                    }
248                }
249            };
250
251            let granted_caps = granted_strings.union(&granted_ucan);
252            let granted_vec = granted_caps.granted();
253            *caps = Arc::new(granted_caps);
254            Response::HelloAck {
255                granted_capabilities: granted_vec,
256                server_version: state.config.server_version.clone(),
257                supported_tiers: vec!["hot".into(), "warm".into(), "cold".into()],
258            }
259        }
260        Request::ToolList => {
261            let summaries: Vec<_> = state
262                .registry
263                .summaries()
264                .into_iter()
265                .filter(|s| !matches!(s.visibility, atd_protocol::ToolVisibility::Hidden))
266                .collect();
267            Response::ToolListResponse {
268                tools: serde_json::to_value(&summaries).unwrap_or_else(|_| serde_json::json!([])),
269            }
270        }
271        Request::ToolSchema { tool_id } => match state.registry.get(&tool_id) {
272            Some(entry) => Response::ToolSchemaResponse {
273                schema: serde_json::to_value(entry.definition())
274                    .unwrap_or_else(|_| serde_json::json!({})),
275            },
276            None => Response::Error {
277                message: format!("tool not found: {tool_id}"),
278                code: None,
279                retryable: Some(false),
280                details: None,
281            },
282        },
283        Request::RunTool {
284            tool_id,
285            args,
286            dry_run,
287        } => {
288            run_tool(
289                state,
290                tracker,
291                caps,
292                caller_id.as_deref(),
293                tool_id,
294                args,
295                dry_run,
296            )
297            .await
298        }
299        Request::RunToolContinue { tool_id, cursor } => {
300            run_tool_continue(state, tracker, caps, caller_id.as_deref(), tool_id, cursor).await
301        }
302    }
303}
304
305/// SP-pagination-v1 §4.4 — handle a `Request::RunToolContinue`.
306///
307/// Steps:
308/// 1. Verify the cursor (HMAC + TTL + session nonce) → `CursorPayload`.
309/// 2. Reject if the cursor's `tool_id` ≠ request's `tool_id` (anti-replay).
310/// 3. Look up the tool; reject if not found, or if it doesn't override
311///    `supports_pagination()` (a cursor against a non-paginating tool is
312///    bug-or-attack territory; 1021).
313/// 4. Re-check `required_capabilities` against the current connection's
314///    capability set — caps may have changed since the cursor was issued
315///    (UCAN revocation, Hello re-negotiation).
316/// 5. Acquire the per-tool semaphore (same rate-limit envelope as `run_tool`).
317/// 6. Build a `CallContext` with the cursor issuer attached.
318/// 7. Call `tool.call_paginated(Value::Null, &ctx, Some(&cursor_str))`.
319///    Args are intentionally `Null` on continuation — original args were
320///    fingerprinted into the cursor and the tool is expected to read its
321///    state from `cursor.opaque_state`.
322/// 8. Emit one audit event tagged with `cursor_page` = payload's page_index.
323/// 9. Return `ToolResultResponse` with the new `next_cursor` from the tool.
324#[allow(clippy::too_many_arguments)]
325pub async fn run_tool_continue(
326    state: &Arc<ServerState>,
327    tracker: &Arc<ReadTracker>,
328    caps: &Arc<CapabilitySet>,
329    caller_id: Option<&str>,
330    tool_id: String,
331    cursor: String,
332) -> Response {
333    use std::sync::atomic::Ordering;
334
335    let start = Instant::now();
336    let audit_call_id = ulid::Ulid::new();
337
338    // Build an issuer from the configured signing key. The session_nonce
339    // is fresh per construction — but cursors carry the issuer's nonce
340    // from issue-time, and the issuer constructed at server startup is
341    // the one whose nonce went into the cursor. Here we need to verify
342    // against THAT issuer. SharedServerConfig holds the key but not the
343    // issuer; for v1 the listener crates construct the issuer at startup
344    // and inject it via CallContext. Until that path lands we reconstruct
345    // here using the stored key — verification of cursors issued in this
346    // same process still succeeds because the dispatch path that issued
347    // the cursor used the same key + a matching session_nonce embedded.
348    //
349    // TODO: thread Arc<CursorIssuer> through ServerState so the nonce
350    // truly survives across paginated calls in the same process. For
351    // now we accept that the first call_paginated within this fn will
352    // be reading a cursor whose session_nonce came from a sister issuer
353    // — server_session check will hold across the same process because
354    // both issuers got their nonce from the same OS RNG sequence... no
355    // wait, they didn't. Each `CursorIssuer::new` randomizes the nonce.
356    //
357    // Use the process-wide issuer from ServerState so the session_nonce
358    // matches across issue (first-page in run_tool) and verify (continuation
359    // here). One issuer per server process by design.
360    let payload = match state
361        .cursor_issuer
362        .verify(&cursor, state.config.cursor_ttl_seconds)
363    {
364        Ok(p) => p,
365        Err(crate::cursor::CursorError::Expired) => {
366            return Response::Error {
367                message: "cursor expired; re-issue the original RunTool".into(),
368                code: Some(atd_protocol::ERR_CURSOR_EXPIRED),
369                retryable: Some(false),
370                details: None,
371            };
372        }
373        Err(_) => {
374            return Response::Error {
375                message: "cursor invalid".into(),
376                code: Some(atd_protocol::ERR_CURSOR_INVALID),
377                retryable: Some(false),
378                details: None,
379            };
380        }
381    };
382
383    if payload.tool_id != tool_id {
384        return Response::Error {
385            message: format!(
386                "cursor tool_id mismatch: cursor={} request={tool_id}",
387                payload.tool_id
388            ),
389            code: Some(atd_protocol::ERR_CURSOR_INVALID),
390            retryable: Some(false),
391            details: None,
392        };
393    }
394
395    let entry = match state.registry.get(&tool_id) {
396        Some(e) => e.clone(),
397        None => {
398            return Response::Error {
399                message: format!("tool not found: {tool_id}"),
400                code: None,
401                retryable: Some(false),
402                details: None,
403            };
404        }
405    };
406
407    if !entry.tool.supports_pagination() {
408        return Response::Error {
409            message: format!("tool {tool_id} does not support pagination but received a cursor"),
410            code: Some(atd_protocol::ERR_CURSOR_INVALID),
411            retryable: Some(false),
412            details: None,
413        };
414    }
415
416    let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
417
418    // Re-check capability gating — caps can change mid-connection (UCAN revocation).
419    let required = entry.definition().required_capabilities.clone();
420    let missing: Vec<String> = required
421        .iter()
422        .filter(|c| !caps.contains(c))
423        .cloned()
424        .collect();
425    if !missing.is_empty() {
426        return Response::Error {
427            message: format!("capability denied for {tool_id}: missing {missing:?}"),
428            code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
429            retryable: Some(false),
430            details: None,
431        };
432    }
433
434    let _permit = match entry.semaphore.clone().try_acquire_owned() {
435        Ok(p) => p,
436        Err(_) => {
437            return Response::Error {
438                message: format!("rate limited for {tool_id} (continuation)"),
439                code: Some(atd_protocol::ERR_RATE_LIMITED),
440                retryable: Some(true),
441                details: None,
442            };
443        }
444    };
445
446    let tier_timeout = state.tier_policy.timeout(tier);
447    let tier_max_output = state.tier_policy.max_output(tier);
448    let ctx = CallContext::new(
449        state.config.cwd.clone(),
450        tier_max_output,
451        audit_call_id,
452        Some(Instant::now() + tier_timeout),
453        Some(tracker.clone()),
454        caps.clone(),
455        tier,
456        caller_id.map(|s| s.to_string()),
457        None, // secrets — broker resolution skipped on continuation; the
458              // tool reads continuation state from cursor.opaque_state
459    )
460    .with_cursor_issuer(state.cursor_issuer.clone());
461
462    let page_index = payload.page_index;
463    let result = entry
464        .tool
465        .call_paginated(serde_json::Value::Null, &ctx, Some(&cursor))
466        .await;
467
468    let response = match result {
469        Ok(crate::registry::PaginatedResult {
470            mut value,
471            next_cursor,
472        }) => {
473            // SP-pagination-v1 §G5 — apply the middleware chain per page so
474            // FHIR validation / PHI redaction / other egress rewrites cover
475            // continuation pages, not just first-page (RunTool) results. The
476            // celia adopter binds atd-middleware-fhir + atd-middleware-pii-
477            // redact-medical here; missing this hook would leak unredacted
478            // PHI on every continue — a real compliance defect.
479            for mw in &state.middleware {
480                mw.on_result(&tool_id, entry.definition(), &mut value);
481            }
482            Response::ToolResultResponse {
483                tool_id: tool_id.clone(),
484                result: value,
485                success: true,
486                dry_run: false,
487                next_cursor,
488            }
489        }
490        Err(crate::error::ToolCallError::ExecutionFailed {
491            code,
492            message,
493            retryable,
494        }) => Response::ToolResultResponse {
495            tool_id: tool_id.clone(),
496            result: serde_json::json!({
497                "code": code,
498                "message": message,
499                "retryable": retryable,
500            }),
501            success: false,
502            dry_run: false,
503            next_cursor: None,
504        },
505        Err(e) => Response::Error {
506            message: format!("tool {tool_id} continuation failed: {e:?}"),
507            code: None,
508            retryable: Some(false),
509            details: None,
510        },
511    };
512
513    // Audit emission with cursor_page tag (the v2 schema field).
514    if let Some(sink) = state.config.audit_sink.as_ref() {
515        state
516            .metrics
517            .audit_events_total
518            .fetch_add(1, Ordering::Relaxed);
519        let outcome = match &response {
520            Response::ToolResultResponse { success: true, .. } => crate::audit::Outcome::Success,
521            _ => crate::audit::Outcome::ExecutionFailed {
522                code: "continuation_failed".into(),
523                retryable: false,
524            },
525        };
526        sink.on_call(&crate::audit::CallEvent {
527            ts: crate::audit::now_rfc3339(),
528            call_id: audit_call_id.to_string(),
529            tool_id: tool_id.clone(),
530            caller_id: caller_id.map(|s| s.to_string()),
531            granted_capabilities: caps.granted(),
532            duration_ms: start.elapsed().as_millis() as u64,
533            outcome,
534            tier: crate::tier::tier_as_str(tier).to_string(),
535            dry_run: false,
536            schema_version: crate::audit::SCHEMA_VERSION,
537            secrets_resolved: false,
538            cursor_page: Some(page_index),
539        });
540    }
541
542    response
543}
544
545/// Execute one `RunTool` request against the shared dispatch state.
546///
547/// Single-entry-point for both transports: UDS goes through
548/// [`dispatch_request`] which forwards `Request::RunTool` here; HTTP
549/// (`atd-server-http::mcp::tools_call`) calls this fn directly. The
550/// emitted `Response` is the same enum + same JSON encoding on either
551/// route — `Response::ToolResultResponse` on success / execution-failed,
552/// `Response::Error` on capability-denied / rate-limited / broker error /
553/// tool-not-found / invalid args / internal error.
554///
555/// The body is the verbatim move of the SP-streamable-http §6.3
556/// `atd-server::connection.rs::dispatch` `RunTool` arm — every audit
557/// emission, capability check, semaphore acquire, broker call, binding
558/// dispatch, middleware step, and error map is preserved. The existing
559/// `atd-server::connection::tests` suite covers all branches and is
560/// re-routed through this fn unchanged after refactor.
561#[allow(clippy::too_many_arguments)]
562pub async fn run_tool(
563    state: &Arc<ServerState>,
564    tracker: &Arc<ReadTracker>,
565    caps: &Arc<CapabilitySet>,
566    caller_id: Option<&str>,
567    tool_id: String,
568    args: serde_json::Value,
569    dry_run: bool,
570) -> Response {
571    // SP-operability-v1 C1: per-call audit scaffolding. `start` measures
572    // wall-clock duration from dispatch entry; `audit_call_id` is the
573    // stable id put on `CallEvent` regardless of which return branch
574    // fires. Emission is a no-op when `audit_sink` is None.
575    let start = Instant::now();
576    let audit_call_id = ulid::Ulid::new();
577    // SP-pagination-v1 — cursor_page is captured-mut so paginated paths can
578    // tag the audit event with the page index (1 for first, 2+ for continues).
579    // Non-paginated dispatches leave it None and the field is omitted on
580    // the wire via #[serde(skip_serializing_if = "Option::is_none")].
581    let cursor_page: Option<u32> = None;
582    let emit = |outcome: crate::audit::Outcome, tier: ToolTier, secrets_resolved: bool| {
583        if let Some(sink) = state.config.audit_sink.as_ref() {
584            state
585                .metrics
586                .audit_events_total
587                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
588            sink.on_call(&crate::audit::CallEvent {
589                ts: crate::audit::now_rfc3339(),
590                call_id: audit_call_id.to_string(),
591                tool_id: tool_id.clone(),
592                caller_id: caller_id.map(|s| s.to_string()),
593                granted_capabilities: caps.granted(),
594                duration_ms: start.elapsed().as_millis() as u64,
595                outcome,
596                tier: crate::tier::tier_as_str(tier).to_string(),
597                dry_run,
598                schema_version: crate::audit::SCHEMA_VERSION,
599                secrets_resolved,
600                cursor_page,
601            });
602        }
603    };
604
605    if dry_run {
606        // Dry-run short-circuits BEFORE tier derivation — use Warm as the
607        // placeholder tier for the audit event.
608        emit(crate::audit::Outcome::Success, ToolTier::Warm, false);
609        return Response::ToolResultResponse {
610            tool_id: tool_id.clone(),
611            result: serde_json::json!({
612                "dry_run": true,
613                "tool_id": tool_id,
614                "args_preview": args,
615            }),
616            success: true,
617            dry_run: true,
618            next_cursor: None,
619        };
620    }
621    let entry = match state.registry.get(&tool_id) {
622        Some(e) => e.clone(),
623        None => {
624            emit(crate::audit::Outcome::ToolNotFound, ToolTier::Warm, false);
625            return Response::Error {
626                message: format!("tool not found: {tool_id}"),
627                code: None,
628                retryable: Some(false),
629                details: None,
630            };
631        }
632    };
633    let tier = entry.definition().tier.unwrap_or(ToolTier::Warm);
634    // SP-12 Task 2: capability enforcement.
635    let required = entry.definition().required_capabilities.clone();
636    let missing: Vec<String> = required
637        .iter()
638        .filter(|c| !caps.contains(c))
639        .cloned()
640        .collect();
641    if !missing.is_empty() {
642        let mut required_sorted = required.clone();
643        required_sorted.sort();
644        let mut missing_sorted = missing.clone();
645        missing_sorted.sort();
646        emit(
647            crate::audit::Outcome::CapabilityDenied {
648                missing: missing_sorted.clone(),
649            },
650            tier,
651            false,
652        );
653        return Response::Error {
654            message: format!("capability denied for {tool_id}: missing {missing_sorted:?}"),
655            code: Some(atd_protocol::ERR_CAPABILITY_DENIED),
656            retryable: Some(false),
657            details: Some(serde_json::json!({
658                "required": required_sorted,
659                "granted": caps.granted(),
660                "missing": missing_sorted,
661            })),
662        };
663    }
664    // SP-operability-v1 C2: rate-limit enforcement.
665    let _permit = match entry.semaphore.clone().try_acquire_owned() {
666        Ok(p) => p,
667        Err(_) => {
668            let max_conc = entry.tool.definition().resources.max_concurrent;
669            emit(
670                crate::audit::Outcome::RateLimited {
671                    retry_after_ms: None,
672                },
673                tier,
674                false,
675            );
676            return Response::Error {
677                message: format!("rate limited for {tool_id}: max_concurrent={max_conc} in-flight"),
678                code: Some(atd_protocol::ERR_RATE_LIMITED),
679                retryable: Some(true),
680                details: Some(serde_json::json!({
681                    "tool_id": tool_id,
682                    "limit": max_conc,
683                })),
684            };
685        }
686    };
687
688    // SP-token-broker-phase1: resolve secrets via the configured TokenBroker.
689    let secrets = match state.config.token_broker.as_ref() {
690        None => None,
691        Some(broker) => match broker.resolve(caller_id).await {
692            Ok(bundle) => bundle,
693            Err(e) => {
694                emit(
695                    crate::audit::Outcome::ExecutionFailed {
696                        code: "broker_error".into(),
697                        retryable: true,
698                    },
699                    tier,
700                    false,
701                );
702                return Response::Error {
703                    message: format!("token broker error for {tool_id}: {e}"),
704                    code: Some(atd_protocol::ERR_BROKER_FAILED),
705                    retryable: Some(true),
706                    details: None,
707                };
708            }
709        },
710    };
711    let secrets_resolved = secrets.is_some();
712    let tier_timeout = state.tier_policy.timeout(tier);
713    let tier_max_output = state.tier_policy.max_output(tier);
714
715    // SP-pagination-v1 §4.4 — paginated tools bypass the Binding layer so
716    // they can emit cursors on the first page. Non-paginated tools keep
717    // going through binding so CLI / future MCP / REST bindings stay
718    // intact. The check is one bool dispatch per call — negligible.
719    let ctx = if entry.tool.supports_pagination() {
720        CallContext::new(
721            state.config.cwd.clone(),
722            tier_max_output,
723            audit_call_id,
724            Some(Instant::now() + tier_timeout),
725            Some(tracker.clone()),
726            caps.clone(),
727            tier,
728            caller_id.map(|s| s.to_string()),
729            secrets,
730        )
731        .with_cursor_issuer(state.cursor_issuer.clone())
732    } else {
733        CallContext::new(
734            state.config.cwd.clone(),
735            tier_max_output,
736            audit_call_id,
737            Some(Instant::now() + tier_timeout),
738            Some(tracker.clone()),
739            caps.clone(),
740            tier,
741            caller_id.map(|s| s.to_string()),
742            secrets,
743        )
744    };
745
746    // Two paths: paginated tools call `Tool::call_paginated` directly with
747    // cursor=None (first page); non-paginated tools go through the Binding
748    // (preserves CLI / native dispatch semantics).
749    let call_result: Result<(serde_json::Value, Option<String>), ToolCallError> =
750        if entry.tool.supports_pagination() {
751            entry
752                .tool
753                .call_paginated(args, &ctx, None)
754                .await
755                .map(|p| (p.value, p.next_cursor))
756        } else {
757            entry
758                .binding
759                .call(entry.definition(), args, &ctx)
760                .await
761                .map(|v| (v, None))
762        };
763    match call_result {
764        Ok((mut data, next_cursor)) => {
765            for mw in &state.middleware {
766                mw.on_result(&tool_id, entry.definition(), &mut data);
767            }
768            emit(crate::audit::Outcome::Success, tier, secrets_resolved);
769            Response::ToolResultResponse {
770                tool_id,
771                result: data,
772                success: true,
773                dry_run: false,
774                next_cursor,
775            }
776        }
777        Err(ToolCallError::InvalidArgs(msg)) => {
778            emit(
779                crate::audit::Outcome::InvalidArgs {
780                    message: msg.clone(),
781                },
782                tier,
783                secrets_resolved,
784            );
785            Response::Error {
786                message: format!("invalid args for {tool_id}: {msg}"),
787                code: None,
788                retryable: Some(false),
789                details: None,
790            }
791        }
792        Err(ToolCallError::ExecutionFailed {
793            code,
794            message,
795            retryable,
796        }) => {
797            emit(
798                crate::audit::Outcome::ExecutionFailed {
799                    code: code.clone(),
800                    retryable,
801                },
802                tier,
803                secrets_resolved,
804            );
805            Response::ToolResultResponse {
806                tool_id,
807                result: serde_json::json!({
808                    "code": code,
809                    "message": message,
810                    "retryable": retryable,
811                }),
812                success: false,
813                dry_run: false,
814                next_cursor: None,
815            }
816        }
817        Err(ToolCallError::InternalError(msg)) => {
818            emit(
819                crate::audit::Outcome::ExecutionFailed {
820                    code: "INTERNAL".into(),
821                    retryable: false,
822                },
823                tier,
824                secrets_resolved,
825            );
826            Response::Error {
827                message: format!("internal error in {tool_id}: {msg}"),
828                code: None,
829                retryable: Some(false),
830                details: None,
831            }
832        }
833        Err(other) => {
834            emit(
835                crate::audit::Outcome::ExecutionFailed {
836                    code: "UNHANDLED".into(),
837                    retryable: false,
838                },
839                tier,
840                secrets_resolved,
841            );
842            Response::Error {
843                message: format!("unhandled tool error in {tool_id}: {other}"),
844                code: Some(1999),
845                retryable: Some(false),
846                details: None,
847            }
848        }
849    }
850}