mod builder;
pub mod comms_impl;
pub mod compact;
mod extraction;
mod hook_impl;
#[cfg(test)]
mod hooks_behavior_tests;
mod runner;
pub mod skills;
mod state;
#[cfg(test)]
#[doc(hidden)]
pub(crate) mod test_turn_state_handle;
use crate::budget::Budget;
use crate::comms::{
CommsCommand, CommsTrustMutation, CommsTrustMutationResult, EventStream, PeerDirectoryEntry,
PeerId, SendAndStreamError, SendError, SendReceipt, StreamError, StreamScope,
TrustedPeerDescriptor,
};
use crate::compact::SessionCompactionCadence;
use crate::completion_feed::CompletionSeq;
use crate::config::{AgentConfig, HookRunOverrides};
use crate::error::AgentError;
use crate::event::ExternalToolDelta;
use crate::hooks::HookEngine;
use crate::lifecycle::RunId;
use crate::lifecycle::run_primitive::ProviderParamsOverride;
use crate::ops::OperationId;
use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
use crate::retry::RetryPolicy;
use crate::schema::{CompiledSchema, SchemaError};
use crate::session::Session;
use crate::state::LoopState;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use crate::tool_catalog::{
ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
select_catalog_mode_from_snapshot,
};
use crate::tool_scope::ToolScope;
use crate::turn_execution_authority::{
ContentShape, TurnPhase, TurnPrimitiveKind, TurnTerminalCauseKind, TurnTerminalOutcome,
};
use crate::types::{
AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
ToolDef, ToolName, ToolNameSet, Usage,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
pub use builder::{AgentBuildPolicyError, AgentBuilder, DefaultSystemPromptPolicy};
pub use runner::{AgentRunner, SnapshotProjectionError, SystemContextStateError};
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentLlmClient: Send + Sync {
async fn stream_response(
&self,
messages: &[Message],
tools: &[Arc<ToolDef>],
max_tokens: u32,
temperature: Option<f32>,
provider_params: Option<&ProviderParamsOverride>,
) -> Result<LlmStreamResult, AgentError>;
fn provider(&self) -> crate::provider::Provider;
fn model(&self) -> &str;
fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
Ok(CompiledSchema {
schema: output_schema.schema.as_value().clone(),
warnings: Vec::new(),
})
}
}
pub type AgentLlmClientDecorator =
Arc<dyn Fn(Arc<dyn AgentLlmClient>) -> Arc<dyn AgentLlmClient> + Send + Sync + 'static>;
pub struct LlmStreamResult {
blocks: Vec<AssistantBlock>,
stop_reason: StopReason,
usage: Usage,
}
impl LlmStreamResult {
pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
Self {
blocks,
stop_reason,
usage,
}
}
pub fn blocks(&self) -> &[AssistantBlock] {
&self.blocks
}
pub fn stop_reason(&self) -> StopReason {
self.stop_reason
}
pub fn usage(&self) -> &Usage {
&self.usage
}
pub fn into_message(self) -> BlockAssistantMessage {
BlockAssistantMessage::new(self.blocks, self.stop_reason)
}
pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
(self.blocks, self.stop_reason, self.usage)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentExecutionSnapshot {
pub loop_state: LoopState,
pub turn_phase: TurnPhase,
pub turn_terminal: bool,
pub active_run_id: Option<RunId>,
pub primitive_kind: TurnPrimitiveKind,
pub admitted_content_shape: Option<ContentShape>,
pub vision_enabled: bool,
pub image_tool_results_enabled: bool,
pub tool_calls_pending: u32,
pub pending_operation_ids: Option<Vec<OperationId>>,
pub barrier_operation_ids: Vec<OperationId>,
pub has_barrier_ops: bool,
pub barrier_satisfied: bool,
pub boundary_count: u32,
pub cancel_after_boundary: bool,
pub terminal_outcome: TurnTerminalOutcome,
pub terminal_cause_kind: Option<TurnTerminalCauseKind>,
pub extraction_attempts: u32,
pub max_extraction_retries: u32,
pub applied_cursor: CompletionSeq,
}
#[derive(Debug, Clone, Default)]
pub struct ExternalToolUpdate {
pub notices: Vec<ExternalToolDelta>,
pub pending: Vec<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CancelAfterBoundaryCommand;
pub type CancelAfterBoundarySender = tokio::sync::mpsc::UnboundedSender<CancelAfterBoundaryCommand>;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ToolDispatchContext {
current_turn: Option<CurrentTurnContent>,
turn_metadata: BTreeMap<String, serde_json::Value>,
}
impl ToolDispatchContext {
pub fn from_current_turn_input(input: &crate::types::ContentInput) -> Self {
let blocks = match input {
crate::types::ContentInput::Text(_) => None,
crate::types::ContentInput::Blocks(blocks) => Some(blocks.clone()),
};
Self {
current_turn: blocks.map(CurrentTurnContent::new),
turn_metadata: BTreeMap::new(),
}
}
pub fn from_run_input(input: &crate::types::RunInput) -> Self {
match input {
crate::types::RunInput::Content { content } => Self::from_current_turn_input(content),
crate::types::RunInput::PendingToolResults => Self::default(),
}
}
#[must_use]
pub fn with_turn_metadata(mut self, metadata: BTreeMap<String, serde_json::Value>) -> Self {
self.turn_metadata = metadata;
self
}
pub fn turn_metadata(&self, key: &str) -> Option<&serde_json::Value> {
self.turn_metadata.get(key)
}
pub fn current_turn(&self) -> Option<&CurrentTurnContent> {
self.current_turn.as_ref()
}
pub fn current_turn_image(
&self,
image_ref: CurrentTurnImageRef,
) -> Option<&crate::types::ContentBlock> {
self.current_turn
.as_ref()
.and_then(|current_turn| current_turn.image(image_ref))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct CurrentTurnImageRef(usize);
impl std::fmt::Display for CurrentTurnImageRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CurrentTurnContent {
blocks: Vec<crate::types::ContentBlock>,
}
impl CurrentTurnContent {
pub fn new(blocks: Vec<crate::types::ContentBlock>) -> Self {
Self { blocks }
}
pub fn blocks(&self) -> &[crate::types::ContentBlock] {
&self.blocks
}
pub fn image_ref(&self, n: usize) -> Option<CurrentTurnImageRef> {
self.images().nth(n).map(|_| CurrentTurnImageRef(n))
}
pub fn image(&self, image_ref: CurrentTurnImageRef) -> Option<&crate::types::ContentBlock> {
self.images().nth(image_ref.0)
}
fn images(&self) -> impl Iterator<Item = &crate::types::ContentBlock> {
self.blocks
.iter()
.filter(|block| matches!(block, crate::types::ContentBlock::Image { .. }))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetachedOpCompletion {
pub job_id: String,
pub kind: OperationKind,
pub status: OperationStatus,
pub terminal_outcome: Option<OperationTerminalOutcome>,
pub display_name: String,
pub detail: String,
pub elapsed_ms: Option<u64>,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DispatcherCapabilities {
pub ops_lifecycle: bool,
}
pub enum BindOutcome {
Bound(Arc<dyn AgentToolDispatcher>),
Skipped(Arc<dyn AgentToolDispatcher>),
}
impl BindOutcome {
pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
match self {
Self::Bound(d) | Self::Skipped(d) => d,
}
}
pub fn was_bound(&self) -> bool {
matches!(self, Self::Bound(_))
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentToolDispatcher: Send + Sync {
fn tools(&self) -> Arc<[Arc<ToolDef>]>;
fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
ToolCatalogCapabilities::default()
}
fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
self.tools()
.iter()
.map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
.collect::<Vec<_>>()
.into()
}
fn pending_catalog_sources(&self) -> Arc<[String]> {
Arc::from([])
}
async fn dispatch(
&self,
call: ToolCallView<'_>,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;
async fn dispatch_with_context(
&self,
call: ToolCallView<'_>,
_context: &ToolDispatchContext,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
self.dispatch(call).await
}
async fn poll_external_updates(&self) -> ExternalToolUpdate {
ExternalToolUpdate::default()
}
fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
None
}
fn capabilities(&self) -> DispatcherCapabilities {
DispatcherCapabilities::default()
}
fn bind_ops_lifecycle(
self: Arc<Self>,
_registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
_owner_bridge_session_id: crate::types::SessionId,
) -> Result<BindOutcome, OpsLifecycleBindError> {
Err(OpsLifecycleBindError::Unsupported)
}
fn completion_enrichment(
&self,
) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
None
}
fn bind_mcp_server_lifecycle_handle(
&self,
_handle: Arc<dyn crate::handles::McpServerLifecycleHandle>,
) {
}
fn bind_external_tool_surface_handle(
&self,
_handle: Arc<dyn crate::handles::ExternalToolSurfaceHandle>,
) {
}
}
pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
where
T: AgentToolDispatcher + ?Sized,
{
let capabilities = dispatcher.tool_catalog_capabilities();
if !capabilities.exact_catalog {
return ToolCatalogMode::Inline;
}
let pending_sources = dispatcher.pending_catalog_sources();
let catalog = dispatcher.tool_catalog();
select_catalog_mode_from_snapshot(
capabilities.exact_catalog,
catalog.as_ref(),
pending_sources.as_ref(),
)
}
pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
where
T: AgentToolDispatcher + ?Sized,
{
let capabilities = dispatcher.tool_catalog_capabilities();
if !capabilities.exact_catalog {
return false;
}
if capabilities.may_require_catalog_control_plane {
return true;
}
let pending_sources = dispatcher.pending_catalog_sources();
if !pending_sources.is_empty() {
return true;
}
let catalog = dispatcher.tool_catalog();
deferred_session_entry_count(catalog.as_ref()) > 0
}
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum OpsLifecycleBindError {
#[error("ops lifecycle binding is unsupported")]
Unsupported,
#[error("dispatcher has shared ownership and cannot be rebound")]
SharedOwnership,
}
pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
inner: Arc<T>,
allowed_tools: ToolNameSet,
filtered_tools: Arc<[Arc<ToolDef>]>,
}
impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
pub fn new<I, N>(inner: Arc<T>, allowed_tools: I) -> Self
where
I: IntoIterator<Item = N>,
N: Into<ToolName>,
{
let allowed_set: ToolNameSet = allowed_tools
.into_iter()
.map(Into::into)
.collect::<ToolNameSet>();
let filtered: Vec<Arc<ToolDef>> = if inner.tool_catalog_capabilities().exact_catalog {
inner
.tool_catalog()
.iter()
.filter(|entry| entry.currently_callable())
.map(|entry| Arc::clone(&entry.tool))
.filter(|t| allowed_set.contains(t.name.as_str()))
.collect()
} else {
inner
.tools()
.iter()
.filter(|t| allowed_set.contains(t.name.as_str()))
.map(Arc::clone)
.collect()
};
Self {
inner,
allowed_tools: allowed_set,
filtered_tools: filtered.into(),
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
fn tools(&self) -> Arc<[Arc<ToolDef>]> {
if self.inner.tool_catalog_capabilities().exact_catalog {
return self
.inner
.tool_catalog()
.iter()
.filter(|entry| entry.currently_callable())
.map(|entry| Arc::clone(&entry.tool))
.filter(|tool| self.allowed_tools.contains(tool.name.as_str()))
.collect::<Vec<_>>()
.into();
}
Arc::clone(&self.filtered_tools)
}
async fn dispatch(
&self,
call: ToolCallView<'_>,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
self.dispatch_with_context(call, &ToolDispatchContext::default())
.await
}
async fn dispatch_with_context(
&self,
call: ToolCallView<'_>,
context: &ToolDispatchContext,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
if !self.allowed_tools.contains(call.name) {
let inner_knows_tool = if self.inner.tool_catalog_capabilities().exact_catalog {
self.inner
.tool_catalog()
.iter()
.any(|entry| entry.tool.name == call.name)
} else {
self.inner.tools().iter().any(|tool| tool.name == call.name)
};
if !inner_knows_tool {
return Err(crate::error::ToolError::not_found(call.name));
}
return Err(crate::error::ToolError::access_denied(call.name));
}
self.inner.dispatch_with_context(call, context).await
}
fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
self.inner.tool_catalog_capabilities()
}
fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
if !self.inner.tool_catalog_capabilities().exact_catalog {
return self
.tools()
.iter()
.map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
.collect::<Vec<_>>()
.into();
}
self.inner
.tool_catalog()
.iter()
.filter(|entry| self.allowed_tools.contains(entry.tool.name.as_str()))
.cloned()
.collect::<Vec<_>>()
.into()
}
fn pending_catalog_sources(&self) -> Arc<[String]> {
self.inner.pending_catalog_sources()
}
async fn poll_external_updates(&self) -> ExternalToolUpdate {
self.inner.poll_external_updates().await
}
fn external_tool_surface_snapshot(&self) -> Option<crate::ExternalToolSurfaceSnapshot> {
self.inner.external_tool_surface_snapshot()
}
fn capabilities(&self) -> DispatcherCapabilities {
self.inner.capabilities()
}
fn bind_ops_lifecycle(
self: Arc<Self>,
registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
owner_bridge_session_id: crate::types::SessionId,
) -> Result<BindOutcome, OpsLifecycleBindError> {
let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
if Arc::strong_count(&owned.inner) == 1 {
let outcome = owned
.inner
.bind_ops_lifecycle(registry, owner_bridge_session_id)?;
let bound = outcome.was_bound();
let d = outcome.into_dispatcher();
let allowed_tools = owned.allowed_tools.into_iter().collect::<Vec<_>>();
Ok(if bound {
BindOutcome::Bound(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
} else {
BindOutcome::Skipped(Arc::new(FilteredToolDispatcher::new(d, allowed_tools)))
})
} else {
Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
inner: owned.inner,
allowed_tools: owned.allowed_tools,
filtered_tools: owned.filtered_tools,
})))
}
}
fn completion_enrichment(
&self,
) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
self.inner.completion_enrichment()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentSessionStore: Send + Sync {
async fn save(&self, session: &Session) -> Result<(), AgentError>;
async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InlinePeerNotificationPolicy {
Always,
Never,
AtMost(usize),
}
pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;
impl InlinePeerNotificationPolicy {
pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
match raw {
None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
Some(-1) => Ok(Self::Always),
Some(0) => Ok(Self::Never),
Some(v) if v > 0 => Ok(Self::AtMost(v as usize)),
Some(v) => Err(v),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum CommsCapabilityError {
#[error("comms capability not supported: {0}")]
Unsupported(String),
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait CommsRuntime: Send + Sync {
fn peer_id(&self) -> Option<PeerId> {
self.public_key()
.as_deref()
.and_then(|public_key| PeerId::parse(public_key).ok())
}
fn public_key(&self) -> Option<String> {
None
}
fn public_key_bytes(&self) -> Option<[u8; 32]> {
None
}
fn comms_name(&self) -> Option<String> {
None
}
fn advertised_address(&self) -> Option<String> {
None
}
fn bridge_bootstrap_token(&self) -> Option<String> {
None
}
async fn apply_trust_mutation(
&self,
_mutation: CommsTrustMutation,
) -> Result<CommsTrustMutationResult, SendError> {
Err(SendError::Unsupported(
"apply_trust_mutation not supported for this CommsRuntime".to_string(),
))
}
async fn install_generated_mob_trust_owner(
&self,
_owner: Arc<dyn std::any::Any + Send + Sync>,
) -> Result<(), SendError> {
Err(SendError::Unsupported(
"generated mob trust owner binding not supported for this CommsRuntime".to_string(),
))
}
async fn validate_recovered_generated_mob_trust_owner(
&self,
_owner: Arc<dyn std::any::Any + Send + Sync>,
) -> Result<(), SendError> {
Err(SendError::Unsupported(
"recovered generated mob trust owner validation not supported for this CommsRuntime"
.to_string(),
))
}
async fn install_recovered_generated_mob_trust_owner(
&self,
_owner: Arc<dyn std::any::Any + Send + Sync>,
) -> Result<(), SendError> {
Err(SendError::Unsupported(
"recovered generated mob trust owner binding not supported for this CommsRuntime"
.to_string(),
))
}
async fn add_private_trusted_peer(
&self,
_peer: TrustedPeerDescriptor,
) -> Result<(), SendError> {
Err(SendError::Unsupported(
"generated comms private trust mutation authority required".to_string(),
))
}
async fn remove_private_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
Err(SendError::Unsupported(
"generated comms private trust mutation authority required".to_string(),
))
}
async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
Err(SendError::Unsupported(
"send not implemented for this CommsRuntime".to_string(),
))
}
#[doc(hidden)]
fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
let scope_desc = match scope {
StreamScope::Session(session_id) => format!("session {session_id}"),
StreamScope::Interaction(interaction_id) => format!("interaction {}", interaction_id.0),
};
Err(StreamError::NotFound(scope_desc))
}
async fn peers(&self) -> Vec<PeerDirectoryEntry> {
Vec::new()
}
async fn peer_count(&self) -> usize {
self.peers().await.len()
}
#[doc(hidden)]
async fn send_and_stream(
&self,
cmd: CommsCommand,
) -> Result<(SendReceipt, EventStream), SendAndStreamError> {
let receipt = self.send(cmd).await?;
Err(SendAndStreamError::StreamAttach {
receipt,
error: StreamError::Internal(
"send_and_stream is not implemented for this runtime".to_string(),
),
})
}
async fn drain_messages(&self) -> Vec<String>;
fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
fn dismiss_received(&self) -> bool {
false
}
fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
None
}
#[doc(hidden)]
fn interaction_event_injector(
&self,
) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
None
}
async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
self.drain_messages()
.await
.into_iter()
.map(|text| crate::interaction::InboxInteraction {
id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
from_route: None,
from: "unknown".into(),
content: crate::interaction::InteractionContent::Message {
body: text.clone(),
blocks: None,
},
rendered_text: text,
handling_mode: crate::types::HandlingMode::Queue,
render_metadata: None,
})
.collect()
}
fn interaction_subscriber(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
None
}
fn take_interaction_stream_sender(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
self.interaction_subscriber(_id)
}
fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
fn peer_interaction_handle(
&self,
) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
None
}
fn peer_request_response_authority_handle(
&self,
) -> Option<std::sync::Arc<dyn crate::handles::PeerInteractionHandle>> {
None
}
async fn drain_classified_inbox_interactions(
&self,
) -> Result<Vec<crate::interaction::ClassifiedInboxInteraction>, CommsCapabilityError> {
Err(CommsCapabilityError::Unsupported(
"drain_classified_inbox_interactions".to_string(),
))
}
async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
self.drain_classified_inbox_interactions()
.await
.unwrap_or_default()
}
async fn peer_ingress_queue_snapshot(
&self,
) -> Result<crate::interaction::PeerIngressQueueSnapshot, CommsCapabilityError> {
Err(CommsCapabilityError::Unsupported(
"peer_ingress_queue_snapshot".to_string(),
))
}
async fn peer_ingress_runtime_snapshot(
&self,
) -> Result<crate::interaction::PeerIngressRuntimeSnapshot, CommsCapabilityError> {
Err(CommsCapabilityError::Unsupported(
"peer_ingress_runtime_snapshot".to_string(),
))
}
async fn public_trusted_peer_projection_snapshot(
&self,
) -> Result<Vec<crate::comms::TrustedPeerDescriptor>, CommsCapabilityError> {
Err(CommsCapabilityError::Unsupported(
"public_trusted_peer_projection_snapshot".to_string(),
))
}
async fn trusted_peer_projection_snapshot_for_source(
&self,
_source_kind: crate::comms::GeneratedCommsTrustAuthoritySourceKind,
) -> Result<Vec<crate::comms::TrustedPeerDescriptor>, CommsCapabilityError> {
Err(CommsCapabilityError::Unsupported(
"trusted_peer_projection_snapshot_for_source".to_string(),
))
}
fn actionable_input_notify(&self) -> Result<Arc<tokio::sync::Notify>, CommsCapabilityError> {
Err(CommsCapabilityError::Unsupported(
"actionable_input_notify".to_string(),
))
}
}
pub struct Agent<C, T, S>
where
C: AgentLlmClient + ?Sized,
T: AgentToolDispatcher + ?Sized,
S: AgentSessionStore + ?Sized,
{
config: AgentConfig,
client: Arc<C>,
tools: Arc<T>,
tool_scope: ToolScope,
store: Arc<S>,
session: Session,
budget: Budget,
retry_policy: RetryPolicy,
depth: u32,
pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
pub(super) hook_run_overrides: HookRunOverrides,
pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
pub(crate) last_input_tokens: u64,
pub(crate) compaction_cadence: SessionCompactionCadence,
pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
pub(crate) event_tap: crate::event_tap::EventTap,
pub(crate) system_context_state:
Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
pub(crate) terminal_error_detail: Option<String>,
pub(crate) run_completed_hooks_applied: bool,
pub(crate) run_completed_event_emitted: bool,
#[allow(dead_code)] pub(crate) silent_comms_intents: Vec<String>,
pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
pub(crate) completion_enrichment:
Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
pub(crate) mob_authority_handle:
Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
pub(crate) turn_state_handle: Option<Arc<dyn crate::TurnStateHandle>>,
pub(crate) runtime_execution_kind_required: bool,
pub(crate) runtime_execution_kind: Option<crate::lifecycle::RuntimeExecutionKind>,
pub(crate) external_tool_surface_handle: Option<Arc<dyn crate::ExternalToolSurfaceHandle>>,
pub(crate) auth_lease_handle: Option<crate::handles::GeneratedAuthLeaseHandle>,
pub(crate) mcp_server_lifecycle_handle:
Option<Arc<dyn crate::handles::McpServerLifecycleHandle>>,
pub(crate) cancel_after_boundary_tx: CancelAfterBoundarySender,
pub(crate) cancel_after_boundary_rx:
tokio::sync::mpsc::UnboundedReceiver<CancelAfterBoundaryCommand>,
pub(crate) model_defaults_resolver:
Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
pub(crate) extraction_state: extraction::ExtractionState,
pub(crate) last_hidden_deferred_catalog_names: BTreeSet<crate::types::ToolName>,
pub(crate) last_pending_catalog_sources: BTreeSet<String>,
pub(crate) tool_dispatch_context: ToolDispatchContext,
pub(crate) turn_tool_dispatch_metadata: BTreeMap<String, serde_json::Value>,
pub(crate) tools_config: crate::config::ToolsConfig,
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::{
AgentToolDispatcher, CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS,
FilteredToolDispatcher, InlinePeerNotificationPolicy, ToolDispatchContext,
};
use crate::comms::{
PeerAddress, PeerId, PeerName, PeerTransport, SendError, TrustedPeerDescriptor,
};
use crate::types::{ContentBlock, ContentInput, ToolCallView, ToolDef, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Notify;
struct NoopCommsRuntime {
notify: Arc<Notify>,
}
struct ContextAwareToolDispatcher;
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl AgentToolDispatcher for ContextAwareToolDispatcher {
fn tools(&self) -> Arc<[Arc<ToolDef>]> {
Arc::from([Arc::new(ToolDef {
name: "inspect_context".into(),
description: "inspect context".to_string(),
input_schema: json!({"type": "object"}),
provenance: None,
})])
}
async fn dispatch(
&self,
call: ToolCallView<'_>,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
Ok(ToolResult::new(
call.id.to_string(),
json!({"saw_context_image": false}).to_string(),
false,
)
.into())
}
async fn dispatch_with_context(
&self,
call: ToolCallView<'_>,
context: &ToolDispatchContext,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
let saw_context_image = context
.current_turn()
.and_then(|turn| turn.image_ref(0))
.and_then(|image_ref| context.current_turn_image(image_ref))
.is_some();
Ok(ToolResult::new(
call.id.to_string(),
json!({"saw_context_image": saw_context_image}).to_string(),
false,
)
.into())
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl CommsRuntime for NoopCommsRuntime {
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> std::sync::Arc<Notify> {
self.notify.clone()
}
}
#[tokio::test]
async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
let runtime = NoopCommsRuntime {
notify: Arc::new(Notify::new()),
};
assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
let peer = TrustedPeerDescriptor {
peer_id: PeerId::new(),
name: PeerName::new("peer-a").expect("valid peer name"),
address: PeerAddress::new(PeerTransport::Inproc, "peer-a"),
pubkey: [0u8; 32],
};
let result =
<NoopCommsRuntime as CommsRuntime>::add_private_trusted_peer(&runtime, peer).await;
assert!(matches!(result, Err(SendError::Unsupported(_))));
}
#[tokio::test]
async fn filtered_tool_dispatcher_preserves_dispatch_context() {
let dispatcher =
FilteredToolDispatcher::new(Arc::new(ContextAwareToolDispatcher), ["inspect_context"]);
let args = serde_json::value::RawValue::from_string("{}".to_string())
.expect("empty object should be valid JSON");
let call = ToolCallView {
id: "ctx-1",
name: "inspect_context",
args: &args,
};
let context = ToolDispatchContext::from_current_turn_input(&ContentInput::Blocks(vec![
ContentBlock::Image {
media_type: "image/png".to_string(),
data: "abc".into(),
},
]));
let outcome = dispatcher
.dispatch_with_context(call, &context)
.await
.expect("filtered wrapper should dispatch");
let payload: serde_json::Value =
serde_json::from_str(&outcome.result.text_content()).expect("tool result JSON");
assert_eq!(payload["saw_context_image"], true);
}
#[test]
fn test_inline_peer_notification_policy_from_raw() {
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(None),
Ok(InlinePeerNotificationPolicy::AtMost(
DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS
))
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(-1)),
Ok(InlinePeerNotificationPolicy::Always)
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(0)),
Ok(InlinePeerNotificationPolicy::Never)
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(25)),
Ok(InlinePeerNotificationPolicy::AtMost(25))
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(-42)),
Err(-42)
);
}
#[test]
fn unit_002_detached_op_completion_has_no_operation_id() {
use crate::agent::DetachedOpCompletion;
use crate::ops_lifecycle::{OperationKind, OperationStatus};
let completion = DetachedOpCompletion {
job_id: "j_test".into(),
kind: OperationKind::BackgroundToolOp,
status: OperationStatus::Completed,
terminal_outcome: None,
display_name: "test cmd".into(),
detail: "ok".into(),
elapsed_ms: None,
};
#[allow(clippy::unwrap_used)]
let json = serde_json::to_value(&completion).unwrap();
assert!(
json.get("operation_id").is_none(),
"operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
);
assert!(
json.get("job_id").is_some(),
"job_id must be the app-facing control noun"
);
}
}