mod builder;
pub mod comms_impl;
pub mod compact;
mod extraction;
mod hook_impl;
mod runner;
pub mod skills;
mod state;
use crate::budget::Budget;
use crate::comms::{
CommsCommand, EventStream, PeerDirectoryEntry, SendAndStreamError, SendError, SendReceipt,
StreamError, StreamScope, TrustedPeerSpec,
};
use crate::config::{AgentConfig, HookRunOverrides};
use crate::error::AgentError;
use crate::hooks::HookEngine;
use crate::retry::RetryPolicy;
use crate::schema::{CompiledSchema, SchemaError};
use crate::session::Session;
use crate::state::LoopState;
use crate::sub_agent::SubAgentManager;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use crate::tool_scope::ToolScope;
use crate::types::{
AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
ToolDef, ToolResult, Usage,
};
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashSet;
use std::sync::Arc;
pub use builder::AgentBuilder;
pub use runner::AgentRunner;
/// Legacy string marker with the value `"CALLBACK_TOOL_PENDING:"`.
///
/// Deprecated since 0.2.0: per the deprecation note, use
/// `ToolError::CallbackPending` or `AgentError::CallbackPending` instead.
// NOTE(review): presumably this prefix was prepended to error text to flag a
// pending callback tool before the typed variants existed — confirm against
// any remaining callers before removing it.
#[deprecated(
since = "0.2.0",
note = "Use ToolError::CallbackPending or AgentError::CallbackPending instead"
)]
pub const CALLBACK_TOOL_PREFIX: &str = "CALLBACK_TOOL_PENDING:";
/// Abstraction over an LLM backend.
///
/// Implementors must be `Send + Sync`. On `wasm32` the async methods are
/// generated with `async_trait(?Send)`; on every other target the plain
/// `async_trait` expansion is used.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentLlmClient: Send + Sync {
    /// Requests one model response for `messages`, offering `tools`.
    ///
    /// `max_tokens`, `temperature` and `provider_params` are handed to the
    /// implementation as-is; how they are interpreted is up to the
    /// implementor.
    ///
    /// # Errors
    /// Returns an [`AgentError`] when the implementation fails.
    async fn stream_response(
        &self,
        messages: &[Message],
        tools: &[Arc<ToolDef>],
        max_tokens: u32,
        temperature: Option<f32>,
        provider_params: Option<&Value>,
    ) -> Result<LlmStreamResult, AgentError>;

    /// Static identifier of the backing provider.
    fn provider(&self) -> &'static str;

    /// Turns an [`OutputSchema`] into a [`CompiledSchema`].
    ///
    /// The default implementation never fails: it clones the schema value
    /// unchanged and reports no warnings.
    fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
        let schema = output_schema.schema.as_value().clone();
        let warnings = Vec::new();
        Ok(CompiledSchema { schema, warnings })
    }
}
/// Result of one LLM response: the assistant blocks, the stop reason and the
/// usage record, kept private behind accessors.
pub struct LlmStreamResult {
    blocks: Vec<AssistantBlock>,
    stop_reason: StopReason,
    usage: Usage,
}

