mod builder;
pub mod comms_impl;
pub mod compact;
mod extraction;
mod hook_impl;
mod runner;
pub mod skills;
mod state;
use crate::budget::Budget;
use crate::comms::{
CommsCommand, EventStream, PeerDirectoryEntry, SendAndStreamError, SendError, SendReceipt,
StreamError, StreamScope, TrustedPeerSpec,
};
use crate::config::{AgentConfig, HookRunOverrides};
use crate::error::AgentError;
use crate::hooks::HookEngine;
use crate::retry::RetryPolicy;
use crate::schema::{CompiledSchema, SchemaError};
use crate::session::Session;
use crate::state::LoopState;
use crate::sub_agent::SubAgentManager;
use crate::types::{
AssistantBlock, BlockAssistantMessage, Message, OutputSchema, StopReason, ToolCallView,
ToolDef, ToolResult, Usage,
};
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashSet;
use std::sync::Arc;
pub use builder::AgentBuilder;
pub use runner::AgentRunner;
#[deprecated(
since = "0.2.0",
note = "Use ToolError::CallbackPending or AgentError::CallbackPending instead"
)]
pub const CALLBACK_TOOL_PREFIX: &str = "CALLBACK_TOOL_PENDING:";
#[async_trait]
pub trait AgentLlmClient: Send + Sync {
async fn stream_response(
&self,
messages: &[Message],
tools: &[Arc<ToolDef>],
max_tokens: u32,
temperature: Option<f32>,
provider_params: Option<&Value>,
) -> Result<LlmStreamResult, AgentError>;
fn provider(&self) -> &'static str;
fn compile_schema(&self, output_schema: &OutputSchema) -> Result<CompiledSchema, SchemaError> {
Ok(CompiledSchema {
schema: output_schema.schema.as_value().clone(),
warnings: Vec::new(),
})
}
}
pub struct LlmStreamResult {
blocks: Vec<AssistantBlock>,
stop_reason: StopReason,
usage: Usage,
}
impl LlmStreamResult {
pub fn new(blocks: Vec<AssistantBlock>, stop_reason: StopReason, usage: Usage) -> Self {
Self {
blocks,
stop_reason,
usage,
}
}
pub fn blocks(&self) -> &[AssistantBlock] {
&self.blocks
}
pub fn stop_reason(&self) -> StopReason {
self.stop_reason
}
pub fn usage(&self) -> &Usage {
&self.usage
}
pub fn into_message(self) -> BlockAssistantMessage {
BlockAssistantMessage {
blocks: self.blocks,
stop_reason: self.stop_reason,
}
}
pub fn into_parts(self) -> (Vec<AssistantBlock>, StopReason, Usage) {
(self.blocks, self.stop_reason, self.usage)
}
}
#[async_trait]
pub trait AgentToolDispatcher: Send + Sync {
fn tools(&self) -> Arc<[Arc<ToolDef>]>;
async fn dispatch(&self, call: ToolCallView<'_>)
-> Result<ToolResult, crate::error::ToolError>;
}
pub struct FilteredToolDispatcher<T: AgentToolDispatcher + ?Sized> {
inner: Arc<T>,
allowed_tools: HashSet<String>,
filtered_tools: Arc<[Arc<ToolDef>]>,
}
impl<T: AgentToolDispatcher + ?Sized> FilteredToolDispatcher<T> {
pub fn new(inner: Arc<T>, allowed_tools: Vec<String>) -> Self {
let allowed_set: HashSet<String> = allowed_tools.into_iter().collect();
let inner_tools = inner.tools();
let filtered: Vec<Arc<ToolDef>> = inner_tools
.iter()
.filter(|t| allowed_set.contains(t.name.as_str()))
.map(Arc::clone)
.collect();
Self {
inner,
allowed_tools: allowed_set,
filtered_tools: filtered.into(),
}
}
}
#[async_trait]
impl<T: AgentToolDispatcher + ?Sized + 'static> AgentToolDispatcher for FilteredToolDispatcher<T> {
fn tools(&self) -> Arc<[Arc<ToolDef>]> {
Arc::clone(&self.filtered_tools)
}
async fn dispatch(
&self,
call: ToolCallView<'_>,
) -> Result<ToolResult, crate::error::ToolError> {
if !self.allowed_tools.contains(call.name) {
return Err(crate::error::ToolError::access_denied(call.name));
}
self.inner.dispatch(call).await
}
}
#[async_trait]
pub trait AgentSessionStore: Send + Sync {
async fn save(&self, session: &Session) -> Result<(), AgentError>;
async fn load(&self, id: &str) -> Result<Option<Session>, AgentError>;
}
#[async_trait]
pub trait CommsRuntime: Send + Sync {
fn public_key(&self) -> Option<String> {
None
}
async fn add_trusted_peer(&self, _peer: TrustedPeerSpec) -> Result<(), SendError> {
Err(SendError::Unsupported(
"add_trusted_peer not supported for this CommsRuntime".to_string(),
))
}
async fn send(&self, _cmd: CommsCommand) -> Result<SendReceipt, SendError> {
Err(SendError::Unsupported(
"send not implemented for this CommsRuntime".to_string(),
))
}
fn stream(&self, scope: StreamScope) -> Result<EventStream, StreamError> {
let scope_desc = match scope {
StreamScope::Session(session_id) => format!("session {}", session_id),
StreamScope::Interaction(interaction_id) => format!("interaction {}", interaction_id.0),
};
Err(StreamError::NotFound(scope_desc))
}
async fn peers(&self) -> Vec<PeerDirectoryEntry> {
Vec::new()
}
async fn send_and_stream(
&self,
cmd: CommsCommand,
) -> Result<(SendReceipt, EventStream), SendAndStreamError> {
let receipt = self.send(cmd).await?;
Err(SendAndStreamError::StreamAttach {
receipt,
error: StreamError::Internal(
"send_and_stream is not implemented for this runtime".to_string(),
),
})
}
async fn drain_messages(&self) -> Vec<String>;
fn inbox_notify(&self) -> Arc<tokio::sync::Notify>;
fn dismiss_received(&self) -> bool {
false
}
fn event_injector(&self) -> Option<Arc<dyn crate::SubscribableInjector>> {
None
}
async fn drain_inbox_interactions(&self) -> Vec<crate::interaction::InboxInteraction> {
self.drain_messages()
.await
.into_iter()
.map(|text| crate::interaction::InboxInteraction {
id: crate::interaction::InteractionId(uuid::Uuid::new_v4()),
from: "unknown".into(),
content: crate::interaction::InteractionContent::Message { body: text.clone() },
rendered_text: text,
})
.collect()
}
fn interaction_subscriber(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
None
}
fn take_interaction_stream_sender(
&self,
_id: &crate::interaction::InteractionId,
) -> Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>> {
self.interaction_subscriber(_id)
}
fn mark_interaction_complete(&self, _id: &crate::interaction::InteractionId) {}
}
pub struct Agent<C, T, S>
where
C: AgentLlmClient + ?Sized,
T: AgentToolDispatcher + ?Sized,
S: AgentSessionStore + ?Sized,
{
config: AgentConfig,
client: Arc<C>,
tools: Arc<T>,
store: Arc<S>,
session: Session,
budget: Budget,
retry_policy: RetryPolicy,
state: LoopState,
sub_agent_manager: Arc<SubAgentManager>,
depth: u32,
pub(super) comms_runtime: Option<Arc<dyn CommsRuntime>>,
pub(super) hook_engine: Option<Arc<dyn HookEngine>>,
pub(super) hook_run_overrides: HookRunOverrides,
pub(crate) compactor: Option<Arc<dyn crate::compact::Compactor>>,
pub(crate) last_input_tokens: u64,
pub(crate) last_compaction_turn: Option<u32>,
pub(crate) memory_store: Option<Arc<dyn crate::memory::MemoryStore>>,
pub(crate) skill_engine: Option<Arc<dyn crate::skills::SkillEngine>>,
pub pending_skill_references: Option<Vec<crate::skills::SkillId>>,
pub(crate) event_tap: crate::event_tap::EventTap,
pub(crate) default_event_tx: Option<tokio::sync::mpsc::Sender<crate::event::AgentEvent>>,
pub(crate) host_drain_active: bool,
}
#[cfg(test)]
mod tests {
use super::CommsRuntime;
use crate::comms::{SendError, TrustedPeerSpec};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Notify;
struct NoopCommsRuntime {
notify: Arc<Notify>,
}
#[async_trait]
impl CommsRuntime for NoopCommsRuntime {
async fn drain_messages(&self) -> Vec<String> {
Vec::new()
}
fn inbox_notify(&self) -> std::sync::Arc<Notify> {
self.notify.clone()
}
}
#[tokio::test]
async fn test_comms_runtime_trait_defaults_hide_unimplemented_features() {
let runtime = NoopCommsRuntime {
notify: Arc::new(Notify::new()),
};
assert!(<NoopCommsRuntime as CommsRuntime>::public_key(&runtime).is_none());
let peer = TrustedPeerSpec {
name: "peer-a".to_string(),
peer_id: "ed25519:test".to_string(),
address: "inproc://peer-a".to_string(),
};
let result = <NoopCommsRuntime as CommsRuntime>::add_trusted_peer(&runtime, peer).await;
assert!(matches!(result, Err(SendError::Unsupported(_))));
}
}