Skip to main content

meerkat_core/
agent.rs

1//! Agent - the core agent orchestrator
2//!
3//! The Agent struct ties together all components and runs the agent loop.
4
5mod builder;
6pub mod comms_impl;
7pub mod compact;
8mod extraction;
9mod hook_impl;
10#[cfg(test)]
11mod hooks_behavior_tests;
12mod runner;
13pub mod skills;
14mod state;
15#[cfg(test)]
16#[doc(hidden)]
17pub(crate) mod test_turn_state_handle;
18use crate::budget::Budget;
19use crate::comms::{
20    CommsCommand, CommsTrustMutation, CommsTrustMutationResult, EventStream, PeerDirectoryEntry,
21    PeerId, SendAndStreamError, SendError, SendReceipt, StreamError, StreamScope,
22    TrustedPeerDescriptor,
23};
24use crate::compact::SessionCompactionCadence;
25use crate::completion_feed::CompletionSeq;
26use crate::config::{AgentConfig, HookRunOverrides};
27use crate::error::AgentError;
28use crate::event::ExternalToolDelta;
29use crate::hooks::HookEngine;
30use crate::lifecycle::RunId;
31use crate::lifecycle::run_primitive::ProviderParamsOverride;
32use crate::ops::OperationId;
33use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
34use crate::retry::RetryPolicy;
35use crate::schema::{CompiledSchema, SchemaError};
36use crate::session::Session;
37use crate::state::LoopState;
38#[cfg(target_arch = "wasm32")]
39use crate::tokio;
40use crate::tool_catalog::{
41    ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
42    select_catalog_mode_from_snapshot,
43};
44use crate::tool_scope::ToolScope;
45use crate::turn_execution_authority::{
46    ContentShape, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind, TurnTerminalOutcome,
47};
48use crate::types::{
49    AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
50    ToolDef, ToolName, ToolNameSet, Usage,
51};
52use async_trait::async_trait;
53use serde::{Deserialize, Serialize};
54use std::collections::{BTreeMap, BTreeSet};
55use std::sync::Arc;
56
57pub use builder::{AgentBuildPolicyError, AgentBuilder, DefaultSystemPromptPolicy};
58pub use runner::{AgentRunner, SnapshotProjectionError, SystemContextStateError};
59
60/// Trait for LLM clients that can be used with the agent
61#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
62#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
63pub trait AgentLlmClient: Send + Sync {
64    /// Stream a response from the LLM
65    async fn stream_response(
66        &self,
67        messages: &[Message],
68        tools: &[Arc<ToolDef>],
69        max_tokens: u32,
70        temperature: Option<f32>,
71        provider_params: Option<&ProviderParamsOverride>,
72    ) -> Result<LlmStreamResult, AgentError>;
73
74    /// Get the typed catalog provider identity for this client.
75    ///
76    /// Clients return the typed [`crate::provider::Provider`] directly so no
77    /// boundary ever parses a caller-supplied string back into catalog
78    /// identity. String projections are derived via
79    /// [`crate::provider::Provider::as_str`].
80    fn provider(&self) -> crate::provider::Provider;
81
82    /// Get the current effective model identifier.
83    ///
84    /// Used by the agent loop for profile-default resolution (e.g., call timeout
85    /// defaults that vary per model family). Must reflect the current model even
86    /// after hot-swap.
87    fn model(&self) -> &str;
88
89    /// Prepare the next prebuilt fallback model after the generated turn
90    /// authority has classified the LLM failure as recoverable.
91    ///
92    /// This method does not classify failures and must not call the provider.
93    /// It only selects an already-constructed candidate and returns the typed
94    /// state the agent loop must apply before the retry attempt.
95    fn prepare_model_fallback(&self, _failure: &AgentError) -> Option<AgentLlmFallbackSwitch> {
96        None
97    }
98
99    /// Commit a prepared fallback after the agent loop has applied the paired
100    /// request policy, identity, and tool-visibility state.
101    fn commit_model_fallback(&self, _identity: &crate::SessionLlmIdentity) {}
102
103    /// Capability base filter for the current active model.
104    ///
105    /// The agent loop composes this with the already-authorized tool set before
106    /// every provider call so sticky fallback models keep their downgraded tool
107    /// surface on later turns too.
108    fn active_capability_base_filter(&self) -> crate::ToolFilter {
109        crate::ToolFilter::All
110    }
111
112    /// Output-token ceiling for the current active model, if the catalog knows
113    /// one. Used on every provider call so a sticky fallback keeps respecting
114    /// the backup model's smaller output limit after the failover turn.
115    fn active_max_output_tokens(&self) -> Option<u32> {
116        None
117    }
118
119    /// Reset per-call observation of user-visible streaming output.
120    ///
121    /// Adapters that emit display/reasoning deltas before returning the final
122    /// stream result use this to let the retry loop distinguish a pre-stream
123    /// failure from a post-partial-output failure. The default is no-op for
124    /// clients that do not stream visible events outside the returned blocks.
125    fn begin_stream_output_observation(&self) {}
126
127    /// Whether the current LLM call has emitted user-visible streaming output.
128    ///
129    /// A `true` value suppresses model fallback for the failed call: retrying
130    /// against a different model after users already saw partial output can
131    /// produce duplicate assistant answers. Ordinary same-model retry policy is
132    /// still governed by the generated turn recovery authority.
133    fn stream_output_observed(&self) -> bool {
134        false
135    }
136
137    /// Compile an output schema for this provider.
138    ///
139    /// Default implementation normalizes the schema without provider-specific lowering.
140    /// Adapters override this to apply provider-specific transformations (e.g.,
141    /// Anthropic adds `additionalProperties: false`, Gemini strips unsupported keywords).
142    fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
143        // Default passthrough: normalized clone, no provider-specific lowering
144        Ok(CompiledSchema {
145            schema: output_schema.schema.as_value().clone(),
146            warnings: Vec::new(),
147        })
148    }
149}
150
151/// Hook for wrapping the final agent-facing LLM client.
152///
153/// Factories and runtimes apply this after provider/raw-client adaptation so
154/// embedders can compose cross-cutting behavior without provider-specific
155/// registry hooks.
156pub type AgentLlmClientDecorator =
157    Arc<dyn Fn(Arc<dyn AgentLlmClient>) -> Arc<dyn AgentLlmClient> + Send + Sync + 'static>;
158
159/// One fallback target skipped while selecting a viable backup model.
160#[derive(Debug, Clone)]
161pub struct AgentLlmFallbackSkippedTarget {
162    pub identity: crate::SessionLlmIdentity,
163    pub reason: String,
164}
165
166/// Typed state produced when an agent-facing LLM client activates a fallback.
167///
168/// The client owns only prebuilt candidate selection. The agent loop owns
169/// applying request policy, durable identity metadata, and tool visibility
170/// before issuing the machine-authorized retry.
171#[derive(Debug, Clone)]
172pub struct AgentLlmFallbackSwitch {
173    pub previous_identity: crate::SessionLlmIdentity,
174    pub new_identity: crate::SessionLlmIdentity,
175    pub request_policy: crate::SessionLlmRequestPolicy,
176    pub capability_base_filter: crate::ToolFilter,
177    pub context_window: Option<u32>,
178    pub max_output_tokens: Option<u32>,
179    pub skipped_targets: Vec<AgentLlmFallbackSkippedTarget>,
180}
181
182/// Result of streaming from the LLM
183pub struct LlmStreamResult {
184    blocks: Vec<AssistantBlock>,
185    stop_reason: StopReason,
186    usage: Usage,
187}
188
189impl LlmStreamResult {
190    pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
191        Self {
192            blocks,
193            stop_reason,
194            usage,
195        }
196    }
197
198    pub fn blocks(&self) -> &[AssistantBlock] {
199        &self.blocks
200    }
201    pub fn stop_reason(&self) -> StopReason {
202        self.stop_reason
203    }
204    pub fn usage(&self) -> &Usage {
205        &self.usage
206    }
207
208    pub fn into_message(self) -> BlockAssistantMessage {
209        BlockAssistantMessage::new(self.blocks, self.stop_reason)
210    }
211
212    pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
213        (self.blocks, self.stop_reason, self.usage)
214    }
215}
216
217/// Snapshot of the core agent's live execution state.
218///
219/// When a runtime-backed turn-state handle is attached, this snapshots the
220/// runtime-owned turn machine; otherwise it falls back to the in-process
221/// standalone turn state used by core-only execution.
222#[derive(Debug, Clone, PartialEq, Eq)]
223pub struct AgentExecutionSnapshot {
224    pub loop_state: LoopState,
225    pub turn_phase: TurnPhase,
226    /// Machine-owned turn-terminality verdict.
227    ///
228    /// The `TurnTerminalityClassified.terminal` verdict emitted by the canonical
229    /// MeerkatMachine `ClassifyTurnTerminality` input. Consumers mirror this bool
230    /// and must not reclassify [`TurnPhase`] locally.
231    pub turn_terminal: bool,
232    pub active_run_id: Option<RunId>,
233    pub primitive_kind: TurnPrimitiveKind,
234    pub admitted_content_shape: Option<ContentShape>,
235    pub vision_enabled: bool,
236    pub image_tool_results_enabled: bool,
237    pub tool_calls_pending: u32,
238    pub pending_operation_ids: Option<Vec<OperationId>>,
239    pub barrier_operation_ids: Vec<OperationId>,
240    pub has_barrier_ops: bool,
241    pub barrier_satisfied: bool,
242    pub boundary_count: u32,
243    pub cancel_after_boundary: bool,
244    pub terminal_outcome: TurnTerminalOutcome,
245    pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
246    pub extraction_attempts: u32,
247    pub max_extraction_retries: u32,
248    pub applied_cursor: CompletionSeq,
249}
250
251/// Result of polling for external tool updates.
252///
253/// Returned by [`AgentToolDispatcher::poll_external_updates`].
254#[derive(Debug, Clone, Default)]
255pub struct ExternalToolUpdate {
256    /// Notices about completed background operations since last poll.
257    pub notices: Vec<ExternalToolDelta>,
258    /// Names of servers still connecting in the background.
259    pub pending: Vec<String>,
260}
261
262/// Typed command requesting cancellation at the next turn boundary.
263///
264/// Carried over the cancel-after-boundary command channel from the surface
265/// that authorized the request (e.g. `SessionService::cancel_after_boundary`)
266/// to the agent loop, which observes it at the next boundary. The agent
267/// resolves the request against its own live active run, so the command
268/// carries no run id; it is a typed edge signal, replacing the previous raw
269/// `AtomicBool` carrier.
270#[derive(Debug, Clone, Copy, PartialEq, Eq)]
271pub struct CancelAfterBoundaryCommand;
272
273/// Producer end of the cancel-after-boundary command channel.
274///
275/// Cloned and handed to the requesting surface via
276/// [`Agent::cancel_after_boundary_handle`]; mirrors the cloneable-handle shape
277/// of the session-side `interrupt_notify` so a surface can request boundary
278/// cancellation without holding a reference to the agent.
279pub type CancelAfterBoundarySender = tokio::sync::mpsc::UnboundedSender<CancelAfterBoundaryCommand>;
280
281/// Typed context supplied by the agent loop when dispatching a tool call.
282///
283/// This is a dispatch-time projection of the already-admitted turn input. It
284/// lets tool surfaces resolve typed turn-scoped references, such as a
285/// `source=current_turn, index=0` image ref, without writing surface-local
286/// metadata into canonical transcript history.
287#[derive(Debug, Clone, Default, PartialEq, Eq)]
288pub struct ToolDispatchContext {
289    current_turn: Option<CurrentTurnContent>,
290    turn_metadata: BTreeMap<String, serde_json::Value>,
291}
292
293impl ToolDispatchContext {
294    pub fn from_current_turn_input(input: &crate::types::ContentInput) -> Self {
295        let blocks = match input {
296            crate::types::ContentInput::Text(_) => None,
297            crate::types::ContentInput::Blocks(blocks) => Some(blocks.clone()),
298        };
299        Self {
300            current_turn: blocks.map(CurrentTurnContent::new),
301            turn_metadata: BTreeMap::new(),
302        }
303    }
304
305    /// Project the typed run input into a dispatch context. The
306    /// pending-tool-results continuation carries no caller content, so it
307    /// projects to an empty context rather than a fabricated empty prompt.
308    pub fn from_run_input(input: &crate::types::RunInput) -> Self {
309        match input {
310            crate::types::RunInput::Content { content } => Self::from_current_turn_input(content),
311            crate::types::RunInput::PendingToolResults => Self::default(),
312        }
313    }
314
315    #[must_use]
316    pub fn with_turn_metadata(mut self, metadata: BTreeMap<String, serde_json::Value>) -> Self {
317        self.turn_metadata = metadata;
318        self
319    }
320
321    pub fn turn_metadata(&self, key: &str) -> Option<&serde_json::Value> {
322        self.turn_metadata.get(key)
323    }
324
325    pub fn current_turn(&self) -> Option<&CurrentTurnContent> {
326        self.current_turn.as_ref()
327    }
328
329    pub fn current_turn_image(
330        &self,
331        image_ref: CurrentTurnImageRef,
332    ) -> Option<&crate::types::ContentBlock> {
333        self.current_turn
334            .as_ref()
335            .and_then(|current_turn| current_turn.image(image_ref))
336    }
337}
338
339/// Typed reference to an image in the current admitted turn.
340///
341/// The wrapped index addresses the turn's *filtered image stream*, not the
342/// raw block list: ref `N` designates the `(N + 1)`-th image block of the
343/// current turn, skipping non-image blocks (so ref `0` is the first image
344/// even when text blocks precede it).
345///
346/// The field is private. In-process code mints refs only via
347/// [`CurrentTurnContent::image_ref`], which returns a ref only when the
348/// referenced image exists. Wire ingress (e.g. the comms `image_ref` tool
349/// input) deserializes a bare JSON integer directly into this type via
350/// `#[serde(transparent)]` — that is the sanctioned parse-at-ingress path,
351/// and resolution through [`CurrentTurnContent::image`] still validates
352/// existence.
353#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
354#[serde(transparent)]
355pub struct CurrentTurnImageRef(usize);
356
357impl std::fmt::Display for CurrentTurnImageRef {
358    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359        std::fmt::Display::fmt(&self.0, f)
360    }
361}
362
363/// Multimodal content from the currently admitted turn.
364#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct CurrentTurnContent {
366    blocks: Vec<crate::types::ContentBlock>,
367}
368
369impl CurrentTurnContent {
370    pub fn new(blocks: Vec<crate::types::ContentBlock>) -> Self {
371        Self { blocks }
372    }
373
374    pub fn blocks(&self) -> &[crate::types::ContentBlock] {
375        &self.blocks
376    }
377
378    /// Mint a typed reference to the `n`-th image of this turn's filtered
379    /// image stream. Returns `Some` only when that image exists, so every
380    /// in-process [`CurrentTurnImageRef`] is resolvable at mint time.
381    pub fn image_ref(&self, n: usize) -> Option<CurrentTurnImageRef> {
382        self.images().nth(n).map(|_| CurrentTurnImageRef(n))
383    }
384
385    pub fn image(&self, image_ref: CurrentTurnImageRef) -> Option<&crate::types::ContentBlock> {
386        self.images().nth(image_ref.0)
387    }
388
389    fn images(&self) -> impl Iterator<Item = &crate::types::ContentBlock> {
390        self.blocks
391            .iter()
392            .filter(|block| matches!(block, crate::types::ContentBlock::Image { .. }))
393    }
394}
395
396/// Completion notice for a detached background operation, projected from
397/// canonical ops-lifecycle terminal state plus dispatcher-owned display metadata.
398///
399/// This is a rebuildable projection (INV-003), not authoritative state.
400/// Terminal class and timing come from `OperationLifecycleSnapshot` (INV-001).
401/// Shell-projected detail is supplementary display only (INV-002).
402#[derive(Debug, Clone, Serialize, Deserialize)]
403pub struct DetachedOpCompletion {
404    /// App-facing job identifier (the control noun for surfaces).
405    pub job_id: String,
406    /// Operation kind from canonical ops-lifecycle.
407    pub kind: OperationKind,
408    /// Terminal status from canonical ops-lifecycle.
409    pub status: OperationStatus,
410    /// Terminal outcome from canonical ops-lifecycle.
411    pub terminal_outcome: Option<OperationTerminalOutcome>,
412    /// Canonical display label from ops-lifecycle snapshot.
413    pub display_name: String,
414    /// Dispatcher-projected summary (exit code, output tail). Display only.
415    pub detail: String,
416    /// Monotonic elapsed millis from ops-lifecycle snapshot.
417    pub elapsed_ms: Option<u64>,
418}
419
420/// Dispatcher binding capabilities — what optional bindings this dispatcher supports.
421///
422/// Returned by [`AgentToolDispatcher::capabilities`]. Replaces individual
423/// `supports_*` boolean methods with a single structured query.
424#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
425pub struct DispatcherCapabilities {
426    /// Whether `bind_ops_lifecycle` is implemented.
427    pub ops_lifecycle: bool,
428}
429
430/// Result of a dispatcher binding operation.
431///
432/// Distinguishes "binding was applied" from "binding was skipped" so callers
433/// can decide whether to wire downstream side effects (e.g. bridge tasks).
434///
435/// **Semantics (decision 11 — supported/best-effort/rejected):**
436/// - `Ok(Bound(d))` = **supported** — binding succeeded, side effects should be wired
437/// - `Ok(Skipped(d))` = **best-effort** — inner shared or incompatible, dispatcher unchanged
438/// - `Err(SharedOwnership)` = **rejected** — outer wrapper is shared, caught by factory pre-check
439/// - `Err(Unsupported)` = **rejected** — type doesn't support this binding, caught by `capabilities()`
440pub enum BindOutcome {
441    /// Binding was applied. The dispatcher was rebound.
442    Bound(Arc<dyn AgentToolDispatcher>),
443    /// Binding was skipped — inner dispatcher was shared or unsupported.
444    /// The returned dispatcher is unchanged but safe to use.
445    Skipped(Arc<dyn AgentToolDispatcher>),
446}
447
448impl BindOutcome {
449    /// Extract the dispatcher, regardless of bind status.
450    pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
451        match self {
452            Self::Bound(d) | Self::Skipped(d) => d,
453        }
454    }
455
456    /// Whether the binding was actually applied.
457    pub fn was_bound(&self) -> bool {
458        matches!(self, Self::Bound(_))
459    }
460}
461
462/// Trait for tool dispatchers
463#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
464#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
465pub trait AgentToolDispatcher: Send + Sync {
466    /// Get available tool definitions
467    fn tools(&self) -> Arc<[Arc<ToolDef>]>;
468
469    /// Query exact catalog support for this dispatcher.
470    ///
471    /// Dispatchers report `exact_catalog=true` only when `tool_catalog()`
472    /// returns the exact precedence-resolved winner registry for the plane
473    /// they own. Wrappers that cannot prove exactness must leave this false.
474    fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
475        ToolCatalogCapabilities::default()
476    }
477
478    /// Return the precedence-resolved tool catalog for this dispatcher.
479    ///
480    /// The default implementation mirrors `tools()` as a visible-only inline
481    /// catalog. Callers must gate any deferred-catalog behavior on
482    /// `tool_catalog_capabilities().exact_catalog`.
483    fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
484        self.tools()
485            .iter()
486            .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
487            .collect::<Vec<_>>()
488            .into()
489    }
490
491    /// Return non-draining pending source names for exact-catalog discovery.
492    ///
493    /// Pending sources are catalog-level discovery metadata rather than
494    /// provider-visible tools. The default implementation reports none.
495    fn pending_catalog_sources(&self) -> Arc<[String]> {
496        Arc::from([])
497    }
498
499    /// Execute a tool call, returning the transcript result and any async operations.
500    ///
501    /// The `ToolDispatchOutcome` separates transcript data (`result`) from
502    /// execution metadata (`async_ops`). Most tools return no async ops;
503    /// use `ToolDispatchOutcome::from(result)` for synchronous tools.
504    async fn dispatch(
505        &self,
506        call: ToolCallView<'_>,
507    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;
508
509    /// Execute a tool call with the current turn's typed dispatch context.
510    ///
511    /// Most tools do not need turn-local context and inherit the plain
512    /// `dispatch` behavior. Context-sensitive surfaces override this method
513    /// rather than reaching into session history or prompt text.
514    async fn dispatch_with_context(
515        &self,
516        call: ToolCallView<'_>,
517        _context: &ToolDispatchContext,
518    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
519        self.dispatch(call).await
520    }
521
522    /// Poll for external tool updates from background operations (e.g. async MCP loading).
523    ///
524    /// The default implementation returns an empty update. Implementations that
525    /// support background tool loading (like `McpRouterAdapter`) override this
526    /// to drain completed results and report pending servers.
527    async fn poll_external_updates(&self) -> ExternalToolUpdate {
528        ExternalToolUpdate::default()
529    }
530
531    /// Snapshot the live external tool-surface machine state, if supported.
532    ///
533    /// This is a hidden diagnostic surface for MeerkatMachine mapping work.
534    /// Dispatchers that do not own dynamic external tool mutation should
535    /// return `None`.
536    fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
537        None
538    }
539
540    /// Query which optional bindings this dispatcher supports.
541    fn capabilities(&self) -> DispatcherCapabilities {
542        DispatcherCapabilities::default()
543    }
544
545    /// Bind a session-canonical ops registry into this dispatcher.
546    ///
547    /// Dispatchers that emit session-visible `AsyncOpRef`s must route those
548    /// operation IDs into the bound registry. Under the identity-first Mob
549    /// regime the owner binding passed here is the canonical bridge session
550    /// binding, even though many compatibility surfaces still spell it
551    /// `session_id`. Default returns Unsupported.
552    fn bind_ops_lifecycle(
553        self: Arc<Self>,
554        _registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
555        _owner_bridge_session_id: crate::types::SessionId,
556    ) -> Result<BindOutcome, OpsLifecycleBindError> {
557        Err(OpsLifecycleBindError::Unsupported)
558    }
559
560    /// Return the completion enrichment provider, if available.
561    ///
562    /// Dispatchers with shell job management return a provider that maps
563    /// operation IDs to display details (job ID, status detail string).
564    fn completion_enrichment(
565        &self,
566    ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
567        None
568    }
569
570    /// Bind a session-scoped MCP server lifecycle handle (Phase 5G / T5g).
571    ///
572    /// Dispatchers that manage per-server MCP handshake lifecycle (like
573    /// `McpRouterAdapter`) use the handle to mirror connection state into
574    /// the session's MeerkatMachine DSL. The default implementation is a
575    /// no-op for dispatchers that have no MCP handshake to route.
576    fn bind_mcp_server_lifecycle_handle(
577        &self,
578        _handle: Arc<dyn crate::handles::McpServerLifecycleHandle>,
579    ) {
580    }
581
582    /// Bind the session-canonical external tool-surface handle.
583    ///
584    /// MCP dispatchers use this to route add/remove/reload/call lifecycle
585    /// semantics through the session's MeerkatMachine DSL instead of their
586    /// standalone compatibility projection. The default implementation is a
587    /// no-op for dispatchers that do not own dynamic external tool surfaces.
588    fn bind_external_tool_surface_handle(
589        &self,
590        _handle: Arc<dyn crate::handles::ExternalToolSurfaceHandle>,
591    ) {
592    }
593}
594
595/// Compute whether the current exact catalog should stay inline or switch to deferred mode.
596pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
597where
598    T: AgentToolDispatcher + ?Sized,
599{
600    let capabilities = dispatcher.tool_catalog_capabilities();
601    if !capabilities.exact_catalog {
602        return ToolCatalogMode::Inline;
603    }
604    let pending_sources = dispatcher.pending_catalog_sources();
605    let catalog = dispatcher.tool_catalog();
606    select_catalog_mode_from_snapshot(
607        capabilities.exact_catalog,
608        catalog.as_ref(),
609        pending_sources.as_ref(),
610    )
611}
612
613/// Compute whether the catalog control plane should be composed for this
614/// dispatcher, even if the current adaptive snapshot remains inline.
615pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
616where
617    T: AgentToolDispatcher + ?Sized,
618{
619    let capabilities = dispatcher.tool_catalog_capabilities();
620    if !capabilities.exact_catalog {
621        return false;
622    }
623    if capabilities.may_require_catalog_control_plane {
624        return true;
625    }
626
627    let pending_sources = dispatcher.pending_catalog_sources();
628    if !pending_sources.is_empty() {
629        return true;
630    }
631
632    let catalog = dispatcher.tool_catalog();
633    deferred_session_entry_count(catalog.as_ref()) > 0
634}
635
636/// Error from [`AgentToolDispatcher::bind_ops_lifecycle`].
637#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
638pub enum OpsLifecycleBindError {
639    #[error("ops lifecycle binding is unsupported")]
640    Unsupported,
641    #[error("dispatcher has shared ownership and cannot be rebound")]
642    SharedOwnership,
643}
644
645/// A tool dispatcher that filters tools based on a policy
646///
647/// Legacy tool lists are filtered once at construction time based on the
648/// allowed_tools list. Exact-catalog dispatchers keep catalog callability live.
649/// The inner dispatcher is used for actual dispatch, but only allowed tools are
650/// exposed via tools() and dispatch() returns AccessDenied for filtered tools.
651pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
652    inner: Arc<T>,
653    allowed_tools: ToolNameSet,
654    /// Pre-computed filtered tool list for non-exact dispatchers.
655    filtered_tools: Arc<[Arc<ToolDef>]>,
656}
657
658impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
659    pub fn new<I, N>(inner: Arc<T>, allowed_tools: I) -> Self
660    where
661        I: IntoIterator<Item = N>,
662        N: Into<ToolName>,
663    {
664        let allowed_set: ToolNameSet = allowed_tools
665            .into_iter()
666            .map(Into::into)
667            .collect::<ToolNameSet>();
668
669        let filtered: Vec<Arc<ToolDef>> = if inner.tool_catalog_capabilities().exact_catalog {
670            inner
671                .tool_catalog()
672                .iter()
673                .filter(|entry| entry.currently_callable())
674                .map(|entry| Arc::clone(&entry.tool))
675                .filter(|t| allowed_set.contains(t.name.as_str()))
676                .collect()
677        } else {
678            inner
679                .tools()
680                .iter()
681                .filter(|t| allowed_set.contains(t.name.as_str()))
682                .map(Arc::clone)
683                .collect()
684        };
685
686        Self {
687            inner,
688            allowed_tools: allowed_set,
689            filtered_tools: filtered.into(),
690        }
691    }
692}
693
694#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
695#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
696impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
697    fn tools(&self) -> Arc<[Arc<ToolDef>]> {
698        if self.inner.tool_catalog_capabilities().exact_catalog {
699            return self
700                .inner
701                .tool_catalog()
702                .iter()
703                .filter(|entry| entry.currently_callable())
704                .map(|entry| Arc::clone(&entry.tool))
705                .filter(|tool| self.allowed_tools.contains(tool.name.as_str()))
706                .collect::<Vec<_>>()
707                .into();
708        }
709        Arc::clone(&self.filtered_tools)
710    }
711
712    async fn dispatch(
713        &self,
714        call: ToolCallView<'_>,
715    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
716        self.dispatch_with_context(call, &ToolDispatchContext::default())
717            .await
718    }
719
720    async fn dispatch_with_context(
721        &self,
722        call: ToolCallView<'_>,
723        context: &ToolDispatchContext,
724    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
725        if !self.allowed_tools.contains(call.name) {
726            let inner_knows_tool = if self.inner.tool_catalog_capabilities().exact_catalog {
727                self.inner
728                    .tool_catalog()
729                    .iter()
730                    .any(|entry| entry.tool.name == call.name)
731            } else {
732                self.inner.tools().iter().any(|tool| tool.name == call.name)
733            };
734            if !inner_knows_tool {
735                return Err(crate::error::ToolError::not_found(call.name));
736            }
737            return Err(crate::error::ToolError::access_denied(call.name));
738        }
739        self.inner.dispatch_with_context(call, context).await
740    }
741
742    fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
743        self.inner.tool_catalog_capabilities()
744    }
745
746    fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
747        if !self.inner.tool_catalog_capabilities().exact_catalog {
748            return self
749                .tools()
750                .iter()
751                .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
752                .collect::<Vec<_>>()
753                .into();
754        }
755        self.inner
756            .tool_catalog()
757            .iter()
758            .filter(|entry| self.allowed_tools.contains(entry.tool.name.as_str()))
759            .cloned()
760            .collect::<Vec<_>>()
761            .into()
762    }
763
764    fn pending_catalog_sources(&self) -> Arc<[String]> {
765        self.inner.pending_catalog_sources()
766    }
767
768    async fn poll_external_updates(&self) -> ExternalToolUpdate {
769        self.inner.poll_external_updates().await
770    }
771
772    fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
773        self.inner.external_tool_surface_snapshot()
774    }
775
776    fn capabilities(&self) -> DispatcherCapabilities {
777        self.inner.capabilities()
778    }
779
780    fn bind_ops_lifecycle(
781        self: Arc<Self>,
782        registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
783        owner_bridge_session_id: crate::types::SessionId,
784    ) -> Result<BindOutcome, OpsLifecycleBindError> {
785        let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
786        if Arc::strong_count(&owned.inner) == 1 {
787            let outcome = owned
788                .inner
789                .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
790            let bound = outcome.was_bound();
791            let d = outcome.into_dispatcher();
792            let allowed_tools = owned.allowed_tools.into_iter().collect::<Vec<_>>();
793            Ok(if bound {
794                BindOutcome::Bound(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
795            } else {
796                BindOutcome::Skipped(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
797            })
798        } else {
799            Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
800                inner: owned.inner,
801                allowed_tools: owned.allowed_tools,
802                filtered_tools: owned.filtered_tools,
803            })))
804        }
805    }
806
807    fn completion_enrichment(
808        &self,
809    ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
810        self.inner.completion_enrichment()
811    }
812}
813
814/// Trait for session stores
815#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
816#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
817pub trait AgentSessionStore: Send + Sync {
818    async fn save(&self, session: &Session) -> Result<(), AgentError>;
819    async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
820}
821
822/// Runtime policy for inlining peer lifecycle updates into session context.
823#[derive(Debug, Clone, Copy, PartialEq, Eq)]
824pub enum InlinePeerNotificationPolicy {
825    /// Always inline batched peer lifecycle updates.
826    Always,
827    /// Never inline batched peer lifecycle updates.
828    Never,
829    /// Inline only when post-drain peer count is at or below this threshold.
830    AtMost(usize),
831}
832
833/// Default inline threshold when no explicit value is configured.
834pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;
835
836impl InlinePeerNotificationPolicy {
837    /// Resolve policy from transport/build-layer config representation.
838    pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
839        match raw {
840            None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
841            Some(-1) => Ok(Self::Always),
842            Some(0) => Ok(Self::Never),
843            Some(v) if v > 0 => Ok(Self::AtMost(v as usize)),
844            Some(v) => Err(v),
845        }
846    }
847}
848
849/// Error returned when a comms runtime capability is not available.
850#[derive(Debug, thiserror::Error)]
851pub enum CommsCapabilityError {
852    /// The runtime does not support this capability.
853    #[error("comms capability not supported: {0}")]
854    Unsupported(String),
855}
856
857/// Trait for comms runtime that can be used with the agent
858#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
859#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
860pub trait CommsRuntime: Send + Sync {
861    /// Canonical runtime routing identity for this peer, if available.
862    ///
863    /// `PeerId` is the UUID-shaped routing key used by peer directories and
864    /// trust stores. Implementations that only have the legacy string carrier
865    /// may return a parsed UUID-shaped `public_key()` value; implementations
866    /// with Ed25519 public keys should override this and return the pubkey-
867    /// derived canonical [`PeerId`].
868    fn peer_id(&self) -> Option<PeerId> {
869        self.public_key()
870            .as_deref()
871            .and_then(|public_key| PeerId::parse(public_key).ok())
872    }
873
874    /// Runtime-local transport/auth public key, if available.
875    ///
876    /// Returns an Ed25519 public key string in `ed25519:<base64>` format.
877    /// This is not the canonical routing [`PeerId`]; use [`Self::peer_id`]
878    /// for roster/projection identity and peer-directory lookups.
879    fn public_key(&self) -> Option<String> {
880        None
881    }
882
883    /// Runtime-local Ed25519 public key bytes, if available.
884    ///
885    /// This is the typed form of [`Self::public_key`]. Trust installation
886    /// paths that need to verify `PeerId`/pubkey consistency should prefer
887    /// this method over reparsing the string carrier.
888    fn public_key_bytes(&self) -> Option<[u8; 32]> {
889        None
890    }
891
892    /// Runtime-local canonical comms routing name, if available.
893    ///
894    /// This is the peer name used in trusted-peer descriptors and peer
895    /// directories. It is separate from the advertised transport address so
896    /// callers do not recover identity by parsing transport strings.
897    fn comms_name(&self) -> Option<String> {
898        None
899    }
900
901    /// Runtime-local advertised comms address, if available.
902    ///
903    /// This is the canonical address the runtime expects peers to use when
904    /// constructing a [`TrustedPeerDescriptor`]. Implementations that do not
905    /// expose a stable advertised address can return `None`.
906    fn advertised_address(&self) -> Option<String> {
907        None
908    }
909
910    /// Runtime-local bootstrap proof for the initial supervisor bind, if
911    /// available.
912    fn bridge_bootstrap_token(&self) -> Option<String> {
913        None
914    }
915
916    /// Apply a comms trust projection mutation authorized by generated
917    /// machine/composition authority.
918    ///
919    /// This is the only mutable trust-store seam.
920    async fn apply_trust_mutation(
921        &self,
922        _mutation: CommsTrustMutation,
923    ) -> Result<CommsTrustMutationResult, SendError> {
924        Err(SendError::Unsupported(
925            "apply_trust_mutation not supported for this CommsRuntime".to_string(),
926        ))
927    }
928
929    /// Bind this target runtime to the generated MobMachine owner token whose
930    /// trust handoffs may mutate mob-owned trust rows.
931    ///
932    /// Mob runtimes call this before submitting a generated mob trust mutation.
933    /// Implementations must fail closed when they cannot remember and compare
934    /// the owner token during [`Self::apply_trust_mutation`].
935    async fn install_generated_mob_trust_owner(
936        &self,
937        _owner: Arc<dyn std::any::Any + Send + Sync>,
938    ) -> Result<(), SendError> {
939        Err(SendError::Unsupported(
940            "generated mob trust owner binding not supported for this CommsRuntime".to_string(),
941        ))
942    }
943
944    /// Read-only preflight for binding this target runtime to a recovered
945    /// MobMachine owner token.
946    ///
947    /// Resume uses this to validate every generated trust repair target before
948    /// mutating any trust projection row. Implementations must not change the
949    /// stored owner token here; [`Self::install_recovered_generated_mob_trust_owner`]
950    /// performs the actual binding after the full batch has passed preflight.
951    async fn validate_recovered_generated_mob_trust_owner(
952        &self,
953        _owner: Arc<dyn std::any::Any + Send + Sync>,
954    ) -> Result<(), SendError> {
955        Err(SendError::Unsupported(
956            "recovered generated mob trust owner validation not supported for this CommsRuntime"
957                .to_string(),
958        ))
959    }
960
961    /// Rebind this target runtime to the owner token of a recovered
962    /// MobMachine authority.
963    ///
964    /// Recovery reconstructs generated authority from persisted machine state,
965    /// which gives it a fresh process-local owner token. Implementations may
966    /// bind this owner only when no generated MobMachine owner is already
967    /// installed, or when it is the same owner token. They must fail closed
968    /// rather than replacing a different live owner through recovery plumbing.
969    async fn install_recovered_generated_mob_trust_owner(
970        &self,
971        _owner: Arc<dyn std::any::Any + Send + Sync>,
972    ) -> Result<(), SendError> {
973        Err(SendError::Unsupported(
974            "recovered generated mob trust owner binding not supported for this CommsRuntime"
975                .to_string(),
976        ))
977    }
978
979    /// Register a peer for admission-only trust without listing it in the
980    /// directory.
981    ///
982    /// Used for control-plane edges — the canonical case is the supervisor
983    /// bridge for session-backed mob members: lifecycle notifications
984    /// (`mob.peer_added`, `mob.peer_retired`, …) must land at the member's
985    /// inbox, but the supervisor must not appear as an ordinary sendable
986    /// peer in `comms.peers` / REST / RPC / MCP. The admission gate consults
987    /// both the public and private trust sets; `resolve_peer_directory()`
988    /// consults only the public set.
989    async fn add_private_trusted_peer(
990        &self,
991        _peer: TrustedPeerDescriptor,
992    ) -> Result<(), SendError> {
993        Err(SendError::Unsupported(
994            "generated comms private trust mutation authority required".to_string(),
995        ))
996    }
997
998    /// Remove a previously registered private-trust edge by peer ID.
999    ///
1000    /// Returns `true` if the edge was present and removed, `false` if it
1001    /// was not.
1002    async fn remove_private_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
1003        Err(SendError::Unsupported(
1004            "generated comms private trust mutation authority required".to_string(),
1005        ))
1006    }
1007
1008    /// Dispatch a canonical comms command.
1009    async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
1010        Err(SendError::Unsupported(
1011            "send not implemented for this CommsRuntime".to_string(),
1012        ))
1013    }
1014
1015    #[doc(hidden)]
1016    fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
1017        let scope_desc = match scope {
1018            StreamScope::Session(session_id) => format!("session {session_id}"),
1019            StreamScope::Interaction(interaction_id) => format!("interaction {}", interaction_id.0),
1020        };
1021        Err(StreamError::NotFound(scope_desc))
1022    }
1023
1024    /// List peers visible to this runtime.
1025    async fn peers(&self) -> Vec<PeerDirectoryEntry> {
1026        Vec::new()
1027    }
1028
1029    /// Count peers visible to this runtime.
1030    ///
1031    /// Implementations can override this to avoid materializing a full peer list.
1032    async fn peer_count(&self) -> usize {
1033        self.peers().await.len()
1034    }
1035
1036    #[doc(hidden)]
1037    async fn send_and_stream(
1038        &self,
1039        cmd: CommsCommand,
1040    ) -> Result<(SendReceipt, EventStream), SendAndStreamError> {
1041        let receipt = self.send(cmd).await?;
1042        Err(SendAndStreamError::StreamAttach {
1043            receipt,
1044            error: StreamError::Internal(
1045                "send_and_stream is not implemented for this runtime".to_string(),
1046            ),
1047        })
1048    }
1049
1050    /// Drain comms inbox and return messages formatted for the LLM
1051    async fn drain_messages(&self) -> Vec<String>;
1052    /// Get a notification when new messages arrive
1053    fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
1054    /// Returns true if a DISMISS signal was seen during the last `drain_messages` call.
1055    fn dismiss_received(&self) -> bool {
1056        false
1057    }
1058    /// Get an event injector for this runtime's inbox.
1059    ///
1060    /// Surfaces use this to push external events into the agent inbox.
1061    /// Returns `None` if the implementation doesn't support event injection.
1062    fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
1063        None
1064    }
1065
1066    /// Internal runtime seam for interaction-scoped streaming.
1067    #[doc(hidden)]
1068    fn interaction_event_injector(
1069        &self,
1070    ) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
1071        None
1072    }
1073
1074    /// Drain comms inbox and return structured interactions.
1075    ///
1076    /// Default implementation wraps `drain_messages()` results as `InteractionContent::Message`
1077    /// with generated IDs.
1078    async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
1079        self.drain_messages()
1080            .await
1081            .into_iter()
1082            .map(|text| crate::interaction::InboxInteraction {
1083                id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
1084                from_route: None,
1085                from: "unknown".into(),
1086                content: crate::interaction::InteractionContent::Message {
1087                    body: text.clone(),
1088                    blocks: None,
1089                },
1090                rendered_text: text,
1091                handling_mode: crate::types::HandlingMode::Queue,
1092                render_metadata: None,
1093            })
1094            .collect()
1095    }
1096
1097    /// Look up and remove a one-shot subscriber for the given interaction.
1098    ///
1099    /// Returns the event sender if a subscriber was registered (via `inject_with_subscription`).
1100    /// The entry is removed from the registry on lookup (one-shot).
1101    fn interaction_subscriber(
1102        &self,
1103        _id: &crate::interaction::InteractionId,
1104    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
1105        None
1106    }
1107
1108    /// Take and clear the one-shot sender for an interaction-scoped stream.
1109    fn take_interaction_stream_sender(
1110        &self,
1111        _id: &crate::interaction::InteractionId,
1112    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
1113        self.interaction_subscriber(_id)
1114    }
1115
1116    /// Signal that an interaction has reached a terminal state (complete or failed).
1117    ///
1118    /// Implementations should transition the reservation FSM to `Completed` and
1119    /// clean up registry entries. Called from the keep-alive loop after sending
1120    /// terminal events to the tap.
1121    fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
1122
1123    /// Access the session's peer-interaction DSL handle (W1-A).
1124    ///
1125    /// Returns `None` for transport-only comms runtimes. A runtime that emits
1126    /// semantic peer request/response receipts must return `Some` after the
1127    /// surface installs machine authority.
1128    fn peer_interaction_handle(
1129        &self,
1130    ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
1131        None
1132    }
1133
1134    /// Access peer request/response authority only when the runtime has the
1135    /// complete machine-owned lifecycle pair.
1136    ///
1137    /// Semantic peer request/response ingress requires both the peer
1138    /// interaction handle and the paired interaction-stream handle. The stream
1139    /// handle itself stays hidden behind runtime ownership; this witness lets
1140    /// authority boundaries fail closed instead of treating a lone peer handle
1141    /// as sufficient.
1142    fn peer_request_response_authority_handle(
1143        &self,
1144    ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
1145        None
1146    }
1147
1148    /// Drain classified inbox interactions.
1149    ///
1150    /// Returns interactions with pre-computed classification from ingress.
1151    /// The host loop routes on the stored `PeerInputClass` instead of
1152    /// re-classifying after drain.
1153    ///
1154    /// Default returns `Unsupported`. Comms-enabled runtimes must override.
1155    async fn drain_classified_inbox_interactions(
1156        &self,
1157    ) -> Result<Vec<crate::interaction::ClassifiedInboxInteraction>, CommsCapabilityError> {
1158        Err(CommsCapabilityError::Unsupported(
1159            "drain_classified_inbox_interactions".to_string(),
1160        ))
1161    }
1162
1163    /// Drain canonical peer/event ingress candidates.
1164    ///
1165    /// This remains the live runtime drain bridge for call sites that consume
1166    /// the `PeerInputCandidate` noun directly. The underlying drain unit is
1167    /// identical to `ClassifiedInboxInteraction`, so the default
1168    /// implementation simply forwards the classified drain path.
1169    async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
1170        self.drain_classified_inbox_interactions()
1171            .await
1172            .unwrap_or_default()
1173    }
1174
1175    /// Snapshot the currently queued peer-ingress surface without draining it.
1176    ///
1177    /// This is a hidden diagnostic capability used while mapping the internal
1178    /// MeerkatMachine boundary onto existing comms ownership.
1179    async fn peer_ingress_queue_snapshot(
1180        &self,
1181    ) -> Result<crate::interaction::PeerIngressQueueSnapshot, CommsCapabilityError> {
1182        Err(CommsCapabilityError::Unsupported(
1183            "peer_ingress_queue_snapshot".to_string(),
1184        ))
1185    }
1186
1187    /// Snapshot the current peer runtime surface for MeerkatMachine mapping.
1188    ///
1189    /// This extends the queued ingress snapshot with the local trust membership
1190    /// that governs peer admission.
1191    async fn peer_ingress_runtime_snapshot(
1192        &self,
1193    ) -> Result<crate::interaction::PeerIngressRuntimeSnapshot, CommsCapabilityError> {
1194        Err(CommsCapabilityError::Unsupported(
1195            "peer_ingress_runtime_snapshot".to_string(),
1196        ))
1197    }
1198
1199    /// Snapshot only the public trust projection owned by generated public
1200    /// peer authority.
1201    ///
1202    /// Private/control-plane trust edges are admitted by separate generated
1203    /// private authority and must not be reconciled or removed by public peer
1204    /// projection owners.
1205    async fn public_trusted_peer_projection_snapshot(
1206        &self,
1207    ) -> Result<Vec<crate::comms::TrustedPeerDescriptor>, CommsCapabilityError> {
1208        Err(CommsCapabilityError::Unsupported(
1209            "public_trusted_peer_projection_snapshot".to_string(),
1210        ))
1211    }
1212
1213    /// Snapshot the public trust projection owned by one generated source.
1214    ///
1215    /// This is the behavior-authority read used by generated trust
1216    /// reconciliation. Compatibility/public snapshots may still union public
1217    /// rows for display, but generated removals must diff only against rows
1218    /// previously installed by the same generated owner.
1219    async fn trusted_peer_projection_snapshot_for_source(
1220        &self,
1221        _source_kind: crate::comms::GeneratedCommsTrustAuthoritySourceKind,
1222    ) -> Result<Vec<crate::comms::TrustedPeerDescriptor>, CommsCapabilityError> {
1223        Err(CommsCapabilityError::Unsupported(
1224            "trusted_peer_projection_snapshot_for_source".to_string(),
1225        ))
1226    }
1227
1228    /// Get a notification that fires only for actionable peer input.
1229    ///
1230    /// Default returns `Unsupported`. Comms-enabled runtimes must override.
1231    /// Used by the factory to bridge into `WaitTool` interrupt.
1232    fn actionable_input_notify(&self) -> Result<Arc<tokio::sync::Notify>, CommsCapabilityError> {
1233        Err(CommsCapabilityError::Unsupported(
1234            "actionable_input_notify".to_string(),
1235        ))
1236    }
1237}
1238
1239/// The main Agent struct
1240pub struct Agent<C, T, S>
1241where
1242    C: AgentLlmClient + ?Sized,
1243    T: AgentToolDispatcher + ?Sized,
1244    S: AgentSessionStore + ?Sized,
1245{
1246    config: AgentConfig,
1247    client: Arc<C>,
1248    tools: Arc<T>,
1249    tool_scope: ToolScope,
1250    store: Arc<S>,
1251    session: Session,
1252    budget: Budget,
1253    retry_policy: RetryPolicy,
1254    depth: u32,
1255    pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
1256    pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
1257    pub(super) hook_run_overrides: HookRunOverrides,
1258    /// Optional context compaction strategy.
1259    pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
1260    /// Input tokens from the last LLM response (for compaction trigger).
1261    pub(crate) last_input_tokens: u64,
1262    /// Session-scoped compaction cadence tracked across runs.
1263    pub(crate) compaction_cadence: SessionCompactionCadence,
1264    /// Optional memory store for indexing compaction discards.
1265    pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
1266    /// Optional skill engine for per-turn `/skill-ref` activation.
1267    pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
1268    /// Skill references to resolve and inject for the next turn.
1269    /// Set by surfaces before calling `run()`, consumed on run start.
1270    pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
1271    /// Per-interaction event tap for streaming events to subscribers.
1272    pub(crate) event_tap: crate::event_tap::EventTap,
1273    /// Shared control state for runtime system-context appends.
1274    pub(crate) system_context_state:
1275        Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
1276    /// Optional default event channel configured at build time.
1277    /// Used by run methods when no per-call event channel is provided.
1278    pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
1279    /// Optional session checkpointer for keep-alive persistence.
1280    ///
1281    /// Wired by `AgentBuilder::with_checkpointer`, installed by
1282    /// `PersistentSessionService`, and consumed by
1283    /// `Agent::checkpoint_current_session`.
1284    pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
1285    /// Optional blob store used to hydrate image refs at execution seams.
1286    pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
1287    /// Original error detail preserved from `terminalize_fatal_error` so
1288    /// `build_result` can include the actual failure message (e.g. the API
1289    /// error body) instead of only the generic terminal-cause description.
1290    pub(crate) terminal_error_detail: Option<String>,
1291    /// True once the current run has accepted `RunCompleted` hooks.
1292    pub(crate) run_completed_hooks_applied: bool,
1293    /// True once the current run's public `RunCompleted` event has been
1294    /// emitted. Extraction may continue afterward as a separate post-run phase.
1295    pub(crate) run_completed_event_emitted: bool,
1296    /// Comms intents that should be silently injected into the session
1297    /// without triggering an LLM turn. Matched against `InteractionContent::Request.intent`.
1298    #[allow(dead_code)] // Used by comms_impl when comms feature is enabled
1299    pub(crate) silent_comms_intents: Vec<String>,
1300    /// Optional shared lifecycle registry for async operations.
1301    pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
1302    /// Optional completion feed for cursor-based completion delivery.
1303    pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
1304    /// Shared epoch cursor state for runtime-backed cursor writeback.
1305    pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
1306    /// Local cursor into the completion feed — only the agent boundary advances this.
1307    pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
1308    /// Optional enrichment provider for completion display details.
1309    pub(crate) completion_enrichment:
1310        Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
1311    /// Shared effective mob authority handle. Owned by the agent, passed to
1312    /// mob tools at construction for authorization reads. Updated by
1313    /// `apply_session_effects` after each tool batch as a derived projection
1314    /// of the canonical `session.build_state().mob_tool_authority_context`.
1315    pub(crate) mob_authority_handle:
1316        Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
1317    /// Runtime-backed turn-state handle, provided by the session runtime bindings.
1318    pub(crate) turn_state_handle: Option<Arc<dyn crate::TurnStateHandle>>,
1319    /// True when the runtime control plane must stamp execution kind metadata.
1320    pub(crate) runtime_execution_kind_required: bool,
1321    /// Typed execution intent for the current run, when this turn is owned by
1322    /// the runtime control plane rather than a direct surface call.
1323    pub(crate) runtime_execution_kind: Option<crate::lifecycle::RuntimeExecutionKind>,
1324    /// Runtime-backed external tool-surface diagnostic handle, when provided
1325    /// by the session runtime bindings.
1326    pub(crate) external_tool_surface_handle: Option<Arc<dyn crate::ExternalToolSurfaceHandle>>,
1327    /// Runtime-backed auth lease handle (Phase 1.5-rev).
1328    pub(crate) auth_lease_handle: Option<crate::handles::GeneratedAuthLeaseHandle>,
1329    /// Runtime-backed MCP server lifecycle handle (Phase 5G / T5g). When set,
1330    /// the agent loop reads `pending_server_ids()` at each CallingLlm boundary
1331    /// to decide whether to emit the `[MCP_PENDING]` system notice.
1332    pub(crate) mcp_server_lifecycle_handle:
1333        Option<Arc<dyn crate::handles::McpServerLifecycleHandle>>,
1334    /// Producer end of the typed cancel-after-boundary command channel.
1335    ///
1336    /// Retained so [`Agent::cancel_after_boundary_handle`] can hand cloned
1337    /// senders to the surface that requests boundary-only cancellation. The
1338    /// agent never sends on this end itself; it only drains the matching
1339    /// receiver at turn boundaries.
1340    pub(crate) cancel_after_boundary_tx: CancelAfterBoundarySender,
1341    /// Consumer end of the typed cancel-after-boundary command channel.
1342    ///
1343    /// Drained (non-blocking) at each turn boundary by
1344    /// `observe_cancel_after_boundary_request`, replacing the previous
1345    /// `.swap`-polled `AtomicBool`. A delivered [`CancelAfterBoundaryCommand`]
1346    /// is observed at most once per boundary, mirroring the prior edge
1347    /// semantics.
1348    pub(crate) cancel_after_boundary_rx:
1349        tokio::sync::mpsc::UnboundedReceiver<CancelAfterBoundaryCommand>,
1350    /// Optional resolver for model-specific operational defaults (e.g., call timeout).
1351    /// Consulted at each LLM call for hot-swap-aware profile default resolution.
1352    pub(crate) model_defaults_resolver:
1353        Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
1354    /// Explicit call-timeout override from the build/config composition seam.
1355    /// Takes precedence over profile-derived defaults.
1356    pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
1357    /// Structured-output extraction state carried into RunResult.
1358    pub(crate) extraction_state: extraction::ExtractionState,
1359    /// Last published hidden deferred-catalog names.
1360    pub(crate) last_hidden_deferred_catalog_names: BTreeSet<crate::types::ToolName>,
1361    /// Last published pending catalog sources.
1362    pub(crate) last_pending_catalog_sources: BTreeSet<String>,
1363    /// Dispatch-time projection of the current turn input for contextual tools.
1364    pub(crate) tool_dispatch_context: ToolDispatchContext,
1365    /// Runtime-owned dispatch metadata for this turn.
1366    pub(crate) turn_tool_dispatch_metadata: BTreeMap<String, serde_json::Value>,
1367    /// Typed tool-execution policy (per-call timeouts + concurrency bound)
1368    /// applied to the normal LLM-driven tool dispatch loop. Populated by the
1369    /// composition seam via `AgentBuilder::with_tools_config`; defaults to
1370    /// `ToolsConfig::default()` for standalone/test construction.
1371    pub(crate) tools_config: crate::config::ToolsConfig,
1372}
1373
1374#[cfg(test)]
1375#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1376mod tests {
1377    use super::{
1378        AgentToolDispatcher, CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS,
1379        FilteredToolDispatcher, InlinePeerNotificationPolicy, ToolDispatchContext,
1380    };
1381    use crate::comms::{
1382        PeerAddress, PeerId, PeerName, PeerTransport, SendError, TrustedPeerDescriptor,
1383    };
1384    use crate::types::{ContentBlock, ContentInput, ToolCallView, ToolDef, ToolResult};
1385    use async_trait::async_trait;
1386    use serde_json::json;
1387    use std::sync::Arc;
1388    use tokio::sync::Notify;
1389
1390    struct NoopCommsRuntime {
1391        notify: Arc<Notify>,
1392    }
1393
1394    struct ContextAwareToolDispatcher;
1395
1396    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1397    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1398    impl AgentToolDispatcher for ContextAwareToolDispatcher {
1399        fn tools(&self) -> Arc<[Arc<ToolDef>]> {
1400            Arc::from([Arc::new(ToolDef {
1401                name: "inspect_context".into(),
1402                description: "inspect context".to_string(),
1403                input_schema: json!({"type": "object"}),
1404                provenance: None,
1405            })])
1406        }
1407
1408        async fn dispatch(
1409            &self,
1410            call: ToolCallView<'_>,
1411        ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
1412            Ok(ToolResult::new(
1413                call.id.to_string(),
1414                json!({"saw_context_image": false}).to_string(),
1415                false,
1416            )
1417            .into())
1418        }
1419
1420        async fn dispatch_with_context(
1421            &self,
1422            call: ToolCallView<'_>,
1423            context: &ToolDispatchContext,
1424        ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
1425            let saw_context_image = context
1426                .current_turn()
1427                .and_then(|turn| turn.image_ref(0))
1428                .and_then(|image_ref| context.current_turn_image(image_ref))
1429                .is_some();
1430            Ok(ToolResult::new(
1431                call.id.to_string(),
1432                json!({"saw_context_image": saw_context_image}).to_string(),
1433                false,
1434            )
1435            .into())
1436        }
1437    }
1438
1439    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1440    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1441    impl CommsRuntime for NoopCommsRuntime {
1442        async fn drain_messages(&self) -> Vec<String> {
1443            Vec::new()
1444        }
1445
1446        fn inbox_notify(&self) -> std::sync::Arc<Notify> {
1447            self.notify.clone()
1448        }
1449    }
1450
1451    #[tokio::test]
1452    async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
1453        let runtime = NoopCommsRuntime {
1454            notify: Arc::new(Notify::new()),
1455        };
1456        assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
1457        // The only mutable trust seam is apply_trust_mutation; without a
1458        // generated handoff it fails closed.
1459        let peer = TrustedPeerDescriptor {
1460            peer_id: PeerId::new(),
1461            name: PeerName::new("peer-a").expect("valid peer name"),
1462            address: PeerAddress::new(PeerTransport::Inproc, "peer-a"),
1463            pubkey: [0u8; 32],
1464        };
1465        let result =
1466            <NoopCommsRuntime as CommsRuntime>::add_private_trusted_peer(&runtime, peer).await;
1467        assert!(matches!(result, Err(SendError::Unsupported(_))));
1468    }
1469
1470    #[tokio::test]
1471    async fn filtered_tool_dispatcher_preserves_dispatch_context() {
1472        let dispatcher =
1473            FilteredToolDispatcher::new(Arc::new(ContextAwareToolDispatcher), ["inspect_context"]);
1474        let args = serde_json::value::RawValue::from_string("{}".to_string())
1475            .expect("empty object should be valid JSON");
1476        let call = ToolCallView {
1477            id: "ctx-1",
1478            name: "inspect_context",
1479            args: &args,
1480        };
1481        let context = ToolDispatchContext::from_current_turn_input(&ContentInput::Blocks(vec![
1482            ContentBlock::Image {
1483                media_type: "image/png".to_string(),
1484                data: "abc".into(),
1485            },
1486        ]));
1487
1488        let outcome = dispatcher
1489            .dispatch_with_context(call, &context)
1490            .await
1491            .expect("filtered wrapper should dispatch");
1492        let payload: serde_json::Value =
1493            serde_json::from_str(&outcome.result.text_content()).expect("tool result JSON");
1494        assert_eq!(payload["saw_context_image"], true);
1495    }
1496
1497    #[test]
1498    fn test_inline_peer_notification_policy_from_raw() {
1499        assert_eq!(
1500            InlinePeerNotificationPolicy::try_from_raw(None),
1501            Ok(InlinePeerNotificationPolicy::AtMost(
1502                DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS
1503            ))
1504        );
1505        assert_eq!(
1506            InlinePeerNotificationPolicy::try_from_raw(Some(-1)),
1507            Ok(InlinePeerNotificationPolicy::Always)
1508        );
1509        assert_eq!(
1510            InlinePeerNotificationPolicy::try_from_raw(Some(0)),
1511            Ok(InlinePeerNotificationPolicy::Never)
1512        );
1513        assert_eq!(
1514            InlinePeerNotificationPolicy::try_from_raw(Some(25)),
1515            Ok(InlinePeerNotificationPolicy::AtMost(25))
1516        );
1517        assert_eq!(
1518            InlinePeerNotificationPolicy::try_from_raw(Some(-42)),
1519            Err(-42)
1520        );
1521    }
1522
1523    /// UNIT-002: DetachedOpCompletion serializes without operation_id.
1524    /// The app-facing control noun is job_id (CONTRACT-003).
1525    #[test]
1526    fn unit_002_detached_op_completion_has_no_operation_id() {
1527        use crate::agent::DetachedOpCompletion;
1528        use crate::ops_lifecycle::{OperationKind, OperationStatus};
1529
1530        let completion = DetachedOpCompletion {
1531            job_id: "j_test".into(),
1532            kind: OperationKind::BackgroundToolOp,
1533            status: OperationStatus::Completed,
1534            terminal_outcome: None,
1535            display_name: "test cmd".into(),
1536            detail: "ok".into(),
1537            elapsed_ms: None,
1538        };
1539        #[allow(clippy::unwrap_used)]
1540        let json = serde_json::to_value(&completion).unwrap();
1541        assert!(
1542            json.get("operation_id").is_none(),
1543            "operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
1544        );
1545        assert!(
1546            json.get("job_id").is_some(),
1547            "job_id must be the app-facing control noun"
1548        );
1549    }
1550}