impl LlmStreamResult {
    /// Bundles the three parts of a response into a result value.
    pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
        LlmStreamResult {
            blocks,
            stop_reason,
            usage,
        }
    }

    /// Borrows the assistant blocks as a slice.
    pub fn blocks(&self) -> &[AssistantBlock] {
        self.blocks.as_slice()
    }

    /// Returns a copy of the stop reason.
    pub fn stop_reason(&self) -> StopReason {
        self.stop_reason
    }

    /// Borrows the usage record.
    pub fn usage(&self) -> &Usage {
        &self.usage
    }

    /// Consumes the result and builds a [`BlockAssistantMessage`] from the
    /// blocks and stop reason; the usage record is discarded.
    pub fn into_message(self) -> BlockAssistantMessage {
        let Self {
            blocks,
            stop_reason,
            usage: _,
        } = self;
        BlockAssistantMessage {
            blocks,
            stop_reason,
        }
    }

    /// Consumes the result and hands back `(blocks, stop_reason, usage)`.
    pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
        let Self {
            blocks,
            stop_reason,
            usage,
        } = self;
        (blocks, stop_reason, usage)
    }
}
/// One notice about an external tool configuration change.
///
/// Carried in [`ExternalToolUpdate::notices`].
#[derive(Debug, Clone)]
pub struct ExternalToolNotice {
// NOTE(review): presumably the name of the external tool server the notice
// refers to — confirm against the producer.
pub server: String,
/// Which configuration change operation this notice is about.
pub operation: crate::event::ToolConfigChangeOperation,
// NOTE(review): free-form status text; the set of expected values is not
// visible in this file.
pub status: String,
/// Optional tool count attached to the notice.
// NOTE(review): presumably the number of tools the server exposes after the
// change — confirm.
pub tool_count: Option<usize>,
}
/// Value returned by `AgentToolDispatcher::poll_external_updates`.
///
/// `Default` yields an update with no notices and no pending entries.
#[derive(Debug, Clone, Default)]
pub struct ExternalToolUpdate {
/// Notices collected by the poll.
pub notices: Vec<ExternalToolNotice>,
// NOTE(review): presumably identifiers of servers/operations still in
// flight — confirm against the dispatcher implementations.
pub pending: Vec<String>,
}
/// Source of tool definitions plus the entry point for executing tool calls.
///
/// Implementors must be `Send + Sync`; on `wasm32` the async methods are
/// generated with `async_trait(?Send)`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentToolDispatcher: Send + Sync {
/// Returns the tool definitions this dispatcher exposes, as a shared slice.
fn tools(&self) -> Arc<[Arc<ToolDef>]>;
/// Executes a single tool call.
///
/// # Errors
/// Returns a `ToolError` when the call cannot be completed.
async fn dispatch(&self, call: ToolCallView<'_>)
-> Result<ToolResult, crate::error::ToolError>;
/// Polls for external tool updates.
///
/// The default implementation returns an empty [`ExternalToolUpdate`].
async fn poll_external_updates(&self) -> ExternalToolUpdate {
ExternalToolUpdate::default()
}
}
/// Wrapper around another dispatcher that only exposes an allow-listed
/// subset of its tools.
pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
    /// The wrapped dispatcher.
    inner: Arc<T>,
    /// Names of the tools that may be listed and dispatched.
    allowed_tools: HashSet<String>,
    /// Allowed subset of the inner tool list, computed once in `new`.
    filtered_tools: Arc<[Arc<ToolDef>]>,
}

impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
    /// Wraps `inner`, keeping only tools whose name appears in
    /// `allowed_tools`.
    ///
    /// The inner tool list is read exactly once here; the filtered list
    /// keeps the inner ordering and is not refreshed afterwards.
    pub fn new(inner: Arc<T>, allowed_tools: Vec<String>) -> Self {
        let allowed_tools: HashSet<String> = allowed_tools.into_iter().collect();

        let available = inner.tools();
        let mut visible: Vec<Arc<ToolDef>> = Vec::new();
        for tool in available.iter() {
            if allowed_tools.contains(tool.name.as_str()) {
                visible.push(Arc::clone(tool));
            }
        }

        Self {
            inner,
            allowed_tools,
            filtered_tools: visible.into(),
        }
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
fn tools(&self) -> Arc<[Arc<ToolDef>]> {
Arc::clone(&self.filtered_tools)
}
async fn dispatch(
&self,
call: ToolCallView<'_>,
) -> Result<ToolResult, crate::error::ToolError> {
if !self.allowed_tools.contains(call.name) {
return Err(crate::error::ToolError::access_denied(call.name));
}
self.inner.dispatch(call).await
}
async fn poll_external_updates(&self) -> ExternalToolUpdate {
self.inner.poll_external_updates().await
}
}
/// Persistence interface for [`Session`] values.
///
/// Implementors must be `Send + Sync`; on `wasm32` the async methods are
/// generated with `async_trait(?Send)`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait AgentSessionStore: Send + Sync {
/// Persists `session`.
///
/// # Errors
/// Returns an [`AgentError`] when the store fails.
async fn save(&self, session: &Session) -> Result<(), AgentError>;
/// Looks up a session by `id`.
///
/// The `Option` in the return type lets implementations report a missing
/// session as `Ok(None)` rather than as an error.
///
/// # Errors
/// Returns an [`AgentError`] when the store fails.
async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
}
/// Policy value for inline peer notifications.
///
/// Built from a raw optional integer by
/// [`InlinePeerNotificationPolicy::try_from_raw`].
// NOTE(review): presumably consumed by the agent loop to decide how many peer
// notifications are inlined — the consumer is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InlinePeerNotificationPolicy {
    /// Raw value `-1`.
    Always,
    /// Raw value `0`.
    Never,
    /// Any strictly positive raw value, carried as the limit.
    AtMost(usize),
}

/// Limit used for [`InlinePeerNotificationPolicy::AtMost`] when no raw value
/// is supplied.
pub const DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS: usize = 50;

