1use agent_diva_core::bus::{AgentEvent, InboundMessage, MessageBus, OutboundMessage};
4use agent_diva_core::config::MCPServerConfig;
5use agent_diva_core::cron::CronService;
6use agent_diva_core::error_context::ErrorContext;
7use agent_diva_core::security::{SecurityConfig, SecurityLevel, SecurityPolicy};
8use agent_diva_core::session::SessionManager;
9use agent_diva_files::{FileConfig, FileManager};
10use agent_diva_providers::LLMProvider;
11use agent_diva_tools::{
12 load_mcp_tools_sync, CronTool, EditFileTool, ExecTool, ListDirTool, ReadFileTool, SpawnTool,
13 ToolError, ToolRegistry, WriteFileTool,
14};
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::mpsc;
20use tracing::{debug, error, info};
21use uuid::Uuid;
22
23use crate::consolidation;
24use crate::context::{ContextBuilder, SoulContextSettings};
25use crate::runtime_control::RuntimeControlCommand;
26use crate::subagent::SubagentManager;
27use crate::tool_config::network::NetworkToolConfig;
28
29mod loop_runtime_control;
30mod loop_tools;
31mod loop_turn;
32
33#[derive(Clone)]
35pub struct ToolConfig {
36 pub network: NetworkToolConfig,
38 pub exec_timeout: u64,
40 pub restrict_to_workspace: bool,
42 pub mcp_servers: HashMap<String, MCPServerConfig>,
44 pub cron_service: Option<Arc<CronService>>,
46 pub soul_context: SoulContextSettings,
48 pub notify_on_soul_change: bool,
50 pub soul_governance: SoulGovernanceSettings,
52}
53
54impl Default for ToolConfig {
55 fn default() -> Self {
56 Self {
57 network: NetworkToolConfig::default(),
58 exec_timeout: 60,
59 restrict_to_workspace: false,
60 mcp_servers: HashMap::new(),
61 cron_service: None,
62 soul_context: SoulContextSettings::default(),
63 notify_on_soul_change: true,
64 soul_governance: SoulGovernanceSettings::default(),
65 }
66 }
67}
68
69#[derive(Clone, Debug)]
71pub struct SoulGovernanceSettings {
72 pub frequent_change_window_secs: u64,
74 pub frequent_change_threshold: usize,
76 pub boundary_confirmation_hint: bool,
78}
79
80impl Default for SoulGovernanceSettings {
81 fn default() -> Self {
82 Self {
83 frequent_change_window_secs: 600,
84 frequent_change_threshold: 3,
85 boundary_confirmation_hint: true,
86 }
87 }
88}
89
90pub struct AgentLoop {
92 bus: MessageBus,
93 provider: Arc<dyn LLMProvider>,
94 #[allow(dead_code)]
95 workspace: PathBuf,
96 #[allow(dead_code)]
97 model: String,
98 max_iterations: usize,
99 memory_window: usize,
100 context: ContextBuilder,
101 sessions: SessionManager,
102 tools: ToolRegistry,
103 subagent_manager: Arc<SubagentManager>,
104 runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
105 cancelled_sessions: HashSet<String>,
106 notify_on_soul_change: bool,
107 soul_governance: SoulGovernanceSettings,
108 soul_change_turns: VecDeque<Instant>,
109 file_manager: Arc<FileManager>,
110}
111
112impl AgentLoop {
113 pub async fn new(
115 bus: MessageBus,
116 provider: Arc<dyn LLMProvider>,
117 workspace: PathBuf,
118 model: Option<String>,
119 max_iterations: Option<usize>,
120 ) -> Result<Self, Box<dyn std::error::Error>> {
121 let model = model.unwrap_or_else(|| provider.get_default_model());
122 let mut context = ContextBuilder::with_skills(workspace.clone(), None);
123 context.set_soul_settings(SoulContextSettings::default());
124 let sessions = SessionManager::new(workspace.clone());
125 let tools = ToolRegistry::new();
126
127 let subagent_manager = Arc::new(SubagentManager::new(
128 provider.clone(),
129 workspace.clone(),
130 bus.clone(),
131 Some(model.clone()),
132 NetworkToolConfig::default(),
133 None,
134 false,
135 ));
136
137 let storage_path = dirs::data_local_dir()
139 .map(|p| p.join("agent-diva").join("files"))
140 .unwrap_or_else(|| PathBuf::from(".agent-diva/files"));
141 let file_config = FileConfig::with_path(&storage_path);
142 let file_manager = Arc::new(FileManager::new(file_config).await?);
143
144 Ok(Self {
145 bus,
146 provider,
147 workspace,
148 model,
149 max_iterations: max_iterations.unwrap_or(20),
150 memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
151 context,
152 sessions,
153 tools,
154 subagent_manager,
155 runtime_control_rx: None,
156 cancelled_sessions: HashSet::new(),
157 notify_on_soul_change: true,
158 soul_governance: SoulGovernanceSettings::default(),
159 soul_change_turns: VecDeque::new(),
160 file_manager,
161 })
162 }
163
164 pub fn file_manager(&self) -> Arc<FileManager> {
166 self.file_manager.clone()
167 }
168
169 #[allow(clippy::too_many_arguments)]
171 pub async fn with_tools(
172 bus: MessageBus,
173 provider: Arc<dyn LLMProvider>,
174 workspace: PathBuf,
175 model: Option<String>,
176 max_iterations: Option<usize>,
177 tool_config: ToolConfig,
178 runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
179 file_manager: Arc<FileManager>,
180 ) -> Result<Self, Box<dyn std::error::Error>> {
181 let model = model.unwrap_or_else(|| provider.get_default_model());
182 let mut context = ContextBuilder::with_skills(workspace.clone(), None);
183 context.set_soul_settings(tool_config.soul_context.clone());
184 let sessions = SessionManager::new(workspace.clone());
185 let mut tools = ToolRegistry::new();
186
187 let subagent_manager = Arc::new(SubagentManager::new(
188 provider.clone(),
189 workspace.clone(),
190 bus.clone(),
191 Some(model.clone()),
192 tool_config.network.clone(),
193 Some(tool_config.exec_timeout),
194 tool_config.restrict_to_workspace,
195 ));
196
197 let sm = subagent_manager.clone();
199 tools.register(Arc::new(SpawnTool::new(
200 move |task, label, channel, chat_id| {
201 let sm = sm.clone();
202 async move {
203 sm.spawn(task, label, channel, chat_id)
204 .await
205 .map_err(|e| ToolError::ExecutionFailed(e.to_string()))
206 }
207 },
208 )));
209
210 let security_config = if tool_config.restrict_to_workspace {
212 SecurityConfig {
213 level: SecurityLevel::Standard,
214 workspace_only: true,
215 ..SecurityConfig::default()
216 }
217 } else {
218 SecurityConfig::default()
219 };
220 let security = Arc::new(SecurityPolicy::with_config(
221 workspace.clone(),
222 security_config,
223 ));
224 tools.register(Arc::new(ReadFileTool::new(security.clone())));
225 tools.register(Arc::new(WriteFileTool::new(security.clone())));
226 tools.register(Arc::new(EditFileTool::new(security.clone())));
227 tools.register(Arc::new(ListDirTool::new(security)));
228
229 tools.register(Arc::new(ExecTool::with_config(
231 tool_config.exec_timeout,
232 Some(workspace.clone()),
233 tool_config.restrict_to_workspace,
234 )));
235
236 Self::register_web_tools(&mut tools, &tool_config.network);
238
239 for mcp_tool in load_mcp_tools_sync(&tool_config.mcp_servers) {
241 tools.register(mcp_tool);
242 }
243
244 if let Some(cron_service) = tool_config.cron_service.clone() {
246 tools.register(Arc::new(CronTool::new(cron_service)));
247 }
248
249 Ok(Self {
250 bus,
251 provider,
252 workspace,
253 model,
254 max_iterations: max_iterations.unwrap_or(20),
255 memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
256 context,
257 sessions,
258 tools,
259 subagent_manager,
260 runtime_control_rx,
261 cancelled_sessions: HashSet::new(),
262 notify_on_soul_change: tool_config.notify_on_soul_change,
263 soul_governance: tool_config.soul_governance,
264 soul_change_turns: VecDeque::new(),
265 file_manager,
266 })
267 }
268
269 pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
271 info!("Agent loop started");
272
273 let Some(mut inbound_rx) = self.bus.take_inbound_receiver().await else {
275 error!("Failed to take inbound receiver");
276 return Err("Inbound receiver already taken".into());
277 };
278
279 loop {
280 if let Some(control_rx) = self.runtime_control_rx.as_mut() {
281 tokio::select! {
282 control = control_rx.recv() => {
283 match control {
284 Some(cmd) => self.handle_runtime_control_command(cmd).await,
285 None => {
286 info!("Runtime control channel closed");
287 self.runtime_control_rx = None;
288 }
289 }
290 }
291 maybe_msg = inbound_rx.recv() => {
292 match maybe_msg {
293 Some(msg) => self.handle_inbound(msg).await,
294 None => {
295 info!("Message bus closed, stopping agent loop");
296 break;
297 }
298 }
299 }
300 }
301 } else {
302 match tokio::time::timeout(std::time::Duration::from_secs(1), inbound_rx.recv())
303 .await
304 {
305 Ok(Some(msg)) => self.handle_inbound(msg).await,
306 Ok(None) => {
307 info!("Message bus closed, stopping agent loop");
308 break;
309 }
310 Err(_) => continue,
311 }
312 }
313 }
314
315 info!("Agent loop stopped");
316 Ok(())
317 }
318
319 async fn handle_inbound(&mut self, msg: InboundMessage) {
320 debug!("Received message from {}:{}", msg.channel, msg.chat_id);
321 let event_msg = msg.clone();
322 match self.process_inbound_message(msg, None).await {
323 Ok(Some(response)) => {
324 if let Err(e) = self.bus.publish_outbound(response) {
325 error!("Failed to publish response: {}", e);
326 }
327 }
328 Ok(None) => debug!("No response needed"),
329 Err(e) => {
330 let error_message = format!("Failed to process message: {}", e);
331 let ctx = ErrorContext::new("handle_inbound", &error_message)
332 .with_metadata("channel", event_msg.channel.clone())
333 .with_metadata("chat_id", event_msg.chat_id.clone())
334 .with_metadata("sender_id", event_msg.sender_id.clone());
335 error!("{}", ctx.to_detailed_string());
336 self.emit_error_event(&event_msg, None, error_message);
337 }
338 }
339 }
340
341 pub async fn process_inbound_message(
343 &mut self,
344 msg: InboundMessage,
345 event_tx: Option<&mpsc::UnboundedSender<AgentEvent>>,
346 ) -> Result<Option<OutboundMessage>, Box<dyn std::error::Error>> {
347 let trace_id = Uuid::new_v4().to_string();
348 use tracing::Instrument;
349 let span = tracing::info_span!("AgentSpan", trace_id = %trace_id);
350
351 self.process_inbound_message_inner(msg, event_tx, trace_id)
352 .instrument(span)
353 .await
354 }
355
356 pub async fn process_direct(
358 &mut self,
359 content: impl Into<String>,
360 _session_key: impl Into<String>,
361 channel: impl Into<String>,
362 chat_id: impl Into<String>,
363 ) -> Result<String, Box<dyn std::error::Error>> {
364 let content = content.into();
365 let channel = channel.into();
366 let chat_id = chat_id.into();
367
368 let msg = InboundMessage::new(channel, "user", chat_id, content);
369
370 let response = self.process_inbound_message(msg, None).await?;
371 Ok(response
372 .map(|r| {
373 let content = r.content;
374 if let Some(reasoning) = r.reasoning_content {
375 if !reasoning.is_empty() {
376 return format!("<think>\n{}\n</think>\n\n{}", reasoning, content);
377 }
378 }
379 content
380 })
381 .unwrap_or_default())
382 }
383
384 pub async fn process_direct_stream(
386 &mut self,
387 content: impl Into<String>,
388 _session_key: impl Into<String>,
389 channel: impl Into<String>,
390 chat_id: impl Into<String>,
391 event_tx: mpsc::UnboundedSender<AgentEvent>,
392 ) -> Result<String, Box<dyn std::error::Error>> {
393 let content = content.into();
394 let channel = channel.into();
395 let chat_id = chat_id.into();
396
397 let msg = InboundMessage::new(channel, "user", chat_id, content);
398
399 match self.process_inbound_message(msg, Some(&event_tx)).await {
400 Ok(response) => Ok(response.map(|r| r.content).unwrap_or_default()),
401 Err(err) => {
402 let _ = event_tx.send(AgentEvent::Error {
403 message: err.to_string(),
404 });
405 Err(err)
406 }
407 }
408 }
409
410 fn is_frequent_soul_change_turn(&mut self) -> bool {
411 let window = Duration::from_secs(self.soul_governance.frequent_change_window_secs.max(1));
412 let now = Instant::now();
413 self.soul_change_turns.push_back(now);
414 while let Some(front) = self.soul_change_turns.front().copied() {
415 if now.duration_since(front) > window {
416 self.soul_change_turns.pop_front();
417 } else {
418 break;
419 }
420 }
421 self.soul_change_turns.len() >= self.soul_governance.frequent_change_threshold.max(1)
422 }
423}
424
425#[cfg(test)]
426mod tests {
427 use super::*;
428 use agent_diva_providers::{
429 LLMResponse, LiteLLMClient, Message, ProviderError, ProviderEventStream, ProviderResult,
430 };
431 use async_trait::async_trait;
432 use futures::stream;
433 use tokio::time::{timeout, Duration};
434
435 struct FailingStreamProvider;
436
437 #[async_trait]
438 impl LLMProvider for FailingStreamProvider {
439 async fn chat(
440 &self,
441 _messages: Vec<Message>,
442 _tools: Option<Vec<serde_json::Value>>,
443 _model: Option<String>,
444 _max_tokens: i32,
445 _temperature: f64,
446 ) -> ProviderResult<LLMResponse> {
447 Err(ProviderError::ApiError(
448 "chat should not be used".to_string(),
449 ))
450 }
451
452 async fn chat_stream(
453 &self,
454 _messages: Vec<Message>,
455 _tools: Option<Vec<serde_json::Value>>,
456 _model: Option<String>,
457 _max_tokens: i32,
458 _temperature: f64,
459 ) -> ProviderResult<ProviderEventStream> {
460 Ok(Box::pin(stream::iter(vec![Err(ProviderError::ApiError(
461 "simulated stream failure".to_string(),
462 ))])))
463 }
464
465 fn get_default_model(&self) -> String {
466 "test-model".to_string()
467 }
468 }
469
470 #[tokio::test]
471 async fn test_agent_loop_creation() {
472 let bus = MessageBus::new();
473 let provider = Arc::new(LiteLLMClient::default());
474 let workspace = PathBuf::from("/tmp/test");
475 let agent = AgentLoop::new(bus, provider, workspace, None, None);
476 assert_eq!(agent.max_iterations, 20);
477 }
478
479 #[tokio::test]
480 async fn test_process_direct() {
481 let bus = MessageBus::new();
482 let provider = Arc::new(LiteLLMClient::default());
483 let temp_dir = tempfile::tempdir().unwrap();
484 let workspace = temp_dir.path().to_path_buf();
485
486 let mut agent = AgentLoop::new(bus, provider, workspace, None, Some(1));
487
488 let result = agent
490 .process_direct("Hello", "cli:test", "cli", "test")
491 .await;
492
493 assert!(result.is_err());
495 }
496
497 #[test]
498 fn test_soul_governance_defaults_are_non_zero() {
499 let cfg = SoulGovernanceSettings::default();
500 assert!(cfg.frequent_change_window_secs > 0);
501 assert!(cfg.frequent_change_threshold > 0);
502 }
503
504 #[tokio::test]
505 async fn test_handle_inbound_emits_error_event_on_provider_failure() {
506 let bus = MessageBus::new();
507 let mut event_rx = bus.subscribe_events();
508 let provider = Arc::new(FailingStreamProvider);
509 let temp_dir = tempfile::tempdir().unwrap();
510 let workspace = temp_dir.path().to_path_buf();
511
512 let mut agent = AgentLoop::new(bus.clone(), provider, workspace, None, Some(1));
513 let msg = InboundMessage::new("gui", "user", "chat-1", "Hello");
514
515 agent.handle_inbound(msg).await;
516
517 let error_event = timeout(Duration::from_secs(1), async {
518 loop {
519 let bus_event = event_rx.recv().await.unwrap();
520 if let AgentEvent::Error { message } = bus_event.event {
521 break (bus_event.channel, bus_event.chat_id, message);
522 }
523 }
524 })
525 .await
526 .expect("timed out waiting for error event");
527
528 assert_eq!(error_event.0, "gui");
529 assert_eq!(error_event.1, "chat-1");
530 assert!(error_event.2.contains("simulated stream failure"));
531 }
532}