use crate::{NanoConfig, NanoError};
use crate::tool_assembly::{ToolAssembly, BuiltInToolsConfig};
use crate::nano_loop::{NanoAgentLoop, NanoLoopConfig, NanoRuntimeControlCommand};
use crate::internal::context::NanoSoulSettings;
use crate::internal::provider::{build_provider, build_tool_config};
use agent_diva_agent::{AgentLoop, AgentLoopToolSet};
use agent_diva_core::bus::{AgentEvent, InboundMessage, MessageBus};
#[cfg(feature = "files")]
use agent_diva_files::{FileManager, FileConfig};
use agent_diva_providers::DynamicProvider;
use agent_diva_tooling::{Tool, ToolRegistry};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{error, info};
/// Selects which agent loop implementation `Agent::start` spawns.
///
/// `Standard` is the default variant. The type derives `PartialEq`/`Eq` so
/// callers can compare modes with `==` instead of pattern matching.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum AgentLoopMode {
    /// Run the `AgentLoop` provided by `agent_diva_agent`.
    #[default]
    Standard,
    /// Run this crate's `NanoAgentLoop`.
    Nano,
}
/// A configured agent together with the background tasks that drive it.
///
/// Instances are produced by `AgentBuilder::build`; `start` spawns the tasks
/// and `stop` tears them down.
pub struct Agent {
// Message bus used to publish inbound messages and subscribe to agent events.
bus: MessageBus,
// LLM provider handed to whichever loop `start` constructs.
provider: Arc<DynamicProvider>,
// Which loop implementation `start` spawns.
mode: AgentLoopMode,
// Tool configuration for the Standard loop; `build` leaves it `None` in Nano mode.
tool_config: Option<agent_diva_agent::ToolConfig>,
// Tools assembled by the builder; `start` passes a copy to the loop.
tool_registry: Option<ToolRegistry>,
// Settings for the Nano loop; `build` leaves it `None` in Standard mode.
nano_loop_config: Option<NanoLoopConfig>,
// Workspace directory copied from the configuration.
workspace: PathBuf,
// Model identifier copied from the configuration.
model: String,
// Iteration limit copied from the configuration.
max_iterations: usize,
// File manager created by `build`; only present with the `files` feature.
#[cfg(feature = "files")]
file_manager: Arc<FileManager>,
// Sender for runtime control commands; `None` until `start` creates the channel.
runtime_control_tx: Option<mpsc::UnboundedSender<NanoRuntimeControlCommand>>,
// Handle of the spawned agent-loop task; `start` treats `Some` as "already started".
agent_handle: Option<JoinHandle<()>>,
// Handle of the task running the bus's outbound dispatch loop.
outbound_handle: Option<JoinHandle<()>>,
}
/// Collects configuration and tools before an `Agent` is built.
///
/// Created by `Agent::new`; every setter consumes and returns the builder.
pub struct AgentBuilder {
// Base configuration; the simple setters write straight into it.
config: NanoConfig,
// Extra tools appended to the tool assembly during `build`.
custom_tools: Vec<Arc<dyn Tool>>,
// Caller-supplied tool assembly; when `None`, `build` derives one from `config`.
tool_assembly: Option<ToolAssembly>,
// Loop implementation the built agent will run.
mode: AgentLoopMode,
// Prompt stored by `system_prompt`.
// NOTE(review): no code in this file reads this field — confirm whether it is consumed elsewhere.
system_prompt: Option<String>,
}
impl Agent {
pub fn new(config: NanoConfig) -> AgentBuilder {
AgentBuilder {
config,
custom_tools: Vec::new(),
tool_assembly: None,
mode: AgentLoopMode::default(),
system_prompt: None,
}
}
pub async fn start(&mut self) -> Result<(), NanoError> {
if self.agent_handle.is_some() {
return Err(NanoError::Other("Agent already started".to_string()));
}
let bus = self.bus.clone();
let provider: Arc<dyn agent_diva_providers::LLMProvider> = self.provider.clone();
let model = self.model.clone();
let workspace = self.workspace.clone();
let max_iterations = self.max_iterations;
#[cfg(feature = "files")]
let file_manager = self.file_manager.clone();
let (runtime_control_tx, runtime_control_rx) = mpsc::unbounded_channel();
self.runtime_control_tx = Some(runtime_control_tx);
match self.mode {
AgentLoopMode::Standard => {
let tool_config = self.tool_config.clone().unwrap_or_default();
let registry = self
.tool_registry
.as_ref()
.map(clone_registry)
.unwrap_or_default();
#[cfg(not(feature = "files"))]
{
return Err(NanoError::Other("Standard mode requires 'files' feature. Use Nano mode or enable 'files' feature.".to_string()));
}
#[cfg(feature = "files")]
{
let mut agent_loop = AgentLoop::with_toolset(
bus.clone(),
provider,
workspace,
Some(model),
Some(max_iterations),
AgentLoopToolSet {
registry,
config: tool_config,
},
None, file_manager,
).await.map_err(|e| NanoError::Other(e.to_string()))?;
let agent_handle = tokio::spawn(async move {
info!("Agent loop (standard) starting");
if let Err(e) = agent_loop.run().await {
error!("Agent loop error: {}", e);
}
info!("Agent loop (standard) stopped");
});
self.agent_handle = Some(agent_handle);
}
}
AgentLoopMode::Nano => {
let tool_registry = self
.tool_registry
.as_ref()
.map(clone_registry)
.unwrap_or_default();
let nano_config = self.nano_loop_config.clone().unwrap_or_default();
let mut nano_loop = NanoAgentLoop::new(
bus.clone(),
provider,
workspace,
Some(model),
nano_config,
tool_registry,
#[cfg(feature = "files")]
file_manager,
).await.map_err(|e| NanoError::Other(e.to_string()))?;
nano_loop = nano_loop.with_runtime_control(runtime_control_rx);
let agent_handle = tokio::spawn(async move {
info!("Nano agent loop starting");
if let Err(e) = nano_loop.run().await {
error!("Nano agent loop error: {}", e);
}
info!("Nano agent loop stopped");
});
self.agent_handle = Some(agent_handle);
}
}
let bus_for_outbound = self.bus.clone();
let outbound_handle = tokio::spawn(async move {
bus_for_outbound.dispatch_outbound_loop().await;
});
self.outbound_handle = Some(outbound_handle);
Ok(())
}
pub async fn send(&self, message: impl Into<String>) -> Result<String, NanoError> {
let content = message.into();
let channel = "nano";
let chat_id = "default";
let mut event_rx = self.bus.subscribe_events();
let inbound = InboundMessage::new(channel, "user", chat_id, content);
self.bus
.publish_inbound(inbound)
.map_err(|e| NanoError::Agent(e.to_string()))?;
let mut full_response = String::new();
loop {
match tokio::time::timeout(
std::time::Duration::from_secs(300),
event_rx.recv(),
)
.await
{
Ok(Ok(bus_event)) => {
if bus_event.channel != channel || bus_event.chat_id != chat_id {
continue;
}
match bus_event.event {
AgentEvent::AssistantDelta { text } => full_response.push_str(&text),
AgentEvent::FinalResponse { content } => {
full_response = content;
break;
}
AgentEvent::Error { message } => {
return Err(NanoError::Agent(message));
}
_ => {}
}
}
Ok(Err(_)) => break,
Err(_) => return Err(NanoError::Timeout),
}
}
Ok(full_response)
}
/// Publishes `message` on channel `"nano"` / chat id `"default"` and returns
/// a receiver that yields the resulting agent events.
///
/// A spawned task forwards every matching event. Forwarding stops after a
/// `FinalResponse` or `Error` event has been sent, when the returned receiver
/// has been dropped, when the event subscription reports an error, or when
/// no event arrives for 300 seconds. In the last two cases the stream simply
/// ends without a terminal event.
///
/// # Errors
///
/// Returns `NanoError::Agent` if the message cannot be published.
pub async fn send_stream(
    &self,
    message: impl Into<String>,
) -> Result<mpsc::UnboundedReceiver<AgentEvent>, NanoError> {
    let content: String = message.into();
    let channel = "nano";
    let chat_id = "default";

    // Subscribe before publishing so no event for this message is missed.
    let mut event_rx = self.bus.subscribe_events();
    self.bus
        .publish_inbound(InboundMessage::new(channel, "user", chat_id, content))
        .map_err(|e| NanoError::Agent(e.to_string()))?;

    let (tx, rx) = mpsc::unbounded_channel::<AgentEvent>();
    tokio::spawn(async move {
        while let Ok(Ok(bus_event)) =
            tokio::time::timeout(std::time::Duration::from_secs(300), event_rx.recv()).await
        {
            if bus_event.channel != channel || bus_event.chat_id != chat_id {
                continue;
            }
            let terminal = matches!(
                bus_event.event,
                AgentEvent::FinalResponse { .. } | AgentEvent::Error { .. }
            );
            let delivered = tx.send(bus_event.event).is_ok();
            if terminal || !delivered {
                break;
            }
        }
    });
    Ok(rx)
}
pub fn reload_tools(&self, registry: ToolRegistry) -> Result<(), NanoError> {
if let Some(ref tx) = self.runtime_control_tx {
tx.send(NanoRuntimeControlCommand::ReloadTools(registry))
.map_err(|e| NanoError::Other(e.to_string()))?;
Ok(())
} else {
Err(NanoError::Other("Runtime control not available (either agent not started or using Standard mode)".to_string()))
}
}
pub fn cancel_session(&self, chat_id: impl Into<String>) -> Result<(), NanoError> {
if let Some(ref tx) = self.runtime_control_tx {
tx.send(NanoRuntimeControlCommand::CancelSession { chat_id: chat_id.into() })
.map_err(|e| NanoError::Other(e.to_string()))?;
Ok(())
} else {
Err(NanoError::Other("Runtime control not available".to_string()))
}
}
pub async fn stop(&mut self) {
if let Some(ref tx) = self.runtime_control_tx {
let _ = tx.send(NanoRuntimeControlCommand::Stop);
}
if let Some(handle) = self.agent_handle.take() {
handle.abort();
let _ = handle.await;
}
if let Some(handle) = self.outbound_handle.take() {
handle.abort();
let _ = handle.await;
}
self.bus.stop().await;
}
}
impl AgentBuilder {
/// Sets the model identifier in the configuration.
///
/// `build` rejects an empty model.
pub fn model(self, model: impl Into<String>) -> Self {
    let mut builder = self;
    builder.config.model = model.into();
    builder
}
/// Sets the API key in the configuration; `build` passes it to the provider factory.
pub fn api_key(self, key: impl Into<String>) -> Self {
    let mut builder = self;
    builder.config.api_key = key.into();
    builder
}
/// Sets an explicit API base in the configuration; `build` passes it to the
/// provider factory.
pub fn api_base(self, base: impl Into<String>) -> Self {
    let base: String = base.into();
    let mut builder = self;
    builder.config.api_base = Some(base);
    builder
}
/// Sets the workspace directory in the configuration.
///
/// `builtin_tools` reads the workspace at the moment it is called, so call
/// this setter first when using both.
pub fn workspace(self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    let mut builder = self;
    builder.config.workspace = path;
    builder
}
/// Sets the iteration limit stored in the configuration.
pub fn max_iterations(self, n: usize) -> Self {
    let mut builder = self;
    builder.config.max_iterations = n;
    builder
}
pub fn mode(mut self, mode: AgentLoopMode) -> Self {
self.mode = mode;
self
}
pub fn nano_mode(self) -> Self {
self.mode(AgentLoopMode::Nano)
}
pub fn standard_mode(self) -> Self {
self.mode(AgentLoopMode::Standard)
}
/// Queues an extra tool; `build` adds queued tools to the tool assembly.
pub fn with_tool(self, tool: Arc<dyn Tool>) -> Self {
    let mut builder = self;
    builder.custom_tools.push(tool);
    builder
}
pub fn with_tool_assembly(mut self, assembly: ToolAssembly) -> Self {
self.tool_assembly = Some(assembly);
self.mode = AgentLoopMode::Standard;
self
}
pub fn builtin_tools(mut self, config: BuiltInToolsConfig) -> Self {
let workspace = self.config.workspace.clone();
let assembly = ToolAssembly::new(workspace)
.builtin(config);
self.tool_assembly = Some(assembly);
self.mode = AgentLoopMode::Standard;
self
}
pub fn system_prompt(mut self, prompt: impl Into<String>) -> Self {
self.system_prompt = Some(prompt.into());
self
}
pub async fn build(self) -> Result<Agent, NanoError> {
let config = self.config;
if config.model.is_empty() {
return Err(NanoError::Other("model must be set".to_string()));
}
let bus = MessageBus::new();
let client = build_provider(
&config.model,
&config.api_key,
config.api_base.as_deref(),
)?;
let provider = Arc::new(DynamicProvider::new(Arc::new(client)));
let workspace = config.workspace.clone();
let model = config.model.clone();
let max_iterations = config.max_iterations;
#[cfg(feature = "files")]
let file_manager = {
let storage_path = workspace.join(".agent-diva/files");
let file_config = FileConfig::with_path(&storage_path);
Arc::new(FileManager::new(file_config).await.map_err(|e| NanoError::Other(e.to_string()))?)
};
match self.mode {
AgentLoopMode::Standard => {
let tool_config = build_tool_config(&config);
let mut assembly = if let Some(assembly) = self.tool_assembly {
assembly
} else {
let builtin_config = config
.builtin_tools
.clone()
.unwrap_or_else(BuiltInToolsConfig::default);
let mut assembly = ToolAssembly::new(workspace.clone())
.builtin(builtin_config)
.restrict_to_workspace(config.restrict_to_workspace);
if let Some(ref search) = config.web_search {
assembly = assembly.web_config(crate::tool_assembly::WebToolConfig {
search_enabled: true,
fetch_enabled: true,
search_provider: search.provider.clone(),
search_api_key: search.api_key.clone(),
max_results: search.max_results,
});
}
if !config.mcp_servers.is_empty() {
assembly = assembly.mcp_servers(config.mcp_servers.clone());
}
assembly
};
for tool in self.custom_tools {
assembly = assembly.with_tool(tool);
}
#[cfg(feature = "files")]
{
assembly = assembly.with_file_manager(file_manager.clone());
}
let tool_registry = assembly.build();
Ok(Agent {
bus,
provider,
mode: AgentLoopMode::Standard,
tool_config: Some(tool_config),
tool_registry: Some(tool_registry),
nano_loop_config: None,
workspace,
model,
max_iterations,
#[cfg(feature = "files")]
file_manager,
runtime_control_tx: None,
agent_handle: None,
outbound_handle: None,
})
}
AgentLoopMode::Nano => {
let tool_registry = if let Some(mut assembly) = self.tool_assembly {
for tool in self.custom_tools {
assembly = assembly.with_tool(tool);
}
#[cfg(feature = "files")]
{
assembly = assembly.with_file_manager(file_manager.clone());
}
assembly.build()
} else {
let builtin_config = config.builtin_tools.clone()
.unwrap_or_else(|| {
if config.restrict_to_workspace {
BuiltInToolsConfig::default()
} else {
BuiltInToolsConfig::all()
}
});
let mut assembly = ToolAssembly::new(workspace.clone())
.builtin(builtin_config)
.restrict_to_workspace(config.restrict_to_workspace);
if let Some(ref search) = config.web_search {
assembly = assembly.web_config(crate::tool_assembly::WebToolConfig {
search_enabled: true,
fetch_enabled: true,
search_provider: search.provider.clone(),
search_api_key: search.api_key.clone(),
max_results: search.max_results,
});
}
for tool in self.custom_tools {
assembly = assembly.with_tool(tool);
}
if !config.mcp_servers.is_empty() {
assembly = assembly.mcp_servers(config.mcp_servers.clone());
}
#[cfg(feature = "files")]
{
assembly = assembly.with_file_manager(file_manager.clone());
}
assembly.build()
};
let nano_loop_config = NanoLoopConfig {
max_iterations,
memory_window: 10,
soul_settings: NanoSoulSettings {
enabled: config.soul.enabled,
max_chars: config.soul.max_chars,
bootstrap_once: config.soul.bootstrap_once,
},
notify_on_soul_change: config.soul.notify_on_change,
};
Ok(Agent {
bus,
provider,
mode: AgentLoopMode::Nano,
tool_config: None,
tool_registry: Some(tool_registry),
nano_loop_config: Some(nano_loop_config),
workspace,
model,
max_iterations,
#[cfg(feature = "files")]
file_manager,
runtime_control_tx: None,
agent_handle: None,
outbound_handle: None,
})
}
}
}
}
/// Builds a new registry containing every tool that `registry` can resolve
/// by name.
///
/// Names for which `get` returns `None` are skipped.
fn clone_registry(registry: &ToolRegistry) -> ToolRegistry {
    let mut copy = ToolRegistry::new();
    let resolved = registry
        .tool_names()
        .into_iter()
        .filter_map(|name| registry.get(&name));
    for tool in resolved {
        copy.register(tool);
    }
    copy
}