Skip to main content

meerkat_core/
agent.rs

1//! Agent - the core agent orchestrator
2//!
3//! The Agent struct ties together all components and runs the agent loop.
4
5mod builder;
6pub mod comms_impl;
7pub mod compact;
8mod extraction;
9mod hook_impl;
10mod runner;
11pub mod skills;
12mod state;
13
14use crate::budget::Budget;
15use crate::comms::{
16    CommsCommand, EventStream, PeerDirectoryEntry, SendError, SendReceipt, StreamError,
17    StreamScope, TrustedPeerSpec,
18};
19use crate::compact::SessionCompactionCadence;
20use crate::config::{AgentConfig, HookRunOverrides};
21use crate::error::AgentError;
22use crate::event::ExternalToolDelta;
23use crate::hooks::HookEngine;
24use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
25use crate::retry::RetryPolicy;
26use crate::schema::{CompiledSchema, SchemaError};
27use crate::session::Session;
28use crate::state::LoopState;
29#[cfg(target_arch = "wasm32")]
30use crate::tokio;
31use crate::tool_catalog::{
32    ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
33    select_catalog_mode_from_snapshot,
34};
35use crate::tool_scope::ToolScope;
36use crate::types::{
37    AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
38    ToolDef, Usage,
39};
40use async_trait::async_trait;
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43use std::collections::{BTreeSet, HashSet};
44use std::sync::Arc;
45
46pub use builder::AgentBuilder;
47pub use runner::AgentRunner;
48
/// Error-string prefix that historically flagged a tool call as needing external routing.
///
/// DEPRECATED: prefer the typed `ToolError::CallbackPending` / `AgentError::CallbackPending`
/// variants. The string form is retained only so existing callers keep compiling and is
/// slated for removal in a later release.
#[deprecated(
    since = "0.2.0",
    note = "Use ToolError::CallbackPending or AgentError::CallbackPending instead"
)]
pub const CALLBACK_TOOL_PREFIX: &str = "CALLBACK_TOOL_PENDING:";
58
59/// Trait for LLM clients that can be used with the agent
60#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
61#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
62pub trait AgentLlmClient: Send + Sync {
63    /// Stream a response from the LLM
64    async fn stream_response(
65        &self,
66        messages: &[Message],
67        tools: &[Arc<ToolDef>],
68        max_tokens: u32,
69        temperature: Option<f32>,
70        provider_params: Option<&Value>,
71    ) -> Result<LlmStreamResult, AgentError>;
72
73    /// Get the provider name
74    fn provider(&self) -> &'static str;
75
76    /// Get the current effective model identifier.
77    ///
78    /// Used by the agent loop for profile-default resolution (e.g., call timeout
79    /// defaults that vary per model family). Must reflect the current model even
80    /// after hot-swap.
81    fn model(&self) -> &str;
82
83    /// Compile an output schema for this provider.
84    ///
85    /// Default implementation normalizes the schema without provider-specific lowering.
86    /// Adapters override this to apply provider-specific transformations (e.g.,
87    /// Anthropic adds `additionalProperties: false`, Gemini strips unsupported keywords).
88    fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
89        // Default passthrough: normalized clone, no provider-specific lowering
90        Ok(CompiledSchema {
91            schema: output_schema.schema.as_value().clone(),
92            warnings: Vec::new(),
93        })
94    }
95}
96
97/// Result of streaming from the LLM
98pub struct LlmStreamResult {
99    blocks: Vec<AssistantBlock>,
100    stop_reason: StopReason,
101    usage: Usage,
102}
103
104impl LlmStreamResult {
105    pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
106        Self {
107            blocks,
108            stop_reason,
109            usage,
110        }
111    }
112
113    pub fn blocks(&self) -> &[AssistantBlock] {
114        &self.blocks
115    }
116    pub fn stop_reason(&self) -> StopReason {
117        self.stop_reason
118    }
119    pub fn usage(&self) -> &Usage {
120        &self.usage
121    }
122
123    pub fn into_message(self) -> BlockAssistantMessage {
124        BlockAssistantMessage {
125            blocks: self.blocks,
126            stop_reason: self.stop_reason,
127        }
128    }
129
130    pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
131        (self.blocks, self.stop_reason, self.usage)
132    }
133}
134
135/// Result of polling for external tool updates.
136///
137/// Returned by [`AgentToolDispatcher::poll_external_updates`].
138#[derive(Debug, Clone, Default)]
139pub struct ExternalToolUpdate {
140    /// Notices about completed background operations since last poll.
141    pub notices: Vec<ExternalToolDelta>,
142    /// Names of servers still connecting in the background.
143    pub pending: Vec<String>,
144    /// Detached background operation completions since last poll.
145    pub background_completions: Vec<DetachedOpCompletion>,
146}
147
148/// Completion notice for a detached background operation, projected from
149/// canonical ops-lifecycle terminal state plus dispatcher-owned display metadata.
150///
151/// This is a rebuildable projection (INV-003), not authoritative state.
152/// Terminal class and timing come from `OperationLifecycleSnapshot` (INV-001).
153/// Shell-projected detail is supplementary display only (INV-002).
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct DetachedOpCompletion {
156    /// App-facing job identifier (the control noun for surfaces).
157    pub job_id: String,
158    /// Operation kind from canonical ops-lifecycle.
159    pub kind: OperationKind,
160    /// Terminal status from canonical ops-lifecycle.
161    pub status: OperationStatus,
162    /// Terminal outcome from canonical ops-lifecycle.
163    pub terminal_outcome: Option<OperationTerminalOutcome>,
164    /// Canonical display label from ops-lifecycle snapshot.
165    pub display_name: String,
166    /// Dispatcher-projected summary (exit code, output tail). Display only.
167    pub detail: String,
168    /// Monotonic elapsed millis from ops-lifecycle snapshot.
169    pub elapsed_ms: Option<u64>,
170}
171
/// Optional bindings a dispatcher supports, reported as one structured value.
///
/// Returned by [`AgentToolDispatcher::capabilities`]; it replaces a family of
/// per-feature `supports_*` boolean methods. The `Default` value supports nothing.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DispatcherCapabilities {
    /// `true` when the dispatcher implements `bind_ops_lifecycle`.
    pub ops_lifecycle: bool,
}
181
182/// Result of a dispatcher binding operation.
183///
184/// Distinguishes "binding was applied" from "binding was skipped" so callers
185/// can decide whether to wire downstream side effects (e.g. bridge tasks).
186///
187/// **Semantics (decision 11 — supported/best-effort/rejected):**
188/// - `Ok(Bound(d))` = **supported** — binding succeeded, side effects should be wired
189/// - `Ok(Skipped(d))` = **best-effort** — inner shared or incompatible, dispatcher unchanged
190/// - `Err(SharedOwnership)` = **rejected** — outer wrapper is shared, caught by factory pre-check
191/// - `Err(Unsupported)` = **rejected** — type doesn't support this binding, caught by `capabilities()`
192pub enum BindOutcome {
193    /// Binding was applied. The dispatcher was rebound.
194    Bound(Arc<dyn AgentToolDispatcher>),
195    /// Binding was skipped — inner dispatcher was shared or unsupported.
196    /// The returned dispatcher is unchanged but safe to use.
197    Skipped(Arc<dyn AgentToolDispatcher>),
198}
199
200impl BindOutcome {
201    /// Extract the dispatcher, regardless of bind status.
202    pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
203        match self {
204            Self::Bound(d) | Self::Skipped(d) => d,
205        }
206    }
207
208    /// Whether the binding was actually applied.
209    pub fn was_bound(&self) -> bool {
210        matches!(self, Self::Bound(_))
211    }
212}
213
214/// Trait for tool dispatchers
215#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
216#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
217pub trait AgentToolDispatcher: Send + Sync {
218    /// Get available tool definitions
219    fn tools(&self) -> Arc<[Arc<ToolDef>]>;
220
221    /// Query exact catalog support for this dispatcher.
222    ///
223    /// Dispatchers report `exact_catalog=true` only when `tool_catalog()`
224    /// returns the exact precedence-resolved winner registry for the plane
225    /// they own. Wrappers that cannot prove exactness must leave this false.
226    fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
227        ToolCatalogCapabilities::default()
228    }
229
230    /// Return the precedence-resolved tool catalog for this dispatcher.
231    ///
232    /// The default implementation mirrors `tools()` as a visible-only inline
233    /// catalog. Callers must gate any deferred-catalog behavior on
234    /// `tool_catalog_capabilities().exact_catalog`.
235    fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
236        self.tools()
237            .iter()
238            .map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
239            .collect::<Vec<_>>()
240            .into()
241    }
242
243    /// Return non-draining pending source names for exact-catalog discovery.
244    ///
245    /// Pending sources are catalog-level discovery metadata rather than
246    /// provider-visible tools. The default implementation reports none.
247    fn pending_catalog_sources(&self) -> Arc<[String]> {
248        Arc::from([])
249    }
250
251    /// Execute a tool call, returning the transcript result and any async operations.
252    ///
253    /// The `ToolDispatchOutcome` separates transcript data (`result`) from
254    /// execution metadata (`async_ops`). Most tools return no async ops;
255    /// use `ToolDispatchOutcome::from(result)` for synchronous tools.
256    async fn dispatch(
257        &self,
258        call: ToolCallView<'_>,
259    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;
260
261    /// Poll for external tool updates from background operations (e.g. async MCP loading).
262    ///
263    /// The default implementation returns an empty update. Implementations that
264    /// support background tool loading (like `McpRouterAdapter`) override this
265    /// to drain completed results and report pending servers.
266    async fn poll_external_updates(&self) -> ExternalToolUpdate {
267        ExternalToolUpdate::default()
268    }
269
270    /// Query which optional bindings this dispatcher supports.
271    fn capabilities(&self) -> DispatcherCapabilities {
272        DispatcherCapabilities::default()
273    }
274
275    /// Bind a session-canonical ops registry into this dispatcher.
276    ///
277    /// Dispatchers that emit session-visible `AsyncOpRef`s must route those
278    /// operation IDs into the bound registry. Default returns Unsupported.
279    fn bind_ops_lifecycle(
280        self: Arc<Self>,
281        _registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
282        _owner_session_id: crate::types::SessionId,
283    ) -> Result<BindOutcome, OpsLifecycleBindError> {
284        Err(OpsLifecycleBindError::Unsupported)
285    }
286
287    /// Return the completion enrichment provider, if available.
288    ///
289    /// Dispatchers with shell job management return a provider that maps
290    /// operation IDs to display details (job ID, status detail string).
291    fn completion_enrichment(
292        &self,
293    ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
294        None
295    }
296}
297
298/// Compute whether the current exact catalog should stay inline or switch to deferred mode.
299pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
300where
301    T: AgentToolDispatcher + ?Sized,
302{
303    let capabilities = dispatcher.tool_catalog_capabilities();
304    if !capabilities.exact_catalog {
305        return ToolCatalogMode::Inline;
306    }
307    let pending_sources = dispatcher.pending_catalog_sources();
308    let catalog = dispatcher.tool_catalog();
309    select_catalog_mode_from_snapshot(
310        capabilities.exact_catalog,
311        catalog.as_ref(),
312        pending_sources.as_ref(),
313    )
314}
315
316/// Compute whether the catalog control plane should be composed for this
317/// dispatcher, even if the current adaptive snapshot remains inline.
318pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
319where
320    T: AgentToolDispatcher + ?Sized,
321{
322    let capabilities = dispatcher.tool_catalog_capabilities();
323    if !capabilities.exact_catalog {
324        return false;
325    }
326    if capabilities.may_require_catalog_control_plane {
327        return true;
328    }
329
330    let pending_sources = dispatcher.pending_catalog_sources();
331    if !pending_sources.is_empty() {
332        return true;
333    }
334
335    let catalog = dispatcher.tool_catalog();
336    deferred_session_entry_count(catalog.as_ref()) > 0
337}
338
339/// Error from [`AgentToolDispatcher::bind_ops_lifecycle`].
340#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
341pub enum OpsLifecycleBindError {
342    #[error("ops lifecycle binding is unsupported")]
343    Unsupported,
344    #[error("dispatcher has shared ownership and cannot be rebound")]
345    SharedOwnership,
346}
347
348/// A tool dispatcher that filters tools based on a policy
349///
350/// Tools are filtered once at construction time based on the allowed_tools list.
351/// The inner dispatcher is used for actual dispatch, but only allowed tools are
352/// exposed via tools() and dispatch() returns AccessDenied for filtered tools.
353pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
354    inner: Arc<T>,
355    allowed_tools: HashSet<String>,
356    /// Pre-computed filtered tool list (computed once at construction)
357    filtered_tools: Arc<[Arc<ToolDef>]>,
358}
359
360impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
361    pub fn new(inner: Arc<T>, allowed_tools: Vec<String>) -> Self {
362        let allowed_set: HashSet<String> = allowed_tools.into_iter().collect();
363
364        // Filter tools once at construction - the tool registry is static for agent lifetime
365        let inner_tools = inner.tools();
366        let filtered: Vec<Arc<ToolDef>> = inner_tools
367            .iter()
368            .filter(|t| allowed_set.contains(t.name.as_str()))
369            .map(Arc::clone)
370            .collect();
371
372        Self {
373            inner,
374            allowed_tools: allowed_set,
375            filtered_tools: filtered.into(),
376        }
377    }
378}
379
380#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
381#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
382impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
383    fn tools(&self) -> Arc<[Arc<ToolDef>]> {
384        Arc::clone(&self.filtered_tools)
385    }
386
387    async fn dispatch(
388        &self,
389        call: ToolCallView<'_>,
390    ) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
391        if !self.allowed_tools.contains(call.name) {
392            return Err(crate::error::ToolError::access_denied(call.name));
393        }
394        self.inner.dispatch(call).await
395    }
396
397    async fn poll_external_updates(&self) -> ExternalToolUpdate {
398        self.inner.poll_external_updates().await
399    }
400
401    fn capabilities(&self) -> DispatcherCapabilities {
402        self.inner.capabilities()
403    }
404
405    fn bind_ops_lifecycle(
406        self: Arc<Self>,
407        registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
408        owner_session_id: crate::types::SessionId,
409    ) -> Result<BindOutcome, OpsLifecycleBindError> {
410        let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
411        if Arc::strong_count(&owned.inner) == 1 {
412            let outcome = owned.inner.bind_ops_lifecycle(registry, owner_session_id)?;
413            let bound = outcome.was_bound();
414            let d = outcome.into_dispatcher();
415            Ok(if bound {
416                BindOutcome::Bound(Arc::new(FilteredToolDispatcher {
417                    inner: d,
418                    allowed_tools: owned.allowed_tools,
419                    filtered_tools: owned.filtered_tools,
420                }))
421            } else {
422                BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
423                    inner: d,
424                    allowed_tools: owned.allowed_tools,
425                    filtered_tools: owned.filtered_tools,
426                }))
427            })
428        } else {
429            Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
430                inner: owned.inner,
431                allowed_tools: owned.allowed_tools,
432                filtered_tools: owned.filtered_tools,
433            })))
434        }
435    }
436
437    fn completion_enrichment(
438        &self,
439    ) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
440        self.inner.completion_enrichment()
441    }
442}
443
444/// Trait for session stores
445#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
446#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
447pub trait AgentSessionStore: Send + Sync {
448    async fn save(&self, session: &Session) -> Result<(), AgentError>;
449    async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
450}
451
/// Runtime policy for inlining peer lifecycle updates into session context.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InlinePeerNotificationPolicy {
    /// Always inline batched peer lifecycle updates.
    Always,
    /// Never inline batched peer lifecycle updates.
    Never,
    /// Inline only when post-drain peer count is at or below this threshold.
    AtMost(usize),
}