impl InlinePeerNotificationPolicy {
    /// Decodes the raw integer form of the policy.
    ///
    /// | raw          | result                                             |
    /// |--------------|----------------------------------------------------|
    /// | `None`       | `AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)`    |
    /// | `Some(-1)`   | `Always`                                           |
    /// | `Some(0)`    | `Never`                                            |
    /// | `Some(n > 0)`| `AtMost(n)`                                        |
    ///
    /// # Errors
    /// Any other negative value is rejected and handed back unchanged in
    /// `Err`.
    pub fn try_from_raw(raw: Option<i32>) -> Result<Self, i32> {
        let value = match raw {
            Some(value) => value,
            None => return Ok(Self::AtMost(DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS)),
        };

        if value == -1 {
            Ok(Self::Always)
        } else if value == 0 {
            Ok(Self::Never)
        } else if value > 0 {
            // `value` is strictly positive here, so the cast is lossless on
            // targets where `usize` is at least 32 bits wide.
            Ok(Self::AtMost(value as usize))
        } else {
            Err(value)
        }
    }
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait CommsRuntime: Send + Sync {
fn public_key(&self) -> Option<String> {
None
}
async fn add_trusted_peer(&self, _peer: TrustedPeerSpec) -> Result<(), SendError> {
Err(SendError::Unsupported(
"add_trusted_peer not supported for this CommsRuntime".to_string(),
))
}
async fn remove_trusted_peer(&self, _peer_id: &str) -> Result<bool, SendError> {
Err(SendError::Unsupported(
"remove_trusted_peer not supported for this CommsRuntime".to_string(),
))
}
async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
Err(SendError::Unsupported(
"send not implemented for this CommsRuntime".to_string(),
))
}
fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
let scope_desc = match scope {
StreamScope::Session(session_id) => format!("session {session_id}"),
StreamScope::Interaction(interaction_id) => format!("interaction {}", interaction_id.0),
};
Err(StreamError::NotFound(scope_desc))
}
async fn peers(&self) -> Vec<PeerDirectoryEntry> {
Vec::new()
}
async fn peer_count(&self) -> usize {
self.peers().await.len()
}
async fn send_and_stream(
&self,
cmd: CommsCommand,
) -> Result<(SendReceipt, EventStream), SendAndStreamError> {
let receipt = self.send(cmd).await?;
Err(SendAndStreamError::StreamAttach {
receipt,
error: StreamError::Internal(
"send_and_stream is not implemented for this runtime".to_string(),
),
})
}
async fn drain_messages(&self) -> Vec<String>;
fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
fn dismiss_received(&self) -> bool {
false
}
fn event_injector(&self) -> Option<Arc<dyn crate::SubscribableInjector>> {
None
}
async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
self.drain_messages()
.await
.into_iter()
.map(|text| crate::interaction::InboxInteraction {
id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
from: "unknown".into(),
content: crate::interaction::InteractionContent::Message { body: text.clone() },
rendered_text: text,
})
.collect()
}
fn interaction_subscriber(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
None
}
fn take_interaction_stream_sender(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
self.interaction_subscriber(_id)
}
fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
}
/// Agent state, generic over the LLM client (`C`), the tool dispatcher (`T`)
/// and the session store (`S`).
///
/// All three type parameters are `?Sized` and held behind `Arc`, so trait
/// objects (`dyn AgentLlmClient`, …) can be used as well as concrete types.
pub struct Agent<C, T, S>
where
C: AgentLlmClient + ?Sized,
T: AgentToolDispatcher + ?Sized,
S: AgentSessionStore + ?Sized,
{
/// Agent configuration.
config: AgentConfig,
/// Shared handle to the LLM client.
client: Arc<C>,
/// Shared handle to the tool dispatcher.
tools: Arc<T>,
// NOTE(review): presumably restricts which of `tools` are visible per
// turn — confirm against `crate::tool_scope`.
tool_scope: ToolScope,
/// Shared handle to the session store.
store: Arc<S>,
/// The session owned by this agent.
session: Session,
budget: Budget,
retry_policy: RetryPolicy,
/// Current loop state.
state: LoopState,
sub_agent_manager: Arc<SubAgentManager>,
// NOTE(review): presumably the sub-agent nesting depth (0 = top level) —
// confirm.
depth: u32,
/// Optional communications runtime.
pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
/// Optional hook engine.
pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
pub(super) hook_run_overrides: HookRunOverrides,
/// Optional compactor.
pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
// NOTE(review): `last_input_tokens` / `last_compaction_turn` look like
// bookkeeping for compaction decisions — confirm against the
// `compact` module.
pub(crate) last_input_tokens: u64,
pub(crate) last_compaction_turn: Option<u32>,
/// Optional memory store.
pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
/// Optional skill runtime.
pub(crate) skill_engine: Option<Arc<crate::skills::SkillRuntime>>,
/// Skill references waiting to be processed, if any. This is the only
/// fully `pub` field of the struct.
pub pending_skill_references: Option<Vec<crate::skills::SkillKey>>,
pub(crate) event_tap: crate::event_tap::EventTap,
/// System-context state shared behind an `Arc<std::sync::Mutex<_>>`.
pub(crate) system_context_state:
Arc<std::sync::Mutex<crate::session::SessionSystemContextState>>,
/// Optional default channel for `AgentEvent`s.
pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
/// Optional session checkpointer.
pub(crate) checkpointer: Option<Arc<dyn crate::checkpoint::SessionCheckpointer>>,
/// Optional default channel for `ScopedAgentEvent`s.
pub(crate) default_scoped_event_tx:
Option<tokio::sync::mpsc::Sender<crate::event::ScopedAgentEvent>>,
// NOTE(review): presumably the scope path attached to events sent on
// `default_scoped_event_tx` — confirm.
pub(crate) default_scope_path: Vec<crate::event::StreamScopeFrame>,
// NOTE(review): presumably comms intents that must not surface to the
// model/user — confirm against `comms_impl`.
pub(crate) silent_comms_intents: Vec<String>,
/// Policy for inline peer notifications.
pub(crate) inline_peer_notification_policy: InlinePeerNotificationPolicy,
pub(crate) peer_notification_suppression_active: bool,
pub(crate) host_drain_active: bool,
}
#[cfg(test)]
mod tests {
    use super::{
        CommsRuntime, DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS, InlinePeerNotificationPolicy,
    };
    use crate::comms::{SendError, TrustedPeerSpec};
    use async_trait::async_trait;
    use std::sync::Arc;
    use tokio::sync::Notify;

    /// Minimal runtime that implements only the two required methods, so the
    /// tests below exercise the trait's default implementations.
    struct NoopCommsRuntime {
        notify: Arc<Notify>,
    }

    /// Builds a fresh fixture runtime with its own notifier.
    fn noop_runtime() -> NoopCommsRuntime {
        NoopCommsRuntime {
            notify: Arc::new(Notify::new()),
        }
    }

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl CommsRuntime for NoopCommsRuntime {
        async fn drain_messages(&self) -> Vec<String> {
            Vec::new()
        }

        fn inbox_notify(&self) -> Arc<Notify> {
            Arc::clone(&self.notify)
        }
    }

    /// Defaults: no public key, and adding a trusted peer is unsupported.
    #[tokio::test]
    async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
        let runtime = noop_runtime();
        assert!(CommsRuntime::public_key(&runtime).is_none());

        let peer = TrustedPeerSpec {
            name: "peer-a".to_string(),
            peer_id: "ed25519:test".to_string(),
            address: "inproc://peer-a".to_string(),
        };
        let outcome = CommsRuntime::add_trusted_peer(&runtime, peer).await;
        assert!(matches!(outcome, Err(SendError::Unsupported(_))));
    }

    /// Default: removing a trusted peer is unsupported.
    #[tokio::test]
    async fn test_remove_trusted_peer_default_unsupported() {
        let runtime = noop_runtime();
        let outcome = CommsRuntime::remove_trusted_peer(&runtime, "ed25519:test").await;
        assert!(matches!(outcome, Err(SendError::Unsupported(_))));
    }

    /// Every branch of `try_from_raw`: default, both sentinels, a positive
    /// limit and a rejected negative value.
    #[test]
    fn test_inline_peer_notification_policy_from_raw() {
        let cases: [(Option<i32>, Result<InlinePeerNotificationPolicy, i32>); 5] = [
            (
                None,
                Ok(InlinePeerNotificationPolicy::AtMost(
                    DEFAULT_MAX_INLINE_PEER_NOTIFICATIONS,
                )),
            ),
            (Some(-1), Ok(InlinePeerNotificationPolicy::Always)),
            (Some(0), Ok(InlinePeerNotificationPolicy::Never)),
            (Some(25), Ok(InlinePeerNotificationPolicy::AtMost(25))),
            (Some(-42), Err(-42)),
        ];

        for (raw, expected) in cases {
            assert_eq!(InlinePeerNotificationPolicy::try_from_raw(raw), expected);
        }
    }
}