Skip to main content

meerkat_core/
agent.rs

1//! Agent - the core agent orchestrator
2//!
3//! The Agent struct ties together all components and runs the agent loop.
4
5mod builder;
6pub mod comms_impl;
7pub mod compact;
8mod extraction;
9mod hook_impl;
10#[cfg(test)]
11mod hooks_behavior_tests;
12mod runner;
13pub mod skills;
14mod state;
15#[doc(hidden)]
16pub mod test_turn_state_handle;
17use crate::budget::Budget;
18use crate::comms::{
19    CommsCommand, EventStream, PeerDirectoryEntry, PeerId, SendAndStreamError, SendError,
20    SendReceipt, StreamError, StreamScope, TrustedPeerDescriptor,
21};
22use crate::compact::SessionCompactionCadence;
23use crate::completion_feed::CompletionSeq;
24use crate::config::{AgentConfig, HookRunOverrides};
25use crate::error::AgentError;
26use crate::event::ExternalToolDelta;
27use crate::hooks::HookEngine;
28use crate::lifecycle::RunId;
29use crate::lifecycle::run_primitive::ProviderParamsOverride;
30use crate::ops::OperationId;
31use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
32use crate::retry::RetryPolicy;
33use crate::schema::{CompiledSchema, SchemaError};
34use crate::session::Session;
35use crate::state::LoopState;
36#[cfg(target_arch = "wasm32")]
37use crate::tokio;
38use crate::tool_catalog::{
39    ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
40    select_catalog_mode_from_snapshot,
41};
42use crate::tool_scope::ToolScope;
43use crate::turn_execution_authority::{
44    ContentShape, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind, TurnTerminalOutcome,
45};
46use crate::types::{
47    AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
48    ToolDef, ToolName, ToolNameSet, Usage,
49};
50use async_trait::async_trait;
51use serde::{Deserialize, Serialize};
52use std::collections::BTreeSet;
53use std::sync::Arc;
54
55pub use builder::{AgentBuildPolicyError, AgentBuilder};
56pub use runner::AgentRunner;
57
58/// Trait for LLM clients that can be used with the agent
59#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
60#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
61pub trait AgentLlmClient: Send + Sync {
62    /// Stream a response from the LLM
63    async fn stream_response(
64        &self,
65        messages: &[Message],
66        tools: &[Arc<ToolDef>],
67        max_tokens: u32,
68        temperature: Option<f32>,
69        provider_params: Option<&ProviderParamsOverride>,
70    ) -> Result<LlmStreamResult, AgentError>;
71
72    /// Get the provider name
73    fn provider(&self) -> &'static str;
74
75    /// Get the current effective model identifier.
76    ///
77    /// Used by the agent loop for profile-default resolution (e.g., call timeout
78    /// defaults that vary per model family). Must reflect the current model even
79    /// after hot-swap.
80    fn model(&self) -> &str;
81
82    /// Compile an output schema for this provider.
83    ///
84    /// Default implementation normalizes the schema without provider-specific lowering.
85    /// Adapters override this to apply provider-specific transformations (e.g.,
86    /// Anthropic adds `additionalProperties: false`, Gemini strips unsupported keywords).
87    fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
88        // Default passthrough: normalized clone, no provider-specific lowering
89        Ok(CompiledSchema {
90            schema: output_schema.schema.as_value().clone(),
91            warnings: Vec::new(),
92        })
93    }
94}
95
96/// Hook for wrapping the final agent-facing LLM client.
97///
98/// Factories and runtimes apply this after provider/raw-client adaptation so
99/// embedders can compose cross-cutting behavior without provider-specific
100/// registry hooks.
101pub type AgentLlmClientDecorator =
102    Arc<dyn Fn(Arc<dyn AgentLlmClient>) -> Arc<dyn AgentLlmClient> + Send + Sync + 'static>;
103
104/// Result of streaming from the LLM
105pub struct LlmStreamResult {
106    blocks: Vec<AssistantBlock>,
107    stop_reason: StopReason,
108    usage: Usage,
109}
110
111impl LlmStreamResult {
112    pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
113        Self {
114            blocks,
115            stop_reason,
116            usage,
117        }
118    }
119
120    pub fn blocks(&self) -> &[AssistantBlock] {
121        &self.blocks
122    }
123    pub fn stop_reason(&self) -> StopReason {
124        self.stop_reason
125    }
126    pub fn usage(&self) -> &Usage {
127        &self.usage
128    }
129
130    pub fn into_message(self) -> BlockAssistantMessage {
131        BlockAssistantMessage::new(self.blocks, self.stop_reason)
132    }
133
134    pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
135        (self.blocks, self.stop_reason, self.usage)
136    }
137}
138
139/// Snapshot of the core agent's live execution state.
140///
141/// When a runtime-backed turn-state handle is attached, this snapshots the
142/// runtime-owned turn machine; otherwise it falls back to the in-process
143/// standalone turn state used by core-only execution.
144#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct AgentExecutionSnapshot {
146    pub loop_state: LoopState,
147    pub turn_phase: TurnPhase,
148    pub active_run_id: Option<RunId>,
149    pub primitive_kind: TurnPrimitiveKind,
150    pub admitted_content_shape: Option<ContentShape>,
151    pub vision_enabled: bool,
152    pub image_tool_results_enabled: bool,
153    pub tool_calls_pending: u32,
154    pub pending_operation_ids: Option<Vec<OperationId>>,
155    pub barrier_operation_ids: Vec<OperationId>,
156    pub has_barrier_ops: bool,
157    pub barrier_satisfied: bool,
158    pub boundary_count: u32,
159    pub cancel_after_boundary: bool,
160    pub terminal_outcome: TurnTerminalOutcome,
161    pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
162    pub extraction_attempts: u32,
163    pub max_extraction_retries: u32,
164    pub applied_cursor: CompletionSeq,
165}
166
167/// Result of polling for external tool updates.
168///
169/// Returned by [`AgentToolDispatcher::poll_external_updates`].
170#[derive(Debug, Clone, Default)]
171pub struct ExternalToolUpdate {
172    /// Notices about completed background operations since last poll.
173    pub notices: Vec<ExternalToolDelta>,
174    /// Names of servers still connecting in the background.
175    pub pending: Vec<String>,
176    /// Detached background operation completions since last poll.
177    pub background_completions: Vec<DetachedOpCompletion>,
178}
179
180/// Typed context supplied by the agent loop when dispatching a tool call.
181///
182/// This is a dispatch-time projection of the already-admitted turn input. It
183/// lets tool surfaces resolve typed turn-scoped references, such as a
184/// `source=current_turn, index=0` image ref, without writing surface-local
185/// metadata into canonical transcript history.
186#[derive(Debug, Clone, Default, PartialEq, Eq)]
187pub struct ToolDispatchContext {
188    current_turn: Option<CurrentTurnContent>,
189}
190
191impl ToolDispatchContext {
192    pub fn from_current_turn_input(input: &crate::types::ContentInput) -> Self {
193        let blocks = match input {
194            crate::types::ContentInput::Text(_) => None,
195            crate::types::ContentInput::Blocks(blocks) => Some(blocks.clone()),
196        };
197        Self {
198            current_turn: blocks.map(CurrentTurnContent::new),
199        }
200    }
201
202    pub fn current_turn(&self) -> Option<&CurrentTurnContent> {
203        self.current_turn.as_ref()
204    }
205
206    pub fn current_turn_image(&self, index: usize) -> Option<&crate::types::ContentBlock> {
207        self.current_turn
208            .as_ref()
209            .and_then(|current_turn| current_turn.image(index))
210    }
211}
212
213/// Multimodal content from the currently admitted turn.
214#[derive(Debug, Clone, PartialEq, Eq)]
215pub struct CurrentTurnContent {
216    blocks: Vec<crate::types::ContentBlock>,
217}
218
219impl CurrentTurnContent {
220    pub fn new(blocks: Vec<crate::types::ContentBlock>) -> Self {
221        Self { blocks }
222    }
223
224    pub fn blocks(&self) -> &[crate::types::ContentBlock] {
225        &self.blocks
226    }
227
228    pub fn image(&self, index: usize) -> Option<&crate::types::ContentBlock> {
229        self.blocks
230            .iter()
231            .filter(|block| matches!(block, crate::types::ContentBlock::Image { .. }))
232            .nth(index)
233    }
234}
235
236/// Completion notice for a detached background operation, projected from
237/// canonical ops-lifecycle terminal state plus dispatcher-owned display metadata.
238///
239/// This is a rebuildable projection (INV-003), not authoritative state.
240/// Terminal class and timing come from `OperationLifecycleSnapshot` (INV-001).
241/// Shell-projected detail is supplementary display only (INV-002).
242#[derive(Debug, Clone, Serialize, Deserialize)]
243pub struct DetachedOpCompletion {
244    /// App-facing job identifier (the control noun for surfaces).
245    pub job_id: String,
246    /// Operation kind from canonical ops-lifecycle.
247    pub kind: OperationKind,
248    /// Terminal status from canonical ops-lifecycle.
249    pub status: OperationStatus,
250    /// Terminal outcome from canonical ops-lifecycle.
251    pub terminal_outcome: Option<OperationTerminalOutcome>,
252    /// Canonical display label from ops-lifecycle snapshot.
253    pub display_name: String,
254    /// Dispatcher-projected summary (exit code, output tail). Display only.
255    pub detail: String,
256    /// Monotonic elapsed millis from ops-lifecycle snapshot.
257    pub elapsed_ms: Option<u64>,
258}
259
260/// Dispatcher binding capabilities — what optional bindings this dispatcher supports.
261///
262/// Returned by [`AgentToolDispatcher::capabilities`]. Replaces individual
263/// `supports_*` boolean methods with a single structured query.
264#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
265pub struct DispatcherCapabilities {
266    /// Whether `bind_ops_lifecycle` is implemented.
267    pub ops_lifecycle: bool,
268}
269
270/// Result of a dispatcher binding operation.
271///
272/// Distinguishes "binding was applied" from "binding was skipped" so callers
273/// can decide whether to wire downstream side effects (e.g. bridge tasks).
274///
275/// **Semantics (decision 11 — supported/best-effort/rejected):**
276/// - `Ok(Bound(d))` = **supported** — binding succeeded, side effects should be wired
277/// - `Ok(Skipped(d))` = **best-effort** — inner shared or incompatible, dispatcher unchanged
278/// - `Err(SharedOwnership)` = **rejected** — outer wrapper is shared, caught by factory pre-check
279/// - `Err(Unsupported)` = **rejected** — type doesn't support this binding, caught by `capabilities()`
280pub enum BindOutcome {
281    /// Binding was applied. The dispatcher was rebound.
282    Bound(Arc<dyn AgentToolDispatcher>),
283    /// Binding was skipped — inner dispatcher was shared or unsupported.
284    /// The returned dispatcher is unchanged but safe to use.
285    Skipped(Arc<dyn AgentToolDispatcher>),
286}
287
288impl BindOutcome {
289    /// Extract the dispatcher, regardless of bind status.
290    pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
291        match self {
292            Self::Bound(d) | Self::Skipped(d) => d,
293        }
294    }
295
296    /// Whether the binding was actually applied.
297    pub fn was_bound(&self) -> bool {
298        matches!(self, Self::Bound(_))
299    }
300}
301
302/// Trait for tool dispatchers
303#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
304#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
305pub trait AgentToolDispatcher: Send + Sync {
306    /// Get available tool definitions
307    fn tools(&self) -> Arc<[Arc<ToolDef>]>;
308
309    /// Query exact catalog support for this dispatcher.
310    ///
311    /// Dispatchers report `exact_catalog=true` only when `tool_catalog()`
312    /// returns the exact precedence-resolved winner registry for the plane
313    /// they own. Wrappers that cannot prove exactness must leave this false.
314    fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
315        ToolCatalogCapabilities::default()
316    }
317
318    /// Return the precedence-resolved tool catalog for this dispatcher.
319    ///
320    /// The default implementation mirrors `tools()` as a visible-only inline
321    /// catalog. Callers must gate any deferred-catalog behavior on
322    /// `tool_catalog_capabilities().exact_catalog`.
323    fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
324        self.tools()
325            .iter()
326            .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
327            .collect::<Vec<_>>()
328            .into()
329    }
330
331    /// Return non-draining pending source names for exact-catalog discovery.
332    ///
333    /// Pending sources are catalog-level discovery metadata rather than
334    /// provider-visible tools. The default implementation reports none.
335    fn pending_catalog_sources(&self) -> Arc<[String]> {
336        Arc::from([])
337    }
338
339    /// Execute a tool call, returning the transcript result and any async operations.
340    ///
341    /// The `ToolDispatchOutcome` separates transcript data (`result`) from
342    /// execution metadata (`async_ops`). Most tools return no async ops;
343    /// use `ToolDispatchOutcome::from(result)` for synchronous tools.
344    async fn dispatch(
345        &self,
346        call: ToolCallView<'_>,
347    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;
348
349    /// Execute a tool call with the current turn's typed dispatch context.
350    ///
351    /// Most tools do not need turn-local context and inherit the plain
352    /// `dispatch` behavior. Context-sensitive surfaces override this method
353    /// rather than reaching into session history or prompt text.
354    async fn dispatch_with_context(
355        &self,
356        call: ToolCallView<'_>,
357        _context: &ToolDispatchContext,
358    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
359        self.dispatch(call).await
360    }
361
362    /// Poll for external tool updates from background operations (e.g. async MCP loading).
363    ///
364    /// The default implementation returns an empty update. Implementations that
365    /// support background tool loading (like `McpRouterAdapter`) override this
366    /// to drain completed results and report pending servers.
367    async fn poll_external_updates(&self) -> ExternalToolUpdate {
368        ExternalToolUpdate::default()
369    }
370
371    /// Snapshot the live external tool-surface machine state, if supported.
372    ///
373    /// This is a hidden diagnostic surface for MeerkatMachine mapping work.
374    /// Dispatchers that do not own dynamic external tool mutation should
375    /// return `None`.
376    fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
377        None
378    }
379
380    /// Query which optional bindings this dispatcher supports.
381    fn capabilities(&self) -> DispatcherCapabilities {
382        DispatcherCapabilities::default()
383    }
384
385    /// Bind a session-canonical ops registry into this dispatcher.
386    ///
387    /// Dispatchers that emit session-visible `AsyncOpRef`s must route those
388    /// operation IDs into the bound registry. Under the identity-first Mob
389    /// regime the owner binding passed here is the canonical bridge session
390    /// binding, even though many compatibility surfaces still spell it
391    /// `session_id`. Default returns Unsupported.
392    fn bind_ops_lifecycle(
393        self: Arc<Self>,
394        _registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
395        _owner_bridge_session_id: crate::types::SessionId,
396    ) -> Result<BindOutcome, OpsLifecycleBindError> {
397        Err(OpsLifecycleBindError::Unsupported)
398    }
399
400    /// Return the completion enrichment provider, if available.
401    ///
402    /// Dispatchers with shell job management return a provider that maps
403    /// operation IDs to display details (job ID, status detail string).
404    fn completion_enrichment(
405        &self,
406    ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
407        None
408    }
409
410    /// Bind a session-scoped MCP server lifecycle handle (Phase 5G / T5g).
411    ///
412    /// Dispatchers that manage per-server MCP handshake lifecycle (like
413    /// `McpRouterAdapter`) use the handle to mirror connection state into
414    /// the session's MeerkatMachine DSL. The default implementation is a
415    /// no-op for dispatchers that have no MCP handshake to route.
416    fn bind_mcp_server_lifecycle_handle(
417        &self,
418        _handle: Arc<dyn crate::handles::McpServerLifecycleHandle>,
419    ) {
420    }
421
422    /// Bind the session-canonical external tool-surface handle.
423    ///
424    /// MCP dispatchers use this to route add/remove/reload/call lifecycle
425    /// semantics through the session's MeerkatMachine DSL instead of their
426    /// standalone compatibility projection. The default implementation is a
427    /// no-op for dispatchers that do not own dynamic external tool surfaces.
428    fn bind_external_tool_surface_handle(
429        &self,
430        _handle: Arc<dyn crate::handles::ExternalToolSurfaceHandle>,
431    ) {
432    }
433}
434
435/// Compute whether the current exact catalog should stay inline or switch to deferred mode.
436pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
437where
438    T: AgentToolDispatcher + ?Sized,
439{
440    let capabilities = dispatcher.tool_catalog_capabilities();
441    if !capabilities.exact_catalog {
442        return ToolCatalogMode::Inline;
443    }
444    let pending_sources = dispatcher.pending_catalog_sources();
445    let catalog = dispatcher.tool_catalog();
446    select_catalog_mode_from_snapshot(
447        capabilities.exact_catalog,
448        catalog.as_ref(),
449        pending_sources.as_ref(),
450    )
451}
452
453/// Compute whether the catalog control plane should be composed for this
454/// dispatcher, even if the current adaptive snapshot remains inline.
455pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
456where
457    T: AgentToolDispatcher + ?Sized,
458{
459    let capabilities = dispatcher.tool_catalog_capabilities();
460    if !capabilities.exact_catalog {
461        return false;
462    }
463    if capabilities.may_require_catalog_control_plane {
464        return true;
465    }
466
467    let pending_sources = dispatcher.pending_catalog_sources();
468    if !pending_sources.is_empty() {
469        return true;
470    }
471
472    let catalog = dispatcher.tool_catalog();
473    deferred_session_entry_count(catalog.as_ref()) > 0
474}
475
476/// Error from [`AgentToolDispatcher::bind_ops_lifecycle`].
477#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
478pub enum OpsLifecycleBindError {
479    #[error("ops lifecycle binding is unsupported")]
480    Unsupported,
481    #[error("dispatcher has shared ownership and cannot be rebound")]
482    SharedOwnership,
483}
484
485/// A tool dispatcher that filters tools based on a policy
486///
487/// Legacy tool lists are filtered once at construction time based on the
488/// allowed_tools list. Exact-catalog dispatchers keep catalog callability live.
489/// The inner dispatcher is used for actual dispatch, but only allowed tools are
490/// exposed via tools() and dispatch() returns AccessDenied for filtered tools.
491pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
492    inner: Arc<T>,
493    allowed_tools: ToolNameSet,
494    /// Pre-computed filtered tool list for non-exact dispatchers.
495    filtered_tools: Arc<[Arc<ToolDef>]>,
496}
497
498impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
499    pub fn new<I, N>(inner: Arc<T>, allowed_tools: I) -> Self
500    where
501        I: IntoIterator<Item = N>,
502        N: Into<ToolName>,
503    {
504        let allowed_set: ToolNameSet = allowed_tools
505            .into_iter()
506            .map(Into::into)
507            .collect::<ToolNameSet>();
508
509        let filtered: Vec<Arc<ToolDef>> = if inner.tool_catalog_capabilities().exact_catalog {
510            inner
511                .tool_catalog()
512                .iter()
513                .filter(|entry| entry.currently_callable())
514                .map(|entry| Arc::clone(&entry.tool))
515                .filter(|t| allowed_set.contains(t.name.as_str()))
516                .collect()
517        } else {
518            inner
519                .tools()
520                .iter()
521                .filter(|t| allowed_set.contains(t.name.as_str()))
522                .map(Arc::clone)
523                .collect()
524        };
525
526        Self {
527            inner,
528            allowed_tools: allowed_set,
529            filtered_tools: filtered.into(),
530        }
531    }
532}
533
534#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
535#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
536impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
537    fn tools(&self) -> Arc<[Arc<ToolDef>]> {
538        if self.inner.tool_catalog_capabilities().exact_catalog {
539            return self
540                .inner
541                .tool_catalog()
542                .iter()
543                .filter(|entry| entry.currently_callable())
544                .map(|entry| Arc::clone(&entry.tool))
545                .filter(|tool| self.allowed_tools.contains(tool.name.as_str()))
546                .collect::<Vec<_>>()
547                .into();
548        }
549        Arc::clone(&self.filtered_tools)
550    }
551
552    async fn dispatch(
553        &self,
554        call: ToolCallView<'_>,
555    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
556        self.dispatch_with_context(call, &ToolDispatchContext::default())
557            .await
558    }
559
560    async fn dispatch_with_context(
561        &self,
562        call: ToolCallView<'_>,
563        context: &ToolDispatchContext,
564    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
565        if !self.allowed_tools.contains(call.name) {
566            let inner_knows_tool = if self.inner.tool_catalog_capabilities().exact_catalog {
567                self.inner
568                    .tool_catalog()
569                    .iter()
570                    .any(|entry| entry.tool.name == call.name)
571            } else {
572                self.inner.tools().iter().any(|tool| tool.name == call.name)
573            };
574            if !inner_knows_tool {
575                return Err(crate::error::ToolError::not_found(call.name));
576            }
577            return Err(crate::error::ToolError::access_denied(call.name));
578        }
579        self.inner.dispatch_with_context(call, context).await
580    }
581
582    fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
583        self.inner.tool_catalog_capabilities()
584    }
585
586    fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
587        if !self.inner.tool_catalog_capabilities().exact_catalog {
588            return self
589                .tools()
590                .iter()
591                .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
592                .collect::<Vec<_>>()
593                .into();
594        }
595        self.inner
596            .tool_catalog()
597            .iter()
598            .filter(|entry| self.allowed_tools.contains(entry.tool.name.as_str()))
599            .cloned()
600            .collect::<Vec<_>>()
601            .into()
602    }
603
604    fn pending_catalog_sources(&self) -> Arc<[String]> {
605        self.inner.pending_catalog_sources()
606    }
607
608    async fn poll_external_updates(&self) -> ExternalToolUpdate {
609        self.inner.poll_external_updates().await
610    }
611
612    fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
613        self.inner.external_tool_surface_snapshot()
614    }
615
616    fn capabilities(&self) -> DispatcherCapabilities {
617        self.inner.capabilities()
618    }
619
620    fn bind_ops_lifecycle(
621        self: Arc<Self>,
622        registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
623        owner_bridge_session_id: crate::types::SessionId,
624    ) -> Result<BindOutcome, OpsLifecycleBindError> {
625        let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
626        if Arc::strong_count(&owned.inner) == 1 {
627            let outcome = owned
628                .inner
629                .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
630            let bound = outcome.was_bound();
631            let d = outcome.into_dispatcher();
632            let allowed_tools = owned.allowed_tools.into_iter().collect::<Vec<_>>();
633            Ok(if bound {
634                BindOutcome::Bound(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
635            } else {
636                BindOutcome::Skipped(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
637            })
638        } else {
639            Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
640                inner: owned.inner,
641                allowed_tools: owned.allowed_tools,
642                filtered_tools: owned.filtered_tools,
643            })))
644        }
645    }
646
647    fn completion_enrichment(
648        &self,
649    ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
650        self.inner.completion_enrichment()
651    }
652}
653
654/// Trait for session stores
655#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
656#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
657pub trait AgentSessionStore: Send + Sync {
658    async fn save(&self, session: &Session) -> Result<(), AgentError>;
659    async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
660}
661
662/// Runtime policy for inlining peer lifecycle updates into session context.
663#[derive(Debug, Clone, Copy, PartialEq, Eq)]
664pub enum InlinePeerNotificationPolicy {
665    /// Always inline batched peer lifecycle updates.
666    Always,
667    /// Never inline batched peer lifecycle updates.
668    Never,
669    /// Inline only when post-drain peer count is at or below this threshold.
670    AtMost(usize),
671}
672
673/// Default inline threshold when no explicit value is configured.
674pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;
675
676impl InlinePeerNotificationPolicy {
677    /// Resolve policy from transport/build-layer config representation.
678    pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
679        match raw {
680            None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
681            Some(-1) => Ok(Self::Always),
682            Some(0) => Ok(Self::Never),
683            Some(v) if v > 0 => Ok(Self::AtMost(v as usize)),
684            Some(v) => Err(v),
685        }
686    }
687}
688
689/// Error returned when a comms runtime capability is not available.
690#[derive(Debug, thiserror::Error)]
691pub enum CommsCapabilityError {
692    /// The runtime does not support this capability.
693    #[error("comms capability not supported: {0}")]
694    Unsupported(String),
695}
696
697/// Trait for comms runtime that can be used with the agent
698#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
699#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
700pub trait CommsRuntime: Send + Sync {
701    /// Canonical runtime routing identity for this peer, if available.
702    ///
703    /// `PeerId` is the UUID-shaped routing key used by peer directories and
704    /// trust stores. Implementations that only have the legacy string carrier
705    /// may return a parsed UUID-shaped `public_key()` value; implementations
706    /// with Ed25519 public keys should override this and return the pubkey-
707    /// derived canonical [`PeerId`].
708    fn peer_id(&self) -> Option<PeerId> {
709        self.public_key()
710            .as_deref()
711            .and_then(|public_key| PeerId::parse(public_key).ok())
712    }
713
714    /// Runtime-local transport/auth public key, if available.
715    ///
716    /// Returns an Ed25519 public key string in `ed25519:<base64>` format.
717    /// This is not the canonical routing [`PeerId`]; use [`Self::peer_id`]
718    /// for roster/projection identity and peer-directory lookups.
719    fn public_key(&self) -> Option<String> {
720        None
721    }
722
723    /// Runtime-local Ed25519 public key bytes, if available.
724    ///
725    /// This is the typed form of [`Self::public_key`]. Trust installation
726    /// paths that need to verify `PeerId`/pubkey consistency should prefer
727    /// this method over reparsing the string carrier.
728    fn public_key_bytes(&self) -> Option<[u8; 32]> {
729        None
730    }
731
732    /// Runtime-local canonical comms routing name, if available.
733    ///
734    /// This is the peer name used in trusted-peer descriptors and peer
735    /// directories. It is separate from the advertised transport address so
736    /// callers do not recover identity by parsing transport strings.
737    fn comms_name(&self) -> Option<String> {
738        None
739    }
740
741    /// Runtime-local advertised comms address, if available.
742    ///
743    /// This is the canonical address the runtime expects peers to use when
744    /// constructing a [`TrustedPeerDescriptor`]. Implementations that do not
745    /// expose a stable advertised address can return `None`.
746    fn advertised_address(&self) -> Option<String> {
747        None
748    }
749
750    /// Runtime-local bootstrap proof for the initial supervisor bind, if
751    /// available.
752    fn bridge_bootstrap_token(&self) -> Option<String> {
753        None
754    }
755
756    /// Register a trusted peer for future peer sends.
757    ///
758    /// Runtimes that manage trust dynamically should accept this as a mutable
759    /// control-plane operation and return `SendError::Unsupported` if not
760    /// available.
761    async fn add_trusted_peer(&self, _peer: TrustedPeerDescriptor) -> Result<(), SendError> {
762        Err(SendError::Unsupported(
763            "add_trusted_peer not supported for this CommsRuntime".to_string(),
764        ))
765    }
766
767    /// Remove a previously trusted peer by peer ID.
768    ///
769    /// Returns `true` if the peer was found and removed, `false` if it
770    /// was not present. After removal, messages from this peer should be
771    /// rejected and `peers()` should no longer return it.
772    async fn remove_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
773        Err(SendError::Unsupported(
774            "remove_trusted_peer not supported for this CommsRuntime".to_string(),
775        ))
776    }
777
778    /// Register a peer for admission-only trust without listing it in the
779    /// directory.
780    ///
781    /// Used for control-plane edges — the canonical case is the supervisor
782    /// bridge for session-backed mob members: lifecycle notifications
783    /// (`mob.peer_added`, `mob.peer_retired`, …) must land at the member's
784    /// inbox, but the supervisor must not appear as an ordinary sendable
785    /// peer in `comms.peers` / REST / RPC / MCP. The admission gate consults
786    /// both the public and private trust sets; `resolve_peer_directory()`
787    /// consults only the public set.
788    async fn add_private_trusted_peer(
789        &self,
790        _peer: TrustedPeerDescriptor,
791    ) -> Result<(), SendError> {
792        Err(SendError::Unsupported(
793            "add_private_trusted_peer not supported for this CommsRuntime".to_string(),
794        ))
795    }
796
797    /// Remove a previously registered private-trust edge by peer ID.
798    ///
799    /// Returns `true` if the edge was present and removed, `false` if it
800    /// was not.
801    async fn remove_private_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
802        Err(SendError::Unsupported(
803            "remove_private_trusted_peer not supported for this CommsRuntime".to_string(),
804        ))
805    }
806
807    /// Dispatch a canonical comms command.
808    async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
809        Err(SendError::Unsupported(
810            "send not implemented for this CommsRuntime".to_string(),
811        ))
812    }
813
814    #[doc(hidden)]
815    fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
816        let scope_desc = match scope {
817            StreamScope::Session(session_id) => format!("session {session_id}"),
818            StreamScope::Interaction(interaction_id) => format!("interaction {}", interaction_id.0),
819        };
820        Err(StreamError::NotFound(scope_desc))
821    }
822
823    /// List peers visible to this runtime.
824    async fn peers(&self) -> Vec<PeerDirectoryEntry> {
825        Vec::new()
826    }
827
828    /// Count peers visible to this runtime.
829    ///
830    /// Implementations can override this to avoid materializing a full peer list.
831    async fn peer_count(&self) -> usize {
832        self.peers().await.len()
833    }
834
835    #[doc(hidden)]
836    async fn send_and_stream(
837        &self,
838        cmd: CommsCommand,
839    ) -> Result<(SendReceipt, EventStream), SendAndStreamError> {
840        let receipt = self.send(cmd).await?;
841        Err(SendAndStreamError::StreamAttach {
842            receipt,
843            error: StreamError::Internal(
844                "send_and_stream is not implemented for this runtime".to_string(),
845            ),
846        })
847    }
848
849    /// Drain comms inbox and return messages formatted for the LLM
850    async fn drain_messages(&self) -> Vec<String>;
851    /// Get a notification when new messages arrive
852    fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
853    /// Returns true if a DISMISS signal was seen during the last `drain_messages` call.
854    fn dismiss_received(&self) -> bool {
855        false
856    }
857    /// Get an event injector for this runtime's inbox.
858    ///
859    /// Surfaces use this to push external events into the agent inbox.
860    /// Returns `None` if the implementation doesn't support event injection.
861    fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
862        None
863    }
864
865    /// Internal runtime seam for interaction-scoped streaming.
866    #[doc(hidden)]
867    fn interaction_event_injector(
868        &self,
869    ) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
870        None
871    }
872
873    /// Drain comms inbox and return structured interactions.
874    ///
875    /// Default implementation wraps `drain_messages()` results as `InteractionContent::Message`
876    /// with generated IDs.
877    async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
878        self.drain_messages()
879            .await
880            .into_iter()
881            .map(|text| crate::interaction::InboxInteraction {
882                id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
883                from_route: None,
884                from: "unknown".into(),
885                content: crate::interaction::InteractionContent::Message {
886                    body: text.clone(),
887                    blocks: None,
888                },
889                rendered_text: text,
890                handling_mode: crate::types::HandlingMode::Queue,
891                render_metadata: None,
892            })
893            .collect()
894    }
895
896    /// Look up and remove a one-shot subscriber for the given interaction.
897    ///
898    /// Returns the event sender if a subscriber was registered (via `inject_with_subscription`).
899    /// The entry is removed from the registry on lookup (one-shot).
900    fn interaction_subscriber(
901        &self,
902        _id: &crate::interaction::InteractionId,
903    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
904        None
905    }
906
907    /// Take and clear the one-shot sender for an interaction-scoped stream.
908    fn take_interaction_stream_sender(
909        &self,
910        _id: &crate::interaction::InteractionId,
911    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
912        self.interaction_subscriber(_id)
913    }
914
915    /// Signal that an interaction has reached a terminal state (complete or failed).
916    ///
917    /// Implementations should transition the reservation FSM to `Completed` and
918    /// clean up registry entries. Called from the keep-alive loop after sending
919    /// terminal events to the tap.
920    fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
921
922    /// Access the session's peer-interaction DSL handle (W1-A).
923    ///
924    /// Returns `None` for transport-only comms runtimes. A runtime that emits
925    /// semantic peer request/response receipts must return `Some` after the
926    /// surface installs machine authority.
927    fn peer_interaction_handle(
928        &self,
929    ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
930        None
931    }
932
933    /// Access peer request/response authority only when the runtime has the
934    /// complete machine-owned lifecycle pair.
935    ///
936    /// Semantic peer request/response ingress requires both the peer
937    /// interaction handle and the paired interaction-stream handle. The stream
938    /// handle itself stays hidden behind runtime ownership; this witness lets
939    /// authority boundaries fail closed instead of treating a lone peer handle
940    /// as sufficient.
941    fn peer_request_response_authority_handle(
942        &self,
943    ) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
944        None
945    }
946
947    /// Drain classified inbox interactions.
948    ///
949    /// Returns interactions with pre-computed classification from ingress.
950    /// The host loop routes on the stored `PeerInputClass` instead of
951    /// re-classifying after drain.
952    ///
953    /// Default returns `Unsupported`. Comms-enabled runtimes must override.
954    async fn drain_classified_inbox_interactions(
955        &self,
956    ) -> Result<Vec<crate::interaction::ClassifiedInboxInteraction>, CommsCapabilityError> {
957        Err(CommsCapabilityError::Unsupported(
958            "drain_classified_inbox_interactions".to_string(),
959        ))
960    }
961
962    /// Drain canonical peer/event ingress candidates.
963    ///
964    /// This remains the live runtime drain bridge for call sites that consume
965    /// the `PeerInputCandidate` noun directly. The underlying drain unit is
966    /// identical to `ClassifiedInboxInteraction`, so the default
967    /// implementation simply forwards the classified drain path.
968    async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
969        self.drain_classified_inbox_interactions()
970            .await
971            .unwrap_or_default()
972    }
973
974    /// Snapshot the currently queued peer-ingress surface without draining it.
975    ///
976    /// This is a hidden diagnostic capability used while mapping the internal
977    /// MeerkatMachine boundary onto existing comms ownership.
978    async fn peer_ingress_queue_snapshot(
979        &self,
980    ) -> Result<crate::interaction::PeerIngressQueueSnapshot, CommsCapabilityError> {
981        Err(CommsCapabilityError::Unsupported(
982            "peer_ingress_queue_snapshot".to_string(),
983        ))
984    }
985
986    /// Snapshot the current peer runtime surface for MeerkatMachine mapping.
987    ///
988    /// This extends the queued ingress snapshot with the local trust membership
989    /// that governs peer admission.
990    async fn peer_ingress_runtime_snapshot(
991        &self,
992    ) -> Result<crate::interaction::PeerIngressRuntimeSnapshot, CommsCapabilityError> {
993        Err(CommsCapabilityError::Unsupported(
994            "peer_ingress_runtime_snapshot".to_string(),
995        ))
996    }
997
998    /// Get a notification that fires only for actionable peer input.
999    ///
1000    /// Default returns `Unsupported`. Comms-enabled runtimes must override.
1001    /// Used by the factory to bridge into `WaitTool` interrupt.
1002    fn actionable_input_notify(&self) -> Result<Arc<tokio::sync::Notify>, CommsCapabilityError> {
1003        Err(CommsCapabilityError::Unsupported(
1004            "actionable_input_notify".to_string(),
1005        ))
1006    }
1007}
1008
1009/// The main Agent struct
1010pub struct Agent<C, T, S>
1011where
1012    C: AgentLlmClient + ?Sized,
1013    T: AgentToolDispatcher + ?Sized,
1014    S: AgentSessionStore + ?Sized,
1015{
1016    config: AgentConfig,
1017    client: Arc<C>,
1018    tools: Arc<T>,
1019    tool_scope: ToolScope,
1020    store: Arc<S>,
1021    session: Session,
1022    budget: Budget,
1023    retry_policy: RetryPolicy,
1024    depth: u32,
1025    pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
1026    pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
1027    pub(super) hook_run_overrides: HookRunOverrides,
1028    /// Optional context compaction strategy.
1029    pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
1030    /// Input tokens from the last LLM response (for compaction trigger).
1031    pub(crate) last_input_tokens: u64,
1032    /// Session-scoped compaction cadence tracked across runs.
1033    pub(crate) compaction_cadence: SessionCompactionCadence,
1034    /// Optional memory store for indexing compaction discards.
1035    pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
1036    /// Optional skill engine for per-turn `/skill-ref` activation.
1037    pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
1038    /// Skill references to resolve and inject for the next turn.
1039    /// Set by surfaces before calling `run()`, consumed on run start.
1040    pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
1041    /// Per-interaction event tap for streaming events to subscribers.
1042    pub(crate) event_tap: crate::event_tap::EventTap,
1043    /// Shared control state for runtime system-context appends.
1044    pub(crate) system_context_state:
1045        Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
1046    /// Optional default event channel configured at build time.
1047    /// Used by run methods when no per-call event channel is provided.
1048    pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
1049    /// Optional session checkpointer for keep-alive persistence.
1050    ///
1051    /// Wired by `AgentBuilder::with_checkpointer`, installed by
1052    /// `PersistentSessionService`, and consumed by
1053    /// `Agent::checkpoint_current_session`.
1054    pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
1055    /// Optional blob store used to hydrate image refs at execution seams.
1056    pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
1057    /// Original error detail preserved from `terminalize_fatal_error` so
1058    /// `build_result` can include the actual failure message (e.g. the API
1059    /// error body) instead of only the generic terminal-cause description.
1060    pub(crate) terminal_error_detail: Option<String>,
1061    /// True once the current run has accepted `RunCompleted` hooks.
1062    pub(crate) run_completed_hooks_applied: bool,
1063    /// True once the current run's public `RunCompleted` event has been
1064    /// emitted. Extraction may continue afterward as a separate post-run phase.
1065    pub(crate) run_completed_event_emitted: bool,
1066    /// Comms intents that should be silently injected into the session
1067    /// without triggering an LLM turn. Matched against `InteractionContent::Request.intent`.
1068    #[allow(dead_code)] // Used by comms_impl when comms feature is enabled
1069    pub(crate) silent_comms_intents: Vec<String>,
1070    /// Optional shared lifecycle registry for async operations.
1071    pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
1072    /// Optional completion feed for cursor-based completion delivery.
1073    pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
1074    /// Shared epoch cursor state for runtime-backed cursor writeback.
1075    pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
1076    /// Local cursor into the completion feed — only the agent boundary advances this.
1077    pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
1078    /// Optional enrichment provider for completion display details.
1079    pub(crate) completion_enrichment:
1080        Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
1081    /// Shared effective mob authority handle. Owned by the agent, passed to
1082    /// mob tools at construction for authorization reads. Updated by
1083    /// `apply_session_effects` after each tool batch as a derived projection
1084    /// of the canonical `session.build_state().mob_tool_authority_context`.
1085    pub(crate) mob_authority_handle:
1086        Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
1087    /// Runtime-backed turn-state handle, provided by the session runtime bindings.
1088    pub(crate) turn_state_handle: Option<Arc<dyn crate::TurnStateHandle>>,
1089    /// True when the runtime control plane must stamp execution kind metadata.
1090    pub(crate) runtime_execution_kind_required: bool,
1091    /// Typed execution intent for the current run, when this turn is owned by
1092    /// the runtime control plane rather than a direct surface call.
1093    pub(crate) runtime_execution_kind: Option<crate::lifecycle::RuntimeExecutionKind>,
1094    /// Runtime-backed external tool-surface diagnostic handle, when provided
1095    /// by the session runtime bindings.
1096    pub(crate) external_tool_surface_handle: Option<Arc<dyn crate::ExternalToolSurfaceHandle>>,
1097    /// Runtime-backed auth lease handle (Phase 1.5-rev).
1098    pub(crate) auth_lease_handle: Option<Arc<dyn crate::handles::AuthLeaseHandle>>,
1099    /// Runtime-backed MCP server lifecycle handle (Phase 5G / T5g). When set,
1100    /// the agent loop reads `pending_server_ids()` at each CallingLlm boundary
1101    /// to decide whether to emit the `[MCP_PENDING]` system notice.
1102    pub(crate) mcp_server_lifecycle_handle:
1103        Option<Arc<dyn crate::handles::McpServerLifecycleHandle>>,
1104    /// Shared live flag for cancellation at the next turn boundary.
1105    pub(crate) cancel_after_boundary_requested: Arc<std::sync::atomic::AtomicBool>,
1106    /// Optional resolver for model-specific operational defaults (e.g., call timeout).
1107    /// Consulted at each LLM call for hot-swap-aware profile default resolution.
1108    pub(crate) model_defaults_resolver:
1109        Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
1110    /// Explicit call-timeout override from the build/config composition seam.
1111    /// Takes precedence over profile-derived defaults.
1112    pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
1113    /// Structured-output extraction state carried into RunResult.
1114    pub(crate) extraction_state: extraction::ExtractionState,
1115    /// Last published hidden deferred-catalog names.
1116    pub(crate) last_hidden_deferred_catalog_names: BTreeSet<String>,
1117    /// Last published pending catalog sources.
1118    pub(crate) last_pending_catalog_sources: BTreeSet<String>,
1119    /// Dispatch-time projection of the current turn input for contextual tools.
1120    pub(crate) tool_dispatch_context: ToolDispatchContext,
1121}
1122
1123#[cfg(test)]
1124#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1125mod tests {
1126    use super::{
1127        AgentToolDispatcher, CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS,
1128        FilteredToolDispatcher, InlinePeerNotificationPolicy, ToolDispatchContext,
1129    };
1130    use crate::comms::{
1131        PeerAddress, PeerId, PeerName, PeerTransport, SendError, TrustedPeerDescriptor,
1132    };
1133    use crate::types::{ContentBlock, ContentInput, ToolCallView, ToolDef, ToolResult};
1134    use async_trait::async_trait;
1135    use serde_json::json;
1136    use std::sync::Arc;
1137    use tokio::sync::Notify;
1138
1139    struct NoopCommsRuntime {
1140        notify: Arc<Notify>,
1141    }
1142
1143    struct ContextAwareToolDispatcher;
1144
1145    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1146    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1147    impl AgentToolDispatcher for ContextAwareToolDispatcher {
1148        fn tools(&self) -> Arc<[Arc<ToolDef>]> {
1149            Arc::from([Arc::new(ToolDef {
1150                name: "inspect_context".into(),
1151                description: "inspect context".to_string(),
1152                input_schema: json!({"type": "object"}),
1153                provenance: None,
1154            })])
1155        }
1156
1157        async fn dispatch(
1158            &self,
1159            call: ToolCallView<'_>,
1160        ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
1161            Ok(ToolResult::new(
1162                call.id.to_string(),
1163                json!({"saw_context_image": false}).to_string(),
1164                false,
1165            )
1166            .into())
1167        }
1168
1169        async fn dispatch_with_context(
1170            &self,
1171            call: ToolCallView<'_>,
1172            context: &ToolDispatchContext,
1173        ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
1174            Ok(ToolResult::new(
1175                call.id.to_string(),
1176                json!({"saw_context_image": context.current_turn_image(0).is_some()}).to_string(),
1177                false,
1178            )
1179            .into())
1180        }
1181    }
1182
1183    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1184    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1185    impl CommsRuntime for NoopCommsRuntime {
1186        async fn drain_messages(&self) -> Vec<String> {
1187            Vec::new()
1188        }
1189
1190        fn inbox_notify(&self) -> std::sync::Arc<Notify> {
1191            self.notify.clone()
1192        }
1193    }
1194
1195    #[tokio::test]
1196    async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
1197        let runtime = NoopCommsRuntime {
1198            notify: Arc::new(Notify::new()),
1199        };
1200        assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
1201        let peer = TrustedPeerDescriptor {
1202            peer_id: PeerId::new(),
1203            name: PeerName::new("peer-a").expect("valid peer name"),
1204            address: PeerAddress::new(PeerTransport::Inproc, "peer-a"),
1205            pubkey: [0u8; 32],
1206        };
1207        let result = <NoopCommsRuntime as CommsRuntime>::add_trusted_peer(&runtime, peer).await;
1208        assert!(matches!(result, Err(SendError::Unsupported(_))));
1209    }
1210
1211    #[tokio::test]
1212    async fn test_remove_trusted_peer_default_unsupported() {
1213        let runtime = NoopCommsRuntime {
1214            notify: Arc::new(Notify::new()),
1215        };
1216        let peer_id = PeerId::new().to_string();
1217        let result =
1218            <NoopCommsRuntime as CommsRuntime>::remove_trusted_peer(&runtime, &peer_id).await;
1219        assert!(matches!(result, Err(SendError::Unsupported(_))));
1220    }
1221
1222    #[tokio::test]
1223    async fn filtered_tool_dispatcher_preserves_dispatch_context() {
1224        let dispatcher =
1225            FilteredToolDispatcher::new(Arc::new(ContextAwareToolDispatcher), ["inspect_context"]);
1226        let args = serde_json::value::RawValue::from_string("{}".to_string())
1227            .expect("empty object should be valid JSON");
1228        let call = ToolCallView {
1229            id: "ctx-1",
1230            name: "inspect_context",
1231            args: &args,
1232        };
1233        let context = ToolDispatchContext::from_current_turn_input(&ContentInput::Blocks(vec![
1234            ContentBlock::Image {
1235                media_type: "image/png".to_string(),
1236                data: "abc".into(),
1237            },
1238        ]));
1239
1240        let outcome = dispatcher
1241            .dispatch_with_context(call, &context)
1242            .await
1243            .expect("filtered wrapper should dispatch");
1244        let payload: serde_json::Value =
1245            serde_json::from_str(&outcome.result.text_content()).expect("tool result JSON");
1246        assert_eq!(payload["saw_context_image"], true);
1247    }
1248
1249    #[test]
1250    fn test_inline_peer_notification_policy_from_raw() {
1251        assert_eq!(
1252            InlinePeerNotificationPolicy::try_from_raw(None),
1253            Ok(InlinePeerNotificationPolicy::AtMost(
1254                DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS
1255            ))
1256        );
1257        assert_eq!(
1258            InlinePeerNotificationPolicy::try_from_raw(Some(-1)),
1259            Ok(InlinePeerNotificationPolicy::Always)
1260        );
1261        assert_eq!(
1262            InlinePeerNotificationPolicy::try_from_raw(Some(0)),
1263            Ok(InlinePeerNotificationPolicy::Never)
1264        );
1265        assert_eq!(
1266            InlinePeerNotificationPolicy::try_from_raw(Some(25)),
1267            Ok(InlinePeerNotificationPolicy::AtMost(25))
1268        );
1269        assert_eq!(
1270            InlinePeerNotificationPolicy::try_from_raw(Some(-42)),
1271            Err(-42)
1272        );
1273    }
1274
1275    /// UNIT-001: OperationStatus::is_terminal() returns true for all terminal
1276    /// variants and false for non-terminal ones.
1277    #[test]
1278    fn unit_001_terminal_status_values() {
1279        use crate::ops_lifecycle::OperationStatus;
1280        assert!(OperationStatus::Completed.is_terminal());
1281        assert!(OperationStatus::Failed.is_terminal());
1282        assert!(OperationStatus::Cancelled.is_terminal());
1283        assert!(OperationStatus::Aborted.is_terminal());
1284        assert!(OperationStatus::Retired.is_terminal());
1285        assert!(OperationStatus::Terminated.is_terminal());
1286        assert!(!OperationStatus::Running.is_terminal());
1287        assert!(!OperationStatus::Provisioning.is_terminal());
1288        assert!(!OperationStatus::Retiring.is_terminal());
1289        assert!(!OperationStatus::Absent.is_terminal());
1290    }
1291
1292    /// UNIT-002: DetachedOpCompletion serializes without operation_id.
1293    /// The app-facing control noun is job_id (CONTRACT-003).
1294    #[test]
1295    fn unit_002_detached_op_completion_has_no_operation_id() {
1296        use crate::agent::DetachedOpCompletion;
1297        use crate::ops_lifecycle::{OperationKind, OperationStatus};
1298
1299        let completion = DetachedOpCompletion {
1300            job_id: "j_test".into(),
1301            kind: OperationKind::BackgroundToolOp,
1302            status: OperationStatus::Completed,
1303            terminal_outcome: None,
1304            display_name: "test cmd".into(),
1305            detail: "ok".into(),
1306            elapsed_ms: None,
1307        };
1308        #[allow(clippy::unwrap_used)]
1309        let json = serde_json::to_value(&completion).unwrap();
1310        assert!(
1311            json.get("operation_id").is_none(),
1312            "operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
1313        );
1314        assert!(
1315            json.get("job_id").is_some(),
1316            "job_id must be the app-facing control noun"
1317        );
1318    }
1319}