1use agent_diva_core::bus::{AgentEvent, InboundMessage, MessageBus, OutboundMessage};
4use agent_diva_core::config::MCPServerConfig;
5use agent_diva_core::cron::CronService;
6use agent_diva_core::error_context::ErrorContext;
7use agent_diva_core::session::SessionManager;
8use agent_diva_files::{FileConfig, FileManager};
9use agent_diva_providers::LLMProvider;
10use agent_diva_tooling::{ToolError, ToolRegistry};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16use tracing::{debug, error, info};
17use uuid::Uuid;
18
19use crate::consolidation;
20use crate::context::{ContextBuilder, SoulContextSettings};
21use crate::runtime_control::RuntimeControlCommand;
22use crate::subagent::SubagentManager;
23use crate::tool_assembly::{SubagentSpawner, ToolAssembly};
24use crate::tool_config::builtin::BuiltInToolsConfig;
25use crate::tool_config::network::NetworkToolConfig;
26
27mod loop_runtime_control;
28mod loop_tools;
29mod loop_turn;
30
31#[derive(Clone)]
33pub struct ToolConfig {
34 pub builtin: BuiltInToolsConfig,
36 pub network: NetworkToolConfig,
38 pub exec_timeout: u64,
40 pub restrict_to_workspace: bool,
42 pub mcp_servers: HashMap<String, MCPServerConfig>,
44 pub cron_service: Option<Arc<CronService>>,
46 pub soul_context: SoulContextSettings,
48 pub notify_on_soul_change: bool,
50 pub soul_governance: SoulGovernanceSettings,
52}
53
54impl Default for ToolConfig {
55 fn default() -> Self {
56 Self {
57 builtin: BuiltInToolsConfig::default(),
58 network: NetworkToolConfig::default(),
59 exec_timeout: 60,
60 restrict_to_workspace: false,
61 mcp_servers: HashMap::new(),
62 cron_service: None,
63 soul_context: SoulContextSettings::default(),
64 notify_on_soul_change: true,
65 soul_governance: SoulGovernanceSettings::default(),
66 }
67 }
68}
69
70#[derive(Clone, Debug)]
72pub struct SoulGovernanceSettings {
73 pub frequent_change_window_secs: u64,
75 pub frequent_change_threshold: usize,
77 pub boundary_confirmation_hint: bool,
79}
80
81impl Default for SoulGovernanceSettings {
82 fn default() -> Self {
83 Self {
84 frequent_change_window_secs: 600,
85 frequent_change_threshold: 3,
86 boundary_confirmation_hint: true,
87 }
88 }
89}
90
91pub struct AgentLoop {
93 bus: MessageBus,
94 provider: Arc<dyn LLMProvider>,
95 #[allow(dead_code)]
96 workspace: PathBuf,
97 #[allow(dead_code)]
98 model: String,
99 max_iterations: usize,
100 memory_window: usize,
101 context: ContextBuilder,
102 sessions: SessionManager,
103 tool_config: ToolConfig,
104 tools: ToolRegistry,
105 subagent_manager: Arc<SubagentManager>,
106 runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
107 cancelled_sessions: HashSet<String>,
108 notify_on_soul_change: bool,
109 soul_governance: SoulGovernanceSettings,
110 soul_change_turns: VecDeque<Instant>,
111 file_manager: Arc<FileManager>,
112}
113
114pub struct AgentLoopToolSet {
115 pub registry: ToolRegistry,
116 pub config: ToolConfig,
117}
118
119struct SubagentManagerSpawner {
120 manager: Arc<SubagentManager>,
121}
122
123#[async_trait::async_trait]
124impl SubagentSpawner for SubagentManagerSpawner {
125 async fn spawn(
126 &self,
127 task: String,
128 label: Option<String>,
129 channel: String,
130 chat_id: String,
131 ) -> Result<String, ToolError> {
132 self.manager
133 .spawn(task, label, channel, chat_id)
134 .await
135 .map_err(|e| ToolError::ExecutionFailed(e.to_string()))
136 }
137}
138
139impl AgentLoop {
140 pub async fn new(
142 bus: MessageBus,
143 provider: Arc<dyn LLMProvider>,
144 workspace: PathBuf,
145 model: Option<String>,
146 max_iterations: Option<usize>,
147 ) -> Result<Self, Box<dyn std::error::Error>> {
148 let model = model.unwrap_or_else(|| provider.get_default_model());
149 let mut context = ContextBuilder::with_skills(workspace.clone(), None);
150 context.set_soul_settings(SoulContextSettings::default());
151 let sessions = SessionManager::new(workspace.clone());
152 let tools = ToolRegistry::new();
153
154 let subagent_manager = Arc::new(SubagentManager::new(
155 provider.clone(),
156 workspace.clone(),
157 bus.clone(),
158 Some(model.clone()),
159 BuiltInToolsConfig::default().for_subagent(),
160 NetworkToolConfig::default(),
161 None,
162 false,
163 HashMap::new(),
164 ));
165
166 let storage_path = dirs::data_local_dir()
168 .map(|p| p.join("agent-diva").join("files"))
169 .unwrap_or_else(|| PathBuf::from(".agent-diva/files"));
170 let file_config = FileConfig::with_path(&storage_path);
171 let file_manager = Arc::new(FileManager::new(file_config).await?);
172
173 Ok(Self {
174 bus,
175 provider,
176 workspace,
177 model,
178 max_iterations: max_iterations.unwrap_or(20),
179 memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
180 context,
181 sessions,
182 tool_config: ToolConfig::default(),
183 tools,
184 subagent_manager,
185 runtime_control_rx: None,
186 cancelled_sessions: HashSet::new(),
187 notify_on_soul_change: true,
188 soul_governance: SoulGovernanceSettings::default(),
189 soul_change_turns: VecDeque::new(),
190 file_manager,
191 })
192 }
193
194 pub fn file_manager(&self) -> Arc<FileManager> {
196 self.file_manager.clone()
197 }
198
199 #[allow(clippy::too_many_arguments)]
201 pub async fn with_tools(
202 bus: MessageBus,
203 provider: Arc<dyn LLMProvider>,
204 workspace: PathBuf,
205 model: Option<String>,
206 max_iterations: Option<usize>,
207 tool_config: ToolConfig,
208 runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
209 file_manager: Arc<FileManager>,
210 ) -> Result<Self, Box<dyn std::error::Error>> {
211 let model = model.unwrap_or_else(|| provider.get_default_model());
212 let mut context = ContextBuilder::with_skills(workspace.clone(), None);
213 context.set_soul_settings(tool_config.soul_context.clone());
214 let sessions = SessionManager::new(workspace.clone());
215
216 let subagent_manager = Arc::new(SubagentManager::new(
217 provider.clone(),
218 workspace.clone(),
219 bus.clone(),
220 Some(model.clone()),
221 tool_config.builtin.for_subagent(),
222 tool_config.network.clone(),
223 Some(tool_config.exec_timeout),
224 tool_config.restrict_to_workspace,
225 tool_config.mcp_servers.clone(),
226 ));
227
228 let spawner = Arc::new(SubagentManagerSpawner {
229 manager: subagent_manager.clone(),
230 });
231 let tools = ToolAssembly::new(workspace.clone())
232 .builtin(tool_config.builtin.clone())
233 .with_network_config(tool_config.network.clone())
234 .with_exec_timeout(tool_config.exec_timeout)
235 .restrict_to_workspace(tool_config.restrict_to_workspace)
236 .mcp_servers(tool_config.mcp_servers.clone())
237 .with_subagent_spawner(spawner)
238 .with_file_manager(file_manager.clone())
239 .with_tools(Vec::new())
240 .build();
241
242 let mut agent = Self {
243 bus,
244 provider,
245 workspace,
246 model,
247 max_iterations: max_iterations.unwrap_or(20),
248 memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
249 context,
250 sessions,
251 tool_config: tool_config.clone(),
252 tools,
253 subagent_manager,
254 runtime_control_rx,
255 cancelled_sessions: HashSet::new(),
256 notify_on_soul_change: tool_config.notify_on_soul_change,
257 soul_governance: tool_config.soul_governance,
258 soul_change_turns: VecDeque::new(),
259 file_manager,
260 };
261
262 if let Some(cron_service) = agent.tool_config.cron_service.clone() {
263 agent.tools = ToolAssembly::new(agent.workspace.clone())
264 .builtin(agent.tool_config.builtin.clone())
265 .with_network_config(agent.tool_config.network.clone())
266 .with_exec_timeout(agent.tool_config.exec_timeout)
267 .restrict_to_workspace(agent.tool_config.restrict_to_workspace)
268 .mcp_servers(agent.tool_config.mcp_servers.clone())
269 .with_subagent_spawner(Arc::new(SubagentManagerSpawner {
270 manager: agent.subagent_manager.clone(),
271 }))
272 .with_cron_service(cron_service)
273 .with_file_manager(agent.file_manager.clone())
274 .build();
275 }
276
277 Ok(agent)
278 }
279
280 #[allow(clippy::too_many_arguments)]
281 pub async fn with_toolset(
282 bus: MessageBus,
283 provider: Arc<dyn LLMProvider>,
284 workspace: PathBuf,
285 model: Option<String>,
286 max_iterations: Option<usize>,
287 toolset: AgentLoopToolSet,
288 runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
289 file_manager: Arc<FileManager>,
290 ) -> Result<Self, Box<dyn std::error::Error>> {
291 let model = model.unwrap_or_else(|| provider.get_default_model());
292 let mut context = ContextBuilder::with_skills(workspace.clone(), None);
293 context.set_soul_settings(toolset.config.soul_context.clone());
294 let sessions = SessionManager::new(workspace.clone());
295 let subagent_manager = Arc::new(SubagentManager::new(
296 provider.clone(),
297 workspace.clone(),
298 bus.clone(),
299 Some(model.clone()),
300 toolset.config.builtin.for_subagent(),
301 toolset.config.network.clone(),
302 Some(toolset.config.exec_timeout),
303 toolset.config.restrict_to_workspace,
304 toolset.config.mcp_servers.clone(),
305 ));
306
307 Ok(Self {
308 bus,
309 provider,
310 workspace,
311 model,
312 max_iterations: max_iterations.unwrap_or(20),
313 memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
314 context,
315 sessions,
316 tool_config: toolset.config.clone(),
317 tools: toolset.registry,
318 subagent_manager,
319 runtime_control_rx,
320 cancelled_sessions: HashSet::new(),
321 notify_on_soul_change: toolset.config.notify_on_soul_change,
322 soul_governance: toolset.config.soul_governance,
323 soul_change_turns: VecDeque::new(),
324 file_manager,
325 })
326 }
327
328 pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
330 info!("Agent loop started");
331
332 let Some(mut inbound_rx) = self.bus.take_inbound_receiver().await else {
334 error!("Failed to take inbound receiver");
335 return Err("Inbound receiver already taken".into());
336 };
337
338 loop {
339 if let Some(control_rx) = self.runtime_control_rx.as_mut() {
340 tokio::select! {
341 control = control_rx.recv() => {
342 match control {
343 Some(cmd) => self.handle_runtime_control_command(cmd).await,
344 None => {
345 info!("Runtime control channel closed");
346 self.runtime_control_rx = None;
347 }
348 }
349 }
350 maybe_msg = inbound_rx.recv() => {
351 match maybe_msg {
352 Some(msg) => self.handle_inbound(msg).await,
353 None => {
354 info!("Message bus closed, stopping agent loop");
355 break;
356 }
357 }
358 }
359 }
360 } else {
361 match tokio::time::timeout(std::time::Duration::from_secs(1), inbound_rx.recv())
362 .await
363 {
364 Ok(Some(msg)) => self.handle_inbound(msg).await,
365 Ok(None) => {
366 info!("Message bus closed, stopping agent loop");
367 break;
368 }
369 Err(_) => continue,
370 }
371 }
372 }
373
374 info!("Agent loop stopped");
375 Ok(())
376 }
377
378 async fn handle_inbound(&mut self, msg: InboundMessage) {
379 debug!("Received message from {}:{}", msg.channel, msg.chat_id);
380 let event_msg = msg.clone();
381 match self.process_inbound_message(msg, None).await {
382 Ok(Some(response)) => {
383 if let Err(e) = self.bus.publish_outbound(response) {
384 error!("Failed to publish response: {}", e);
385 }
386 }
387 Ok(None) => debug!("No response needed"),
388 Err(e) => {
389 let error_message = format!("Failed to process message: {}", e);
390 let ctx = ErrorContext::new("handle_inbound", &error_message)
391 .with_metadata("channel", event_msg.channel.clone())
392 .with_metadata("chat_id", event_msg.chat_id.clone())
393 .with_metadata("sender_id", event_msg.sender_id.clone());
394 error!("{}", ctx.to_detailed_string());
395 self.emit_error_event(&event_msg, None, error_message);
396 }
397 }
398 }
399
400 pub async fn process_inbound_message(
402 &mut self,
403 msg: InboundMessage,
404 event_tx: Option<&mpsc::UnboundedSender<AgentEvent>>,
405 ) -> Result<Option<OutboundMessage>, Box<dyn std::error::Error>> {
406 let trace_id = Uuid::new_v4().to_string();
407 use tracing::Instrument;
408 let span = tracing::info_span!("AgentSpan", trace_id = %trace_id);
409
410 self.process_inbound_message_inner(msg, event_tx, trace_id)
411 .instrument(span)
412 .await
413 }
414
415 pub async fn process_direct(
417 &mut self,
418 content: impl Into<String>,
419 _session_key: impl Into<String>,
420 channel: impl Into<String>,
421 chat_id: impl Into<String>,
422 ) -> Result<String, Box<dyn std::error::Error>> {
423 let content = content.into();
424 let channel = channel.into();
425 let chat_id = chat_id.into();
426
427 let msg = InboundMessage::new(channel, "user", chat_id, content);
428
429 let response = self.process_inbound_message(msg, None).await?;
430 Ok(response
431 .map(|r| {
432 let content = r.content;
433 if let Some(reasoning) = r.reasoning_content {
434 if !reasoning.is_empty() {
435 return format!("<think>\n{}\n</think>\n\n{}", reasoning, content);
436 }
437 }
438 content
439 })
440 .unwrap_or_default())
441 }
442
443 pub async fn process_direct_stream(
445 &mut self,
446 content: impl Into<String>,
447 _session_key: impl Into<String>,
448 channel: impl Into<String>,
449 chat_id: impl Into<String>,
450 event_tx: mpsc::UnboundedSender<AgentEvent>,
451 ) -> Result<String, Box<dyn std::error::Error>> {
452 let content = content.into();
453 let channel = channel.into();
454 let chat_id = chat_id.into();
455
456 let msg = InboundMessage::new(channel, "user", chat_id, content);
457
458 match self.process_inbound_message(msg, Some(&event_tx)).await {
459 Ok(response) => Ok(response.map(|r| r.content).unwrap_or_default()),
460 Err(err) => {
461 let _ = event_tx.send(AgentEvent::Error {
462 message: err.to_string(),
463 });
464 Err(err)
465 }
466 }
467 }
468
469 fn is_frequent_soul_change_turn(&mut self) -> bool {
470 let window = Duration::from_secs(self.soul_governance.frequent_change_window_secs.max(1));
471 let now = Instant::now();
472 self.soul_change_turns.push_back(now);
473 while let Some(front) = self.soul_change_turns.front().copied() {
474 if now.duration_since(front) > window {
475 self.soul_change_turns.pop_front();
476 } else {
477 break;
478 }
479 }
480 self.soul_change_turns.len() >= self.soul_governance.frequent_change_threshold.max(1)
481 }
482}
483
484#[cfg(test)]
485mod tests {
486 use super::*;
487 use agent_diva_providers::{
488 LLMResponse, LiteLLMClient, Message, ProviderError, ProviderEventStream, ProviderResult,
489 };
490 use async_trait::async_trait;
491 use futures::stream;
492 use tokio::time::{timeout, Duration};
493
494 struct FailingStreamProvider;
495
496 #[async_trait]
497 impl LLMProvider for FailingStreamProvider {
498 async fn chat(
499 &self,
500 _messages: Vec<Message>,
501 _tools: Option<Vec<serde_json::Value>>,
502 _model: Option<String>,
503 _max_tokens: i32,
504 _temperature: f64,
505 ) -> ProviderResult<LLMResponse> {
506 Err(ProviderError::ApiError(
507 "chat should not be used".to_string(),
508 ))
509 }
510
511 async fn chat_stream(
512 &self,
513 _messages: Vec<Message>,
514 _tools: Option<Vec<serde_json::Value>>,
515 _model: Option<String>,
516 _max_tokens: i32,
517 _temperature: f64,
518 ) -> ProviderResult<ProviderEventStream> {
519 Ok(Box::pin(stream::iter(vec![Err(ProviderError::ApiError(
520 "simulated stream failure".to_string(),
521 ))])))
522 }
523
524 fn get_default_model(&self) -> String {
525 "test-model".to_string()
526 }
527 }
528
529 #[tokio::test]
530 async fn test_agent_loop_creation() {
531 let bus = MessageBus::new();
532 let provider = Arc::new(LiteLLMClient::default());
533 let workspace = PathBuf::from("/tmp/test");
534 let agent = AgentLoop::new(bus, provider, workspace, None, None)
535 .await
536 .unwrap();
537 assert_eq!(agent.max_iterations, 20);
538 }
539
540 #[tokio::test]
541 async fn test_process_direct() {
542 let bus = MessageBus::new();
543 let provider = Arc::new(LiteLLMClient::default());
544 let temp_dir = tempfile::tempdir().unwrap();
545 let workspace = temp_dir.path().to_path_buf();
546
547 let mut agent = AgentLoop::new(bus, provider, workspace, None, Some(1))
548 .await
549 .unwrap();
550
551 let result = agent
553 .process_direct("Hello", "cli:test", "cli", "test")
554 .await;
555
556 assert!(result.is_err());
558 }
559
560 #[test]
561 fn test_soul_governance_defaults_are_non_zero() {
562 let cfg = SoulGovernanceSettings::default();
563 assert!(cfg.frequent_change_window_secs > 0);
564 assert!(cfg.frequent_change_threshold > 0);
565 }
566
567 #[tokio::test]
568 async fn test_handle_inbound_emits_error_event_on_provider_failure() {
569 let bus = MessageBus::new();
570 let mut event_rx = bus.subscribe_events();
571 let provider = Arc::new(FailingStreamProvider);
572 let temp_dir = tempfile::tempdir().unwrap();
573 let workspace = temp_dir.path().to_path_buf();
574
575 let mut agent = AgentLoop::new(bus.clone(), provider, workspace, None, Some(1))
576 .await
577 .unwrap();
578 let msg = InboundMessage::new("gui", "user", "chat-1", "Hello");
579
580 agent.handle_inbound(msg).await;
581
582 let error_event = timeout(Duration::from_secs(1), async {
583 loop {
584 let bus_event = event_rx.recv().await.unwrap();
585 if let AgentEvent::Error { message } = bus_event.event {
586 break (bus_event.channel, bus_event.chat_id, message);
587 }
588 }
589 })
590 .await
591 .expect("timed out waiting for error event");
592
593 assert_eq!(error_event.0, "gui");
594 assert_eq!(error_event.1, "chat-1");
595 assert!(error_event.2.contains("simulated stream failure"));
596 }
597}