use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use futures_util::stream::StreamExt;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task::JoinHandle;
use tracing::{debug, warn};
use crate::backends::gemini::{
GeminiBackendConfig, GeminiConnection, GeminiConnectionStrategy, GeminiRunners,
};
use crate::backends::mock::{MockConnectionStrategy, MockRunners};
#[cfg(feature = "anthropic")]
use crate::backends::anthropic::{
AnthropicBackendConfig, AnthropicConnection, AnthropicConnectionStrategy, AnthropicRunners,
};
#[cfg(feature = "openai")]
use crate::backends::openai::{
OpenAiBackendConfig, OpenAiConnection, OpenAiConnectionStrategy, OpenAiRunners,
};
#[cfg(feature = "local")]
use crate::backends::local::connection::{
LocalBackendConfig, LocalConnection, LocalConnectionStrategy, LocalRunners,
};
use crate::connections::{Connection, ConnectionStrategy};
use crate::content::Content;
use crate::conversation::{ChatResponse, Conversation};
use crate::error::{Error, Result};
use crate::hooks::{HookRunner, SessionContext};
use crate::policy::{self, Policy};
use crate::tools::{Tool, ToolContext, ToolRunner};
use crate::triggers::{Trigger, TriggerRunner};
#[cfg(feature = "native")]
use crate::backends::mcp::McpBridge;
#[cfg(feature = "native")]
use crate::types::McpServerConfig;
use crate::types::{
BuiltinTool, CapabilitiesConfig, StepStatus, SystemInstructions, ToolCall,
};
#[derive(Default)]
pub struct AgentConfig {
pub system_instructions: Option<SystemInstructions>,
pub capabilities: CapabilitiesConfig,
pub tools: Vec<Arc<dyn Tool>>,
pub policies: Vec<Policy>,
pub triggers: Vec<Arc<dyn Trigger>>,
pub workspaces: Vec<PathBuf>,
#[cfg(feature = "native")]
pub mcp_servers: Vec<McpServerConfig>,
pub conversation_id: Option<String>,
pub response_schema: Option<String>,
pub pre_tool_hooks: Vec<Arc<dyn crate::hooks::PreToolCallDecideHook>>,
}
impl AgentConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_system_instructions(mut self, instr: impl Into<SystemInstructions>) -> Self {
self.system_instructions = Some(instr.into());
self
}
pub fn with_capabilities(mut self, cap: CapabilitiesConfig) -> Self {
self.capabilities = cap;
self
}
pub fn with_tool(mut self, tool: Arc<dyn Tool>) -> Self {
self.tools.push(tool);
self
}
pub fn with_policies(mut self, policies: Vec<Policy>) -> Self {
self.policies = policies;
self
}
pub fn with_pre_tool_hook(
mut self,
hook: Arc<dyn crate::hooks::PreToolCallDecideHook>,
) -> Self {
self.pre_tool_hooks.push(hook);
self
}
pub fn with_workspace(mut self, ws: impl Into<PathBuf>) -> Self {
self.workspaces.push(ws.into());
self
}
pub fn with_trigger(mut self, trigger: Arc<dyn Trigger>) -> Self {
self.triggers.push(trigger);
self
}
#[cfg(feature = "native")]
pub fn with_mcp_server(mut self, server: McpServerConfig) -> Self {
self.mcp_servers.push(server);
self
}
}
pub struct GeminiAgentConfig {
pub agent: AgentConfig,
pub gemini: GeminiBackendConfig,
pub initial_history: Option<Vec<u8>>,
}
impl GeminiAgentConfig {
pub fn new(api_key: impl Into<String>) -> Self {
Self {
agent: AgentConfig::default(),
gemini: GeminiBackendConfig::new(api_key),
initial_history: None,
}
}
pub fn with_history_bytes(mut self, bytes: Vec<u8>) -> Self {
self.initial_history = Some(bytes);
self
}
pub fn with_model(mut self, model: impl Into<String>) -> Self {
self.gemini = self.gemini.with_model(model);
self
}
pub fn with_system_instructions(mut self, instr: impl Into<SystemInstructions>) -> Self {
let instr = instr.into();
self.gemini = self.gemini.with_system_instructions(instr.clone());
self.agent = self.agent.with_system_instructions(instr);
self
}
pub fn with_thinking(mut self, level: crate::types::ThinkingLevel) -> Self {
self.gemini = self.gemini.with_thinking(level);
self
}
pub fn with_max_output_tokens(mut self, max: u32) -> Self {
self.gemini = self.gemini.with_max_output_tokens(max);
self
}
pub fn with_response_schema(mut self, schema: impl Into<String>) -> Self {
let s = schema.into();
self.gemini = self.gemini.with_response_schema(s.clone());
self.agent.response_schema = Some(s);
self
}
pub fn with_capabilities(mut self, cap: CapabilitiesConfig) -> Self {
self.agent = self.agent.with_capabilities(cap);
self
}
pub fn with_base_url(mut self, url: url::Url) -> Self {
self.gemini = self.gemini.with_base_url(url);
self
}
pub fn with_auth_provider(mut self, provider: crate::backends::KeyProvider) -> Self {
self.gemini.api_key_provider = Some(crate::backends::AuthTokenProvider(provider));
self
}
pub fn with_filesystem(mut self, fs: crate::filesystem::SharedFilesystem) -> Self {
self.gemini = self.gemini.with_filesystem(fs);
self
}
pub fn with_tool(mut self, tool: Arc<dyn Tool>) -> Self {
self.agent = self.agent.with_tool(tool);
self
}
pub fn with_policies(mut self, policies: Vec<Policy>) -> Self {
self.agent = self.agent.with_policies(policies);
self
}
pub fn with_pre_tool_hook(
mut self,
hook: Arc<dyn crate::hooks::PreToolCallDecideHook>,
) -> Self {
self.agent = self.agent.with_pre_tool_hook(hook);
self
}
pub fn with_workspace(mut self, ws: impl Into<PathBuf>) -> Self {
self.agent = self.agent.with_workspace(ws);
self
}
pub fn with_trigger(mut self, trigger: Arc<dyn Trigger>) -> Self {
self.agent = self.agent.with_trigger(trigger);
self
}
#[cfg(feature = "native")]
pub fn with_mcp_server(mut self, server: McpServerConfig) -> Self {
self.agent = self.agent.with_mcp_server(server);
self
}
pub fn resume(mut self, conversation_id: impl Into<String>) -> Self {
let id = conversation_id.into();
self.gemini.conversation_id = Some(id.clone());
self.agent.conversation_id = Some(id);
self
}
}
pub struct MockAgentConfig {
pub agent: AgentConfig,
pub mock: MockConnectionStrategy,
}
impl MockAgentConfig {
pub fn new(mock: MockConnectionStrategy) -> Self {
Self {
agent: AgentConfig::default(),
mock,
}
}
pub fn with_system_instructions(mut self, instr: impl Into<SystemInstructions>) -> Self {
self.agent = self.agent.with_system_instructions(instr);
self
}
pub fn with_capabilities(mut self, cap: CapabilitiesConfig) -> Self {
self.agent = self.agent.with_capabilities(cap);
self
}
pub fn with_tool(mut self, tool: Arc<dyn Tool>) -> Self {
self.agent = self.agent.with_tool(tool);
self
}
pub fn with_policies(mut self, policies: Vec<Policy>) -> Self {
self.agent = self.agent.with_policies(policies);
self
}
pub fn with_pre_tool_hook(
mut self,
hook: Arc<dyn crate::hooks::PreToolCallDecideHook>,
) -> Self {
self.agent = self.agent.with_pre_tool_hook(hook);
self
}
pub fn with_workspace(mut self, ws: impl Into<PathBuf>) -> Self {
self.agent = self.agent.with_workspace(ws);
self
}
pub fn with_trigger(mut self, trigger: Arc<dyn Trigger>) -> Self {
self.agent = self.agent.with_trigger(trigger);
self
}
}
#[cfg(feature = "anthropic")]
pub struct AnthropicAgentConfig {
pub agent: AgentConfig,
pub anthropic: AnthropicBackendConfig,
pub initial_history: Option<Vec<u8>>,
}
#[cfg(feature = "anthropic")]
impl AnthropicAgentConfig {
pub fn new(api_key: impl Into<String>) -> Self {
Self {
agent: AgentConfig::default(),
anthropic: AnthropicBackendConfig::new(api_key),
initial_history: None,
}
}
pub fn with_history_bytes(mut self, bytes: Vec<u8>) -> Self {
self.initial_history = Some(bytes);
self
}
pub fn with_model(mut self, model: impl Into<String>) -> Self {
self.anthropic = self.anthropic.with_model(model);
self
}
pub fn with_system_instructions(mut self, instr: impl Into<SystemInstructions>) -> Self {
let instr = instr.into();
self.anthropic = self.anthropic.with_system_instructions(instr.clone());
self.agent = self.agent.with_system_instructions(instr);
self
}
pub fn with_thinking(mut self, level: crate::types::ThinkingLevel) -> Self {
self.anthropic = self.anthropic.with_thinking(level);
self
}
pub fn with_temperature(mut self, t: f32) -> Self {
self.anthropic = self.anthropic.with_temperature(t);
self
}
pub fn with_max_tokens(mut self, n: u32) -> Self {
self.anthropic = self.anthropic.with_max_tokens(n);
self
}
pub fn with_base_url(mut self, url: url::Url) -> Self {
self.anthropic = self.anthropic.with_base_url(url);
self
}
pub fn with_auth_provider(mut self, provider: crate::backends::KeyProvider) -> Self {
self.anthropic.api_key_provider = Some(crate::backends::AuthTokenProvider(provider));
self
}
pub fn with_filesystem(mut self, fs: crate::filesystem::SharedFilesystem) -> Self {
self.anthropic = self.anthropic.with_filesystem(fs);
self
}
pub fn with_capabilities(mut self, cap: CapabilitiesConfig) -> Self {
self.agent = self.agent.with_capabilities(cap);
self
}
pub fn with_tool(mut self, tool: Arc<dyn Tool>) -> Self {
self.agent = self.agent.with_tool(tool);
self
}
pub fn with_policies(mut self, policies: Vec<Policy>) -> Self {
self.agent = self.agent.with_policies(policies);
self
}
pub fn with_pre_tool_hook(
mut self,
hook: Arc<dyn crate::hooks::PreToolCallDecideHook>,
) -> Self {
self.agent = self.agent.with_pre_tool_hook(hook);
self
}
pub fn with_workspace(mut self, ws: impl Into<PathBuf>) -> Self {
self.agent = self.agent.with_workspace(ws);
self
}
pub fn with_trigger(mut self, trigger: Arc<dyn Trigger>) -> Self {
self.agent = self.agent.with_trigger(trigger);
self
}
#[cfg(feature = "native")]
pub fn with_mcp_server(mut self, server: McpServerConfig) -> Self {
self.agent = self.agent.with_mcp_server(server);
self
}
pub fn resume(mut self, conversation_id: impl Into<String>) -> Self {
let id = conversation_id.into();
self.anthropic.conversation_id = Some(id.clone());
self.agent.conversation_id = Some(id);
self
}
}
#[cfg(feature = "openai")]
pub struct OpenAiAgentConfig {
pub agent: AgentConfig,
pub openai: OpenAiBackendConfig,
pub initial_history: Option<Vec<u8>>,
}
#[cfg(feature = "openai")]
impl OpenAiAgentConfig {
pub fn new(api_key: impl Into<String>) -> Self {
Self {
agent: AgentConfig::default(),
openai: OpenAiBackendConfig::new(api_key),
initial_history: None,
}
}
pub fn with_history_bytes(mut self, bytes: Vec<u8>) -> Self {
self.initial_history = Some(bytes);
self
}
pub fn with_model(mut self, model: impl Into<String>) -> Self {
self.openai = self.openai.with_model(model);
self
}
pub fn with_system_instructions(mut self, instr: impl Into<SystemInstructions>) -> Self {
let instr = instr.into();
self.openai = self.openai.with_system_instructions(instr.clone());
self.agent = self.agent.with_system_instructions(instr);
self
}
pub fn with_temperature(mut self, t: f32) -> Self {
self.openai = self.openai.with_temperature(t);
self
}
pub fn with_max_tokens(mut self, n: u32) -> Self {
self.openai = self.openai.with_max_tokens(n);
self
}
pub fn with_base_url(mut self, url: url::Url) -> Self {
self.openai = self.openai.with_base_url(url);
self
}
pub fn with_auth_provider(mut self, provider: crate::backends::KeyProvider) -> Self {
self.openai.api_key_provider = Some(crate::backends::AuthTokenProvider(provider));
self
}
pub fn with_filesystem(mut self, fs: crate::filesystem::SharedFilesystem) -> Self {
self.openai = self.openai.with_filesystem(fs);
self
}
pub fn with_capabilities(mut self, cap: CapabilitiesConfig) -> Self {
self.agent = self.agent.with_capabilities(cap);
self
}
pub fn with_tool(mut self, tool: Arc<dyn Tool>) -> Self {
self.agent = self.agent.with_tool(tool);
self
}
pub fn with_policies(mut self, policies: Vec<Policy>) -> Self {
self.agent = self.agent.with_policies(policies);
self
}
pub fn with_pre_tool_hook(
mut self,
hook: Arc<dyn crate::hooks::PreToolCallDecideHook>,
) -> Self {
self.agent = self.agent.with_pre_tool_hook(hook);
self
}
pub fn with_workspace(mut self, ws: impl Into<PathBuf>) -> Self {
self.agent = self.agent.with_workspace(ws);
self
}
pub fn with_trigger(mut self, trigger: Arc<dyn Trigger>) -> Self {
self.agent = self.agent.with_trigger(trigger);
self
}
#[cfg(feature = "native")]
pub fn with_mcp_server(mut self, server: McpServerConfig) -> Self {
self.agent = self.agent.with_mcp_server(server);
self
}
pub fn resume(mut self, conversation_id: impl Into<String>) -> Self {
let id = conversation_id.into();
self.openai.conversation_id = Some(id.clone());
self.agent.conversation_id = Some(id);
self
}
}
#[cfg(feature = "local")]
pub struct LocalAgentConfig {
pub agent: AgentConfig,
pub local: LocalBackendConfig,
pub initial_history: Option<Vec<u8>>,
}
#[cfg(feature = "local")]
impl LocalAgentConfig {
pub fn new(model: impl Into<String>) -> Self {
Self {
agent: AgentConfig::default(),
local: LocalBackendConfig::new(model),
initial_history: None,
}
}
pub fn with_history_bytes(mut self, bytes: Vec<u8>) -> Self {
self.initial_history = Some(bytes);
self
}
pub fn with_model(mut self, model: impl Into<String>) -> Self {
self.local = self.local.with_model(model);
self
}
pub fn with_system_instructions(mut self, instr: impl Into<SystemInstructions>) -> Self {
let instr = instr.into();
self.local = self.local.with_system_instructions(instr.clone());
self.agent = self.agent.with_system_instructions(instr);
self
}
pub fn with_filesystem(mut self, fs: crate::filesystem::SharedFilesystem) -> Self {
self.local = self.local.with_filesystem(fs);
self
}
pub fn with_capabilities(mut self, cap: CapabilitiesConfig) -> Self {
self.agent = self.agent.with_capabilities(cap);
self.local = self.local.with_capabilities(self.agent.capabilities.clone());
self
}
pub fn with_tool(mut self, tool: Arc<dyn Tool>) -> Self {
self.agent = self.agent.with_tool(tool);
self
}
pub fn with_policies(mut self, policies: Vec<Policy>) -> Self {
self.agent = self.agent.with_policies(policies);
self
}
pub fn with_pre_tool_hook(
mut self,
hook: Arc<dyn crate::hooks::PreToolCallDecideHook>,
) -> Self {
self.agent = self.agent.with_pre_tool_hook(hook);
self
}
pub fn resume(mut self, conversation_id: impl Into<String>) -> Self {
let id = conversation_id.into();
self.local.conversation_id = Some(id.clone());
self.agent.conversation_id = Some(id);
self
}
}
pub struct Agent {
conversation: Conversation,
connection: Arc<dyn Connection>,
gemini_connection: Option<Arc<GeminiConnection>>,
#[cfg(feature = "anthropic")]
anthropic_connection: Option<Arc<AnthropicConnection>>,
#[cfg(feature = "openai")]
openai_connection: Option<Arc<OpenAiConnection>>,
#[cfg(feature = "local")]
local_connection: Option<Arc<LocalConnection>>,
hook_runner: Arc<HookRunner>,
tool_runner: Arc<ToolRunner>,
trigger_runner: Option<Arc<TriggerRunner>>,
#[cfg(feature = "native")]
mcp_bridge: Option<Arc<McpBridge>>,
session_ctx: SessionContext,
#[cfg(not(target_arch = "wasm32"))]
dispatcher: parking_lot::Mutex<Option<JoinHandle<()>>>,
shutdown_flag: Arc<AtomicBool>,
}
impl Agent {
pub async fn start_gemini(mut config: GeminiAgentConfig) -> Result<Self> {
config.agent.capabilities.validate()?;
Self::wire_response_schema(&mut config.agent);
let mut gemini_config = config.gemini;
gemini_config.capabilities = config.agent.capabilities.clone();
let initial_history = config.initial_history.take();
let capture: Arc<parking_lot::Mutex<Option<Arc<GeminiConnection>>>> =
Arc::new(parking_lot::Mutex::new(None));
let capture_for_factory = capture.clone();
let mut agent = Self::start_with_factory(config.agent, move |hooks, tools, ctx| {
GeminiConnectionStrategy::new(gemini_config)
.with_runners(GeminiRunners {
tool_runner: Some(tools),
hook_runner: Some(hooks),
session_ctx: Some(ctx),
})
.with_typed_capture(capture_for_factory)
})
.await?;
agent.gemini_connection = capture.lock().take();
if let (Some(bytes), Some(gc)) = (initial_history, agent.gemini_connection.as_ref()) {
gc.set_history_bytes(&bytes)?;
}
Ok(agent)
}
pub async fn start_mock(mut config: MockAgentConfig) -> Result<Self> {
config.agent.capabilities.validate()?;
Self::wire_response_schema(&mut config.agent);
let mock = config.mock;
Self::start_with_factory(config.agent, move |hooks, tools, ctx| {
mock.with_runners(MockRunners {
tool_runner: Some(tools),
hook_runner: Some(hooks),
session_ctx: Some(ctx),
})
})
.await
}
#[cfg(feature = "anthropic")]
pub async fn start_anthropic(mut config: AnthropicAgentConfig) -> Result<Self> {
config.agent.capabilities.validate()?;
Self::wire_response_schema(&mut config.agent);
let mut anthropic_config = config.anthropic;
anthropic_config.capabilities = config.agent.capabilities.clone();
let initial_history = config.initial_history.take();
let capture: Arc<parking_lot::Mutex<Option<Arc<AnthropicConnection>>>> =
Arc::new(parking_lot::Mutex::new(None));
let capture_for_factory = capture.clone();
let mut agent = Self::start_with_factory(config.agent, move |hooks, tools, ctx| {
AnthropicConnectionStrategy::new(anthropic_config)
.with_runners(AnthropicRunners {
tool_runner: Some(tools),
hook_runner: Some(hooks),
session_ctx: Some(ctx),
})
.with_typed_capture(capture_for_factory)
})
.await?;
agent.anthropic_connection = capture.lock().take();
if let (Some(bytes), Some(ac)) = (initial_history, agent.anthropic_connection.as_ref()) {
ac.set_history_bytes(&bytes)?;
}
Ok(agent)
}
#[cfg(feature = "openai")]
pub async fn start_openai(mut config: OpenAiAgentConfig) -> Result<Self> {
config.agent.capabilities.validate()?;
Self::wire_response_schema(&mut config.agent);
let mut openai_config = config.openai;
openai_config.capabilities = config.agent.capabilities.clone();
let initial_history = config.initial_history.take();
let capture: Arc<parking_lot::Mutex<Option<Arc<OpenAiConnection>>>> =
Arc::new(parking_lot::Mutex::new(None));
let capture_for_factory = capture.clone();
let mut agent = Self::start_with_factory(config.agent, move |hooks, tools, ctx| {
OpenAiConnectionStrategy::new(openai_config)
.with_runners(OpenAiRunners {
tool_runner: Some(tools),
hook_runner: Some(hooks),
session_ctx: Some(ctx),
})
.with_typed_capture(capture_for_factory)
})
.await?;
agent.openai_connection = capture.lock().take();
if let (Some(bytes), Some(oc)) = (initial_history, agent.openai_connection.as_ref()) {
oc.set_history_bytes(&bytes)?;
}
Ok(agent)
}
#[cfg(feature = "local")]
pub async fn start_local(mut config: LocalAgentConfig) -> Result<Self> {
config.agent.capabilities.validate()?;
Self::wire_response_schema(&mut config.agent);
let mut local_config = config.local;
local_config.capabilities = config.agent.capabilities.clone();
let initial_history = config.initial_history.take();
let capture: Arc<parking_lot::Mutex<Option<Arc<LocalConnection>>>> =
Arc::new(parking_lot::Mutex::new(None));
let capture_for_factory = capture.clone();
let mut agent = Self::start_with_factory(config.agent, move |hooks, tools, ctx| {
LocalConnectionStrategy::new(local_config)
.with_runners(LocalRunners {
tool_runner: Some(tools),
hook_runner: Some(hooks),
session_ctx: Some(ctx),
})
.with_typed_capture(capture_for_factory)
})
.await?;
agent.local_connection = capture.lock().take();
if let (Some(bytes), Some(lc)) = (initial_history, agent.local_connection.as_ref()) {
lc.set_history_bytes(&bytes)?;
}
Ok(agent)
}
pub fn cumulative_usage(&self) -> crate::types::UsageMetadata {
self.conversation.cumulative_usage()
}
pub fn cancel_turn(&self) {
self.conversation.cancel_turn();
}
pub fn history_bytes(&self) -> Result<Option<Vec<u8>>> {
if let Some(gc) = self.gemini_connection.as_ref() {
return gc.history_bytes().map(Some);
}
#[cfg(feature = "anthropic")]
if let Some(ac) = self.anthropic_connection.as_ref() {
return ac.history_bytes().map(Some);
}
#[cfg(feature = "openai")]
if let Some(oc) = self.openai_connection.as_ref() {
return oc.history_bytes().map(Some);
}
#[cfg(feature = "local")]
if let Some(lc) = self.local_connection.as_ref() {
return lc.history_bytes().map(Some);
}
Ok(None)
}
pub async fn compact(&self) -> bool {
if let Some(gc) = self.gemini_connection.as_ref() {
return gc.compact().await;
}
#[cfg(feature = "anthropic")]
if let Some(ac) = self.anthropic_connection.as_ref() {
return ac.compact().await;
}
#[cfg(feature = "openai")]
if let Some(oc) = self.openai_connection.as_ref() {
return oc.compact().await;
}
#[cfg(feature = "local")]
if let Some(lc) = self.local_connection.as_ref() {
return lc.compact().await;
}
false
}
pub fn clear_history(&self) {
if let Some(gc) = self.gemini_connection.as_ref() {
gc.clear_history();
}
#[cfg(feature = "anthropic")]
if let Some(ac) = self.anthropic_connection.as_ref() {
ac.clear_history();
}
#[cfg(feature = "openai")]
if let Some(oc) = self.openai_connection.as_ref() {
oc.clear_history();
}
#[cfg(feature = "local")]
if let Some(lc) = self.local_connection.as_ref() {
lc.clear_history();
}
}
pub fn transcript(&self) -> Vec<crate::types::TranscriptEntry> {
if let Some(gc) = self.gemini_connection.as_ref() {
return gc.transcript();
}
#[cfg(feature = "anthropic")]
if let Some(ac) = self.anthropic_connection.as_ref() {
return ac.transcript();
}
#[cfg(feature = "openai")]
if let Some(oc) = self.openai_connection.as_ref() {
return oc.transcript();
}
#[cfg(feature = "local")]
if let Some(lc) = self.local_connection.as_ref() {
return lc.transcript();
}
Vec::new()
}
async fn start_with_factory<S, F>(agent_config: AgentConfig, factory: F) -> Result<Self>
where
S: ConnectionStrategy + 'static,
F: FnOnce(Arc<HookRunner>, Arc<ToolRunner>, SessionContext) -> S,
{
let hook_runner = Arc::new(HookRunner::new());
let tool_runner = Arc::new(ToolRunner::new());
for t in &agent_config.tools {
tool_runner.register(t.clone());
}
let mut active_policies = agent_config.policies;
if !agent_config.workspaces.is_empty() {
let mut ws_policies = policy::workspace_only(agent_config.workspaces.clone());
ws_policies.extend(active_policies);
active_policies = ws_policies;
}
let has_custom_tools = !agent_config.tools.is_empty();
if requires_safety_policy(&agent_config.capabilities, has_custom_tools)
&& active_policies.is_empty()
&& !hook_runner.has_pre_tool_call_decide()
{
return Err(Error::config(
"write or custom tools are enabled but no safety policies are \
configured. Add policy::allow_all() to approve all calls, or \
[policy::deny_all(), policy::allow(\"tool_name\")] to scope.",
));
}
if !active_policies.is_empty() {
hook_runner.register_pre_tool_call_decide(policy::enforce(active_policies));
}
for hook in &agent_config.pre_tool_hooks {
hook_runner.register_pre_tool_call_decide(hook.clone());
}
#[cfg(feature = "native")]
let mcp_bridge = if agent_config.mcp_servers.is_empty() {
None
} else {
let mut bridge = McpBridge::new();
for cfg in &agent_config.mcp_servers {
bridge.connect(cfg).await?;
}
let registered = bridge.register_into(&tool_runner);
if !registered.is_empty() {
tracing::debug!(?registered, "registered MCP tools");
}
Some(Arc::new(bridge))
};
let session_ctx = SessionContext::new();
let strategy = factory(hook_runner.clone(), tool_runner.clone(), session_ctx.clone());
let connection = strategy.connect().await?;
hook_runner.dispatch_session_start(&session_ctx).await;
tool_runner.set_context(Arc::new(ToolContext::new(connection.clone())));
let conversation = Conversation::new(connection.clone());
let shutdown_flag = Arc::new(AtomicBool::new(false));
#[cfg(not(target_arch = "wasm32"))]
let dispatcher = spawn_tool_dispatcher(
connection.clone(),
tool_runner.clone(),
hook_runner.clone(),
session_ctx.clone(),
shutdown_flag.clone(),
);
#[cfg(target_arch = "wasm32")]
spawn_tool_dispatcher(
connection.clone(),
tool_runner.clone(),
hook_runner.clone(),
session_ctx.clone(),
shutdown_flag.clone(),
);
let trigger_runner = if agent_config.triggers.is_empty() {
None
} else {
let runner = Arc::new(TriggerRunner::new(
agent_config.triggers,
connection.clone(),
));
runner.start()?;
Some(runner)
};
Ok(Self {
conversation,
connection,
gemini_connection: None,
#[cfg(feature = "anthropic")]
anthropic_connection: None,
#[cfg(feature = "openai")]
openai_connection: None,
#[cfg(feature = "local")]
local_connection: None,
hook_runner,
tool_runner,
trigger_runner,
#[cfg(feature = "native")]
mcp_bridge,
session_ctx,
#[cfg(not(target_arch = "wasm32"))]
dispatcher: parking_lot::Mutex::new(Some(dispatcher)),
shutdown_flag,
})
}
fn wire_response_schema(config: &mut AgentConfig) {
if let Some(schema) = config.response_schema.take() {
config.capabilities.finish_tool_schema_json = Some(schema);
}
}
pub fn conversation(&self) -> &Conversation {
&self.conversation
}
pub fn conversation_id(&self) -> String {
self.connection.conversation_id().to_string()
}
pub fn hooks(&self) -> &HookRunner {
&self.hook_runner
}
pub fn tools(&self) -> &ToolRunner {
&self.tool_runner
}
pub async fn chat(&self, content: impl Into<Content>) -> Result<ChatResponse> {
self.conversation.chat(content).await
}
pub async fn shutdown(self) -> Result<()> {
self.shutdown_flag.store(true, Ordering::Release);
#[cfg(not(target_arch = "wasm32"))]
{
let handle = self.dispatcher.lock().take();
if let Some(handle) = handle {
handle.abort();
let _ = handle.await;
}
}
if let Some(triggers) = self.trigger_runner.as_ref() {
triggers.stop().await;
}
self.hook_runner.dispatch_session_end(&self.session_ctx).await;
self.connection.shutdown().await?;
#[cfg(feature = "native")]
if let Some(bridge) = self.mcp_bridge.as_ref() {
bridge.shutdown().await;
}
Ok(())
}
}
impl Drop for Agent {
fn drop(&mut self) {
self.shutdown_flag.store(true, Ordering::Release);
#[cfg(not(target_arch = "wasm32"))]
if let Some(handle) = self.dispatcher.lock().take() {
handle.abort();
}
}
}
fn requires_safety_policy(capabilities: &CapabilitiesConfig, has_custom_tools: bool) -> bool {
has_custom_tools
|| capabilities
.effective_tools()
.iter()
.any(|t| !BuiltinTool::READ_ONLY.contains(t))
}
#[cfg(not(target_arch = "wasm32"))]
fn spawn_tool_dispatcher(
connection: Arc<dyn Connection>,
tool_runner: Arc<ToolRunner>,
hook_runner: Arc<HookRunner>,
session_ctx: SessionContext,
shutdown: Arc<AtomicBool>,
) -> JoinHandle<()> {
let registered: std::collections::HashSet<String> =
tool_runner.names().into_iter().collect();
tokio::spawn(async move {
let mut stream = connection.subscribe_steps();
while let Some(step) = stream.next().await {
if shutdown.load(Ordering::Acquire) {
return;
}
let step = match step {
Ok(s) => s,
Err(e) => {
warn!(error = %e, "tool dispatcher stream error");
continue;
}
};
if step.tool_calls.is_empty() {
continue;
}
if matches!(step.status, StepStatus::Done) {
continue;
}
let custom_calls: Vec<ToolCall> = step
.tool_calls
.into_iter()
.filter(|tc| registered.contains(&tc.name))
.collect();
if custom_calls.is_empty() {
continue;
}
let turn_ctx = session_ctx.child();
let mut results = Vec::with_capacity(custom_calls.len());
for call in custom_calls {
let (decision, op_ctx) =
hook_runner.dispatch_pre_tool_call(&turn_ctx, &call).await;
if !decision.allow {
let r = crate::types::ToolResult::err(
call.name.clone(),
call.id.clone(),
decision.message.clone(),
);
hook_runner.dispatch_post_tool_call(&op_ctx, &r).await;
results.push(r);
continue;
}
let r = match tool_runner.execute(&call.name, call.args.clone()).await {
Ok(v) => crate::types::ToolResult::ok(call.name.clone(), call.id.clone(), v),
Err(e) => crate::types::ToolResult::err(
call.name.clone(),
call.id.clone(),
e.to_string(),
),
};
hook_runner.dispatch_post_tool_call(&op_ctx, &r).await;
results.push(r);
}
if let Err(e) = connection.send_tool_results(results).await {
warn!(error = %e, "failed to send tool results");
}
}
debug!("tool dispatcher exiting");
})
}
#[cfg(target_arch = "wasm32")]
fn spawn_tool_dispatcher(
connection: Arc<dyn Connection>,
tool_runner: Arc<ToolRunner>,
hook_runner: Arc<HookRunner>,
session_ctx: SessionContext,
shutdown: Arc<AtomicBool>,
) {
let registered: std::collections::HashSet<String> =
tool_runner.names().into_iter().collect();
crate::runtime::spawn(async move {
let mut stream = connection.subscribe_steps();
while let Some(step) = stream.next().await {
if shutdown.load(Ordering::Acquire) {
return;
}
let step = match step {
Ok(s) => s,
Err(e) => {
warn!(error = %e, "tool dispatcher stream error");
continue;
}
};
if step.tool_calls.is_empty() {
continue;
}
if matches!(step.status, StepStatus::Done) {
continue;
}
let custom_calls: Vec<ToolCall> = step
.tool_calls
.into_iter()
.filter(|tc| registered.contains(&tc.name))
.collect();
if custom_calls.is_empty() {
continue;
}
let turn_ctx = session_ctx.child();
let mut results = Vec::with_capacity(custom_calls.len());
for call in custom_calls {
let (decision, op_ctx) =
hook_runner.dispatch_pre_tool_call(&turn_ctx, &call).await;
if !decision.allow {
let r = crate::types::ToolResult::err(
call.name.clone(),
call.id.clone(),
decision.message.clone(),
);
hook_runner.dispatch_post_tool_call(&op_ctx, &r).await;
results.push(r);
continue;
}
let r = match tool_runner.execute(&call.name, call.args.clone()).await {
Ok(v) => crate::types::ToolResult::ok(call.name.clone(), call.id.clone(), v),
Err(e) => crate::types::ToolResult::err(
call.name.clone(),
call.id.clone(),
e.to_string(),
),
};
hook_runner.dispatch_post_tool_call(&op_ctx, &r).await;
results.push(r);
}
if let Err(e) = connection.send_tool_results(results).await {
warn!(error = %e, "failed to send tool results");
}
}
debug!("tool dispatcher exiting");
});
}
#[cfg(test)]
mod safety_guard_tests {
use super::*;
fn caps(enabled: Vec<BuiltinTool>) -> CapabilitiesConfig {
CapabilitiesConfig {
enabled_tools: Some(enabled),
..Default::default()
}
}
#[test]
fn custom_tools_require_a_safety_policy() {
assert!(!requires_safety_policy(&caps(vec![]), false));
assert!(requires_safety_policy(&caps(vec![]), true));
assert!(requires_safety_policy(&caps(vec![BuiltinTool::CreateFile]), false));
assert!(!requires_safety_policy(
&caps(BuiltinTool::READ_ONLY.to_vec()),
false
));
}
}