mod builder;
pub mod comms_impl;
pub mod compact;
mod extraction;
mod hook_impl;
mod runner;
pub mod skills;
mod state;
use crate::budget::Budget;
use crate::comms::{
CommsCommand, EventStream, PeerDirectoryEntry, SendError, SendReceipt, StreamError,
StreamScope, TrustedPeerSpec,
};
use crate::compact::SessionCompactionCadence;
use crate::config::{AgentConfig, HookRunOverrides};
use crate::error::AgentError;
use crate::event::ExternalToolDelta;
use crate::hooks::HookEngine;
use crate::ops_lifecycle::{OperationKind, OperationStatus, OperationTerminalOutcome};
use crate::retry::RetryPolicy;
use crate::schema::{CompiledSchema, SchemaError};
use crate::session::Session;
use crate::state::LoopState;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use crate::tool_catalog::{
ToolCatalogCapabilities, ToolCatalogEntry, ToolCatalogMode, deferred_session_entry_count,
select_catalog_mode_from_snapshot,
};
use crate::tool_scope::ToolScope;
use crate::types::{
AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
ToolDef, Usage,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{BTreeSet, HashSet};
use std::sync::Arc;
pub use builder::AgentBuilder;
pub use runner::AgentRunner;
#[deprecated(
since = "0.2.0",
note = "Use ToolError::CallbackPending or AgentError::CallbackPending instead"
)]
pub const CALLBACK_TOOL_PREFIX: &str = "CALLBACK_TOOL_PENDING:";
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentLlmClient: Send + Sync {
async fn stream_response(
&self,
messages: &[Message],
tools: &[Arc<ToolDef>],
max_tokens: u32,
temperature: Option<f32>,
provider_params: Option<&Value>,
) -> Result<LlmStreamResult, AgentError>;
fn provider(&self) -> &'static str;
fn model(&self) -> &str;
fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
Ok(CompiledSchema {
schema: output_schema.schema.as_value().clone(),
warnings: Vec::new(),
})
}
}
pub struct LlmStreamResult {
blocks: Vec<AssistantBlock>,
stop_reason: StopReason,
usage: Usage,
}
impl LlmStreamResult {
pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
Self {
blocks,
stop_reason,
usage,
}
}
pub fn blocks(&self) -> &[AssistantBlock] {
&self.blocks
}
pub fn stop_reason(&self) -> StopReason {
self.stop_reason
}
pub fn usage(&self) -> &Usage {
&self.usage
}
pub fn into_message(self) -> BlockAssistantMessage {
BlockAssistantMessage {
blocks: self.blocks,
stop_reason: self.stop_reason,
}
}
pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
(self.blocks, self.stop_reason, self.usage)
}
}
#[derive(Debug, Clone, Default)]
pub struct ExternalToolUpdate {
pub notices: Vec<ExternalToolDelta>,
pub pending: Vec<String>,
pub background_completions: Vec<DetachedOpCompletion>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetachedOpCompletion {
pub job_id: String,
pub kind: OperationKind,
pub status: OperationStatus,
pub terminal_outcome: Option<OperationTerminalOutcome>,
pub display_name: String,
pub detail: String,
pub elapsed_ms: Option<u64>,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DispatcherCapabilities {
pub ops_lifecycle: bool,
}
pub enum BindOutcome {
Bound(Arc<dyn AgentToolDispatcher>),
Skipped(Arc<dyn AgentToolDispatcher>),
}
impl BindOutcome {
pub fn into_dispatcher(self) -> Arc<dyn AgentToolDispatcher> {
match self {
Self::Bound(d) | Self::Skipped(d) => d,
}
}
pub fn was_bound(&self) -> bool {
matches!(self, Self::Bound(_))
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentToolDispatcher: Send + Sync {
fn tools(&self) -> Arc<[Arc<ToolDef>]>;
fn tool_catalog_capabilities(&self) -> ToolCatalogCapabilities {
ToolCatalogCapabilities::default()
}
fn tool_catalog(&self) -> Arc<[ToolCatalogEntry]> {
self.tools()
.iter()
.map(|tool| ToolCatalogEntry::session_inline(Arc::clone(tool), true))
.collect::<Vec<_>>()
.into()
}
fn pending_catalog_sources(&self) -> Arc<[String]> {
Arc::from([])
}
async fn dispatch(
&self,
call: ToolCallView<'_>,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError>;
async fn poll_external_updates(&self) -> ExternalToolUpdate {
ExternalToolUpdate::default()
}
fn capabilities(&self) -> DispatcherCapabilities {
DispatcherCapabilities::default()
}
fn bind_ops_lifecycle(
self: Arc<Self>,
_registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
_owner_session_id: crate::types::SessionId,
) -> Result<BindOutcome, OpsLifecycleBindError> {
Err(OpsLifecycleBindError::Unsupported)
}
fn completion_enrichment(
&self,
) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
None
}
}
pub fn select_tool_catalog_mode<T>(dispatcher: &T) -> ToolCatalogMode
where
T: AgentToolDispatcher + ?Sized,
{
let capabilities = dispatcher.tool_catalog_capabilities();
if !capabilities.exact_catalog {
return ToolCatalogMode::Inline;
}
let pending_sources = dispatcher.pending_catalog_sources();
let catalog = dispatcher.tool_catalog();
select_catalog_mode_from_snapshot(
capabilities.exact_catalog,
catalog.as_ref(),
pending_sources.as_ref(),
)
}
pub fn should_compose_tool_catalog_control_plane<T>(dispatcher: &T) -> bool
where
T: AgentToolDispatcher + ?Sized,
{
let capabilities = dispatcher.tool_catalog_capabilities();
if !capabilities.exact_catalog {
return false;
}
if capabilities.may_require_catalog_control_plane {
return true;
}
let pending_sources = dispatcher.pending_catalog_sources();
if !pending_sources.is_empty() {
return true;
}
let catalog = dispatcher.tool_catalog();
deferred_session_entry_count(catalog.as_ref()) > 0
}
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum OpsLifecycleBindError {
#[error("ops lifecycle binding is unsupported")]
Unsupported,
#[error("dispatcher has shared ownership and cannot be rebound")]
SharedOwnership,
}
pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
inner: Arc<T>,
allowed_tools: HashSet<String>,
filtered_tools: Arc<[Arc<ToolDef>]>,
}
impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
pub fn new(inner: Arc<T>, allowed_tools: Vec<String>) -> Self {
let allowed_set: HashSet<String> = allowed_tools.into_iter().collect();
let inner_tools = inner.tools();
let filtered: Vec<Arc<ToolDef>> = inner_tools
.iter()
.filter(|t| allowed_set.contains(t.name.as_str()))
.map(Arc::clone)
.collect();
Self {
inner,
allowed_tools: allowed_set,
filtered_tools: filtered.into(),
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
fn tools(&self) -> Arc<[Arc<ToolDef>]> {
Arc::clone(&self.filtered_tools)
}
async fn dispatch(
&self,
call: ToolCallView<'_>,
) -> Result<crate::ops::ToolDispatchOutcome, crate::error::ToolError> {
if !self.allowed_tools.contains(call.name) {
return Err(crate::error::ToolError::access_denied(call.name));
}
self.inner.dispatch(call).await
}
async fn poll_external_updates(&self) -> ExternalToolUpdate {
self.inner.poll_external_updates().await
}
fn capabilities(&self) -> DispatcherCapabilities {
self.inner.capabilities()
}
fn bind_ops_lifecycle(
self: Arc<Self>,
registry: Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>,
owner_session_id: crate::types::SessionId,
) -> Result<BindOutcome, OpsLifecycleBindError> {
let owned = Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
if Arc::strong_count(&owned.inner) == 1 {
let outcome = owned.inner.bind_ops_lifecycle(registry, owner_session_id)?;
let bound = outcome.was_bound();
let d = outcome.into_dispatcher();
Ok(if bound {
BindOutcome::Bound(Arc::new(FilteredToolDispatcher {
inner: d,
allowed_tools: owned.allowed_tools,
filtered_tools: owned.filtered_tools,
}))
} else {
BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
inner: d,
allowed_tools: owned.allowed_tools,
filtered_tools: owned.filtered_tools,
}))
})
} else {
Ok(BindOutcome::Skipped(Arc::new(FilteredToolDispatcher {
inner: owned.inner,
allowed_tools: owned.allowed_tools,
filtered_tools: owned.filtered_tools,
})))
}
}
fn completion_enrichment(
&self,
) -> Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>> {
self.inner.completion_enrichment()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentSessionStore: Send + Sync {
async fn save(&self, session: &Session) -> Result<(), AgentError>;
async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InlinePeerNotificationPolicy {
Always,
Never,
AtMost(usize),
}
pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;
impl InlinePeerNotificationPolicy {
pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
match raw {
None => Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
Some(-1) => Ok(Self::Always),
Some(0) => Ok(Self::Never),
Some(v) if v > 0 => Ok(Self::AtMost(v as usize)),
Some(v) => Err(v),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum CommsCapabilityError {
#[error("comms capability not supported: {0}")]
Unsupported(String),
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait CommsRuntime: Send + Sync {
fn public_key(&self) -> Option<String> {
None
}
async fn add_trusted_peer(&self, _peer: TrustedPeerSpec) -> Result<(), SendError> {
Err(SendError::Unsupported(
"add_trusted_peer not supported for this CommsRuntime".to_string(),
))
}
async fn remove_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
Err(SendError::Unsupported(
"remove_trusted_peer not supported for this CommsRuntime".to_string(),
))
}
async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
Err(SendError::Unsupported(
"send not implemented for this CommsRuntime".to_string(),
))
}
#[doc(hidden)]
fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
let scope_desc = match scope {
StreamScope::Session(session_id) => format!("session {session_id}"),
};
Err(StreamError::NotFound(scope_desc))
}
async fn peers(&self) -> Vec<PeerDirectoryEntry> {
Vec::new()
}
async fn peer_count(&self) -> usize {
self.peers().await.len()
}
async fn drain_messages(&self) -> Vec<String>;
fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
fn dismiss_received(&self) -> bool {
false
}
fn event_injector(&self) -> Option<Arc<dyn crate::EventInjector>> {
None
}
#[doc(hidden)]
fn interaction_event_injector(
&self,
) -> Option<Arc<dyn crate::event_injector::SubscribableInjector>> {
None
}
async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
self.drain_messages()
.await
.into_iter()
.map(|text| crate::interaction::InboxInteraction {
id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
from: "unknown".into(),
content: crate::interaction::InteractionContent::Message {
body: text.clone(),
blocks: None,
},
rendered_text: text,
handling_mode: crate::types::HandlingMode::Queue,
render_metadata: None,
})
.collect()
}
fn interaction_subscriber(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
None
}
fn take_interaction_stream_sender(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
self.interaction_subscriber(_id)
}
fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate>;
}
pub struct Agent<C, T, S>
where
C: AgentLlmClient + ?Sized,
T: AgentToolDispatcher + ?Sized,
S: AgentSessionStore + ?Sized,
{
config: AgentConfig,
client: Arc<C>,
tools: Arc<T>,
tool_scope: ToolScope,
store: Arc<S>,
session: Session,
budget: Budget,
retry_policy: RetryPolicy,
state: LoopState,
depth: u32,
pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
pub(super) hook_run_overrides: HookRunOverrides,
pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
pub(crate) last_input_tokens: u64,
pub(crate) compaction_cadence: SessionCompactionCadence,
pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
pub(crate) event_tap: crate::event_tap::EventTap,
pub(crate) system_context_state:
Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
#[allow(dead_code)] pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
pub(crate) blob_store: Option<Arc<dyn crate::BlobStore>>,
pub(crate) pending_fatal_diagnostic: Option<AgentError>,
#[allow(dead_code)] pub(crate) silent_comms_intents: Vec<String>,
pub(crate) ops_lifecycle: Option<Arc<dyn crate::ops_lifecycle::OpsLifecycleRegistry>>,
pub(crate) completion_feed: Option<Arc<dyn crate::completion_feed::CompletionFeed>>,
pub(crate) epoch_cursor_state: Option<Arc<crate::runtime_epoch::EpochCursorState>>,
pub(crate) applied_cursor: crate::completion_feed::CompletionSeq,
pub(crate) completion_enrichment:
Option<Arc<dyn crate::completion_feed::CompletionEnrichmentProvider>>,
pub(crate) mob_authority_handle:
Option<Arc<std::sync::RwLock<crate::service::MobToolAuthorityContext>>>,
pub(crate) turn_authority: crate::turn_execution_authority::TurnExecutionAuthority,
pub(crate) model_defaults_resolver:
Option<Arc<dyn crate::model_defaults::ModelOperationalDefaultsResolver>>,
pub(crate) call_timeout_override: crate::config::CallTimeoutOverride,
pub(crate) extraction_result: Option<serde_json::Value>,
pub(crate) extraction_schema_warnings: Option<Vec<crate::schema::SchemaWarning>>,
pub(crate) extraction_last_error: Option<String>,
pub(crate) last_hidden_deferred_catalog_names: BTreeSet<String>,
pub(crate) last_pending_catalog_sources: BTreeSet<String>,
}
#[cfg(test)]
mod tests {
use super::{
CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS, InlinePeerNotificationPolicy,
};
use crate::comms::{SendError, TrustedPeerSpec};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Notify;
struct NoopCommsRuntime {
notify: Arc<Notify>,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl CommsRuntime for NoopCommsRuntime {
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> std::sync::Arc<Notify> {
self.notify.clone()
}
async fn drain_peer_input_candidates(&self) -> Vec<crate::interaction::PeerInputCandidate> {
Vec::new()
}
}
#[tokio::test]
async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
let runtime = NoopCommsRuntime {
notify: Arc::new(Notify::new()),
};
assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
let peer = TrustedPeerSpec {
name: "peer-a".to_string(),
peer_id: "ed25519:test".to_string(),
address: "inproc://peer-a".to_string(),
};
let result = <NoopCommsRuntime as CommsRuntime>::add_trusted_peer(&runtime, peer).await;
assert!(matches!(result, Err(SendError::Unsupported(_))));
}
#[tokio::test]
async fn test_remove_trusted_peer_default_unsupported() {
let runtime = NoopCommsRuntime {
notify: Arc::new(Notify::new()),
};
let result =
<NoopCommsRuntime as CommsRuntime>::remove_trusted_peer(&runtime, "ed25519:test").await;
assert!(matches!(result, Err(SendError::Unsupported(_))));
}
#[test]
fn test_inline_peer_notification_policy_from_raw() {
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(None),
Ok(InlinePeerNotificationPolicy::AtMost(
DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS
))
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(-1)),
Ok(InlinePeerNotificationPolicy::Always)
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(0)),
Ok(InlinePeerNotificationPolicy::Never)
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(25)),
Ok(InlinePeerNotificationPolicy::AtMost(25))
);
assert_eq!(
InlinePeerNotificationPolicy::try_from_raw(Some(-42)),
Err(-42)
);
}
#[test]
fn unit_001_terminal_status_values() {
use crate::ops_lifecycle::OperationStatus;
assert!(OperationStatus::Completed.is_terminal());
assert!(OperationStatus::Failed.is_terminal());
assert!(OperationStatus::Cancelled.is_terminal());
assert!(OperationStatus::Aborted.is_terminal());
assert!(OperationStatus::Retired.is_terminal());
assert!(OperationStatus::Terminated.is_terminal());
assert!(!OperationStatus::Running.is_terminal());
assert!(!OperationStatus::Provisioning.is_terminal());
assert!(!OperationStatus::Retiring.is_terminal());
assert!(!OperationStatus::Absent.is_terminal());
}
#[test]
fn unit_002_detached_op_completion_has_no_operation_id() {
use crate::agent::DetachedOpCompletion;
use crate::ops_lifecycle::{OperationKind, OperationStatus};
let completion = DetachedOpCompletion {
job_id: "j_test".into(),
kind: OperationKind::BackgroundToolOp,
status: OperationStatus::Completed,
terminal_outcome: None,
display_name: "test cmd".into(),
detail: "ok".into(),
elapsed_ms: None,
};
#[allow(clippy::unwrap_used)]
let json = serde_json::to_value(&completion).unwrap();
assert!(
json.get("operation_id").is_none(),
"operation_id must not appear in serialized DetachedOpCompletion (CONTRACT-003)"
);
assert!(
json.get("job_id").is_some(),
"job_id must be the app-facing control noun"
);
}
}