/// Default inline threshold when no explicit value is configured.
pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;

impl InlinePeerNotificationPolicy {
    /// Resolve the policy from its transport/build-layer config representation.
    ///
    /// Mapping:
    /// - `None` → `AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)`
    /// - `Some(-1)` → `Always`
    /// - `Some(0)` → `Never`
    /// - `Some(n)` with `n > 0` → `AtMost(n)`
    ///
    /// # Errors
    ///
    /// Returns the raw value unchanged for any negative input other than `-1`,
    /// and for a positive value that does not fit in `usize` on the target.
    pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
        match raw {
            None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
            Some(-1) => Ok(Self::Always),
            Some(0) => Ok(Self::Never),
            Some(v) => {
                // Checked conversion instead of `v as usize`. It rejects the
                // remaining negatives, and on targets where `usize` is narrower
                // than `i32` it rejects values an `as` cast would silently
                // truncate. The trait is named explicitly so this compiles in
                // every edition.
                let threshold: Result<usize, _> = std::convert::TryFrom::try_from(v);
                threshold.map(Self::AtMost).map_err(|_| v)
            }
        }
    }
}
478
479/// Error returned when a comms runtime capability is not available.
480#[derive(Debug, thiserror::Error)]
481pub enum CommsCapabilityError {
482    /// The runtime does not support this capability.
483    #[error("comms capability not supported: {0}")]
484    Unsupported(String),
485}
486
487/// Trait for comms runtime that can be used with the agent
488#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
489#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
490pub trait CommsRuntime: Send + Sync {
491    /// Runtime-local public key identifier, if available.
492    ///
493    /// Returns a peer ID string in `ed25519:<base64>` format.
494    fn public_key(&self) -> Option<String> {
495        None
496    }
497
498    /// Register a trusted peer for future peer sends.
499    ///
500    /// Runtimes that manage trust dynamically should accept this as a mutable
501    /// control-plane operation and return `SendError::Unsupported` if not
502    /// available.
503    async fn add_trusted_peer(&self, _peer: TrustedPeerSpec) -> Result<(), SendError> {
504        Err(SendError::Unsupported(
505            "add_trusted_peer not supported for this CommsRuntime".to_string(),
506        ))
507    }
508
509    /// Remove a previously trusted peer by peer ID.
510    ///
511    /// Returns `true` if the peer was found and removed, `false` if it
512    /// was not present. After removal, messages from this peer should be
513    /// rejected and `peers()` should no longer return it.
514    async fn remove_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
515        Err(SendError::Unsupported(
516            "remove_trusted_peer not supported for this CommsRuntime".to_string(),
517        ))
518    }
519
520    /// Dispatch a canonical comms command.
521    async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
522        Err(SendError::Unsupported(
523            "send not implemented for this CommsRuntime".to_string(),
524        ))
525    }
526
527    #[doc(hidden)]
528    fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
529        let scope_desc = match scope {
530            StreamScope::Session(session_id) => format!("session {session_id}"),
531        };
532        Err(StreamError::NotFound(scope_desc))
533    }
534
535    /// List peers visible to this runtime.
536    async fn peers(&self) -> Vec<PeerDirectoryEntry> {
537        Vec::new()
538    }
539
540    /// Count peers visible to this runtime.
541    ///
542    /// Implementations can override this to avoid materializing a full peer list.
543    async fn peer_count(&self) -> usize {
544        self.peers().await.len()
545    }
546
547    /// Drain comms inbox and return messages formatted for the LLM
548    async fn drain_messages(&self) -> Vec<String>;
549    /// Get a notification when new messages arrive
550    fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
551    /// Returns true if a DISMISS signal was seen during the last `drain_messages` call.
552    fn dismiss_received(&self) -> bool {
553        false
554    }
555    /// Get an event injector for this runtime's inbox.
556    ///
557    /// Surfaces use this to push external events into the agent inbox.
558    /// Returns `None` if the implementation doesn't support event injection.
559    fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
560        None
561    }
562
563    /// Internal runtime seam for interaction-scoped streaming.
564    #[doc(hidden)]
565    fn interaction_event_injector(
566        &self,
567    ) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
568        None
569    }
570
571    /// Drain comms inbox and return structured interactions.
572    ///
573    /// Default implementation wraps `drain_messages()` results as `InteractionContent::Message`
574    /// with generated IDs.
575    async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
576        self.drain_messages()
577            .await
578            .into_iter()
579            .map(|text| crate::interaction::InboxInteraction {
580                id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
581                from: "unknown".into(),
582                content: crate::interaction::InteractionContent::Message {
583                    body: text.clone(),
584                    blocks: None,
585                },
586                rendered_text: text,
587                handling_mode: crate::types::HandlingMode::Queue,
588                render_metadata: None,
589            })
590            .collect()
591    }
592
593    /// Look up and remove a one-shot subscriber for the given interaction.
594    ///
595    /// Returns the event sender if a subscriber was registered (via `inject_with_subscription`).
596    /// The entry is removed from the registry on lookup (one-shot).
597    fn interaction_subscriber(
598        &self,
599        _id: &crate::interaction::InteractionId,
600    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
601        None
602    }
603
604    /// Take and clear the one-shot sender for an interaction-scoped stream.
605    fn take_interaction_stream_sender(
606        &self,
607        _id: &crate::interaction::InteractionId,
608    ) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
609        self.interaction_subscriber(_id)
610    }
611
612    /// Signal that an interaction has reached a terminal state (complete or failed).
613    ///
614    /// Implementations should transition the reservation FSM to `Completed` and
615    /// clean up registry entries. Called from the keep-alive loop after sending
616    /// terminal events to the tap.
617    fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
618
619    /// Drain machine-authored peer/event ingress candidates.
620    ///
621    /// Runtime-backed peer ingress must route through this canonical seam.
622    async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate>;
623}
624
625/// The main Agent struct
626pub struct Agent<C, T, S>
627where
628    C: AgentLlmClient + ?Sized,
629    T: AgentToolDispatcher + ?Sized,
630    S: AgentSessionStore + ?Sized,
631{
632    config: AgentConfig,
633    client: Arc<C>,
634    tools: Arc<T>,
635    tool_scope: ToolScope,
636    store: Arc<S>,
637    session: Session,
638    budget: Budget,
639    retry_policy: RetryPolicy,
640    state: LoopState,
641    depth: u32,
642    pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
643    pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
644    pub(super) hook_run_overrides: HookRunOverrides,
645    /// Optional context compaction strategy.
646    pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
647    /// Input tokens from the last LLM response (for compaction trigger).
648    pub(crate) last_input_tokens: u64,
649    /// Session-scoped compaction cadence tracked across runs.
650    pub(crate) compaction_cadence: SessionCompactionCadence,
651    /// Optional memory store for indexing compaction discards.
652    pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
653    /// Optional skill engine for per-turn `/skill-ref` activation.
654    pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
655    /// Skill references to resolve and inject for the next turn.
656    /// Set by surfaces before calling `run()`, consumed on run start.
657    pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
658    /// Per-interaction event tap for streaming events to subscribers.
659    pub(crate) event_tap: crate::event_tap::EventTap,
660    /// Shared control state for runtime system-context appends.
661    pub(crate) system_context_state:
662        Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
663    /// Optional default event channel configured at build time.
664    /// Used by run methods when no per-call event channel is provided.
665    pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
666    /// Optional session checkpointer for host-mode persistence.
667    #[allow(dead_code)] // Used by persistent session service; Phase 9-10 wiring pending
668    pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
669    /// Optional blob store used to hydrate image refs at execution seams.
670    pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
671    /// Run-scoped diagnostic stash for originating hard-failure errors.
672    ///
673    /// When an exhausted hard LLM-call failure (e.g., CallTimeout, NetworkTimeout)
674    /// routes through machine-owned FatalFailure, the originating `AgentError` is
675    /// stashed here so `build_result()` can surface it instead of a generic
676    /// `TerminalFailure`. This is ephemeral, consumed once, and non-authoritative
677    /// for terminal phase/classification.
678    pub(crate) pending_fatal_diagnostic: Option<AgentError>,
679    /// Comms intents that should be silently injected into the session
680    /// without triggering an LLM turn. Matched against `InteractionContent::Request.intent`.
681    #[allow(dead_code)] // Used by comms_impl when comms feature is enabled
682    pub(crate) silent_comms_intents: Vec<String>,
683    /// Optional shared lifecycle registry for async operations.
684    ///
685    /// When set, the agent loop waits on the exact turn-local operation IDs
686    /// registered in `turn_authority.pending_op_refs()`.
687    pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
688    /// Optional completion feed for cursor-based completion delivery.
689    pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
690    /// Shared epoch cursor state for runtime-backed cursor writeback.
691    pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
692    /// Local cursor into the completion feed — only the agent boundary advances this.
693    pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
694    /// Optional enrichment provider for completion display details.
695    pub(crate) completion_enrichment:
696        Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
697    /// Shared effective mob authority handle. Owned by the agent, passed to
698    /// mob tools at construction for authorization reads. Updated by
699    /// `apply_session_effects` after each tool batch as a derived projection
700    /// of the canonical `session.build_state().mob_tool_authority_context`.
701    pub(crate) mob_authority_handle:
702        Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
703    /// Machine authority for turn-execution state transitions (RMAT).
704    pub(crate) turn_authority: crate::turn_execution_authority::TurnExecutionAuthority,
705    /// Optional resolver for model-specific operational defaults (e.g., call timeout).
706    /// Consulted at each LLM call for hot-swap-aware profile default resolution.
707    pub(crate) model_defaults_resolver:
708        Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
709    /// Explicit call-timeout override from the build/config composition seam.
710    /// Takes precedence over profile-derived defaults.
711    pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
712    /// Populated on successful extraction validation — carried into RunResult.
713    pub(crate) extraction_result: Option<serde_json::Value>,
714    /// Schema warnings from compilation — carried into RunResult.
715    pub(crate) extraction_schema_warnings: Option<Vec<crate::schema::SchemaWarning>>,
716    /// Last validation error (for retry prompt).
717    pub(crate) extraction_last_error: Option<String>,
718    /// Last published hidden deferred-catalog names.
719    pub(crate) last_hidden_deferred_catalog_names: BTreeSet<String>,
720    /// Last published pending catalog sources.
721    pub(crate) last_pending_catalog_sources: BTreeSet<String>,
722}
723
#[cfg(test)]
mod tests {
    use super::{
        CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS, InlinePeerNotificationPolicy,
    };
    use crate::comms::{SendError, TrustedPeerSpec};
    use async_trait::async_trait;
    use std::sync::Arc;
    use tokio::sync::Notify;

    /// Test double that implements only the required `CommsRuntime` methods.
    ///
    /// Every other trait method therefore resolves to its default implementation,
    /// which is exactly what the tests below exercise.
    struct NoopCommsRuntime {
        notify: Arc<Notify>,
    }

    impl NoopCommsRuntime {
        /// Creates a runtime backed by a fresh, unshared inbox notifier.
        fn new() -> Self {
            Self {
                notify: Arc::new(Notify::new()),
            }
        }
    }

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl CommsRuntime for NoopCommsRuntime {
        async fn drain_messages(&self) -> Vec<String> {
            vec![]
        }

        fn inbox_notify(&self) -> Arc<Notify> {
            Arc::clone(&self.notify)
        }

        async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
            vec![]
        }
    }

    /// Default `public_key` reports no key and default `add_trusted_peer`
    /// rejects the request as unsupported.
    #[tokio::test]
    async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
        let rt = NoopCommsRuntime::new();
        assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&rt).is_none());

        let spec = TrustedPeerSpec {
            name: String::from("peer-a"),
            peer_id: String::from("ed25519:test"),
            address: String::from("inproc://peer-a"),
        };
        let outcome = <NoopCommsRuntime as CommsRuntime>::add_trusted_peer(&rt, spec).await;
        assert!(matches!(outcome, Err(SendError::Unsupported(_))));
    }

    /// Default `remove_trusted_peer` rejects the request as unsupported.
    #[tokio::test]
    async fn test_remove_trusted_peer_default_unsupported() {
        let rt = NoopCommsRuntime::new();
        let outcome =
            <NoopCommsRuntime as CommsRuntime>::remove_trusted_peer(&rt, "ed25519:test").await;
        assert!(matches!(outcome, Err(SendError::Unsupported(_))));
    }

    /// Maps raw configured values onto `InlinePeerNotificationPolicy`:
    /// unset uses the default cap, `-1` means always, `0` means never, a positive
    /// value is a cap, and any other negative value is returned as the error.
    #[test]
    fn test_inline_peer_notification_policy_from_raw() {
        // (raw configured value, expected `try_from_raw` result), checked in order.
        let cases = [
            (
                None,
                Ok(InlinePeerNotificationPolicy::AtMost(
                    DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS,
                )),
            ),
            (Some(-1), Ok(InlinePeerNotificationPolicy::Always)),
            (Some(0), Ok(InlinePeerNotificationPolicy::Never)),
            (Some(25), Ok(InlinePeerNotificationPolicy::AtMost(25))),
            (Some(-42), Err(-42)),
        ];
        for (raw, expected) in cases {
            assert_eq!(
                InlinePeerNotificationPolicy::try_from_raw(raw),
                expected,
                "try_from_raw({raw:?})"
            );
        }
    }

    /// UNIT-001: `OperationStatus::is_terminal()` is true for every terminal
    /// variant and false for every non-terminal one.
    #[test]
    fn unit_001_terminal_status_values() {
        use crate::ops_lifecycle::OperationStatus;

        let terminal = [
            OperationStatus::Completed,
            OperationStatus::Failed,
            OperationStatus::Cancelled,
            OperationStatus::Aborted,
            OperationStatus::Retired,
            OperationStatus::Terminated,
        ];
        for (idx, status) in terminal.into_iter().enumerate() {
            assert!(status.is_terminal(), "terminal case #{idx}");
        }

        let non_terminal = [
            OperationStatus::Running,
            OperationStatus::Provisioning,
            OperationStatus::Retiring,
            OperationStatus::Absent,
        ];
        for (idx, status) in non_terminal.into_iter().enumerate() {
            assert!(!status.is_terminal(), "non-terminal case #{idx}");
        }
    }

    /// UNIT-002: a serialized `DetachedOpCompletion` exposes `job_id` and never
    /// `operation_id`, because job_id is the app-facing control noun (CONTRACT-003).
    #[test]
    fn unit_002_detached_op_completion_has_no_operation_id() {
        use crate::agent::DetachedOpCompletion;
        use crate::ops_lifecycle::{OperationKind, OperationStatus};

        let completion = DetachedOpCompletion {
            job_id: "j_test".into(),
            kind: OperationKind::BackgroundToolOp,
            status: OperationStatus::Completed,
            terminal_outcome: None,
            display_name: "test cmd".into(),
            detail: "ok".into(),
            elapsed_ms: None,
        };

        // A serialization failure should fail this test loudly, so unwrap is intended.
        #[allow(clippy::unwrap_used)]
        let serialized = serde_json::to_value(&completion).unwrap();
        let has_key = |key: &str| serialized.get(key).is_some();

        assert!(
            !has_key("operation_id"),
            "operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
        );
        assert!(
            has_key("job_id"),
            "job_id must be the app-facing control noun"
        );
    }
}