Skip to main content

agent_diva_agent/
agent_loop.rs

1//! Agent loop: the core processing engine
2
3use agent_diva_core::bus::{AgentEvent, InboundMessage, MessageBus, OutboundMessage};
4use agent_diva_core::config::MCPServerConfig;
5use agent_diva_core::cron::CronService;
6use agent_diva_core::error_context::ErrorContext;
7use agent_diva_core::security::{SecurityConfig, SecurityLevel, SecurityPolicy};
8use agent_diva_core::session::SessionManager;
9use agent_diva_files::{FileConfig, FileManager};
10use agent_diva_providers::LLMProvider;
11use agent_diva_tools::{
12    load_mcp_tools_sync, CronTool, EditFileTool, ExecTool, ListDirTool, ReadFileTool, SpawnTool,
13    ToolError, ToolRegistry, WriteFileTool,
14};
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::mpsc;
20use tracing::{debug, error, info};
21use uuid::Uuid;
22
23use crate::consolidation;
24use crate::context::{ContextBuilder, SoulContextSettings};
25use crate::runtime_control::RuntimeControlCommand;
26use crate::subagent::SubagentManager;
27use crate::tool_config::network::NetworkToolConfig;
28
29mod loop_runtime_control;
30mod loop_tools;
31mod loop_turn;
32
/// Configuration for tool setup.
///
/// Consumed by `AgentLoop::with_tools`, which reads these fields to decide
/// which tools get registered and how they are configured.
#[derive(Clone)]
pub struct ToolConfig {
    /// Network tool runtime config. Passed to web tool registration and to
    /// the subagent manager.
    pub network: NetworkToolConfig,
    /// Shell execution timeout in seconds (the `Default` impl uses 60).
    pub exec_timeout: u64,
    /// Whether to restrict file access to workspace. When `true`, the file
    /// tools get a workspace-only security policy and the same flag is
    /// forwarded to the shell tool and the subagent manager.
    pub restrict_to_workspace: bool,
    /// Configured MCP servers, keyed by a string identifier
    /// (presumably the server name; confirm against the config loader).
    pub mcp_servers: HashMap<String, MCPServerConfig>,
    /// Optional cron service for scheduling tools. The cron tool is only
    /// registered when this is `Some`.
    pub cron_service: Option<Arc<CronService>>,
    /// Soul context settings, applied to the context builder.
    pub soul_context: SoulContextSettings,
    /// Whether to append transparent notifications on soul updates
    pub notify_on_soul_change: bool,
    /// Governance behavior for soul evolution transparency
    pub soul_governance: SoulGovernanceSettings,
}
53
54impl Default for ToolConfig {
55    fn default() -> Self {
56        Self {
57            network: NetworkToolConfig::default(),
58            exec_timeout: 60,
59            restrict_to_workspace: false,
60            mcp_servers: HashMap::new(),
61            cron_service: None,
62            soul_context: SoulContextSettings::default(),
63            notify_on_soul_change: true,
64            soul_governance: SoulGovernanceSettings::default(),
65        }
66    }
67}
68
/// Runtime soft-governance settings for soul evolution.
///
/// These values only drive hints; see the field docs for what each one
/// controls.
#[derive(Clone, Debug)]
pub struct SoulGovernanceSettings {
    /// Rolling window in seconds for "frequent changes" hints.
    pub frequent_change_window_secs: u64,
    /// Minimum number of soul-changing turns in window to trigger hints.
    pub frequent_change_threshold: usize,
    /// Add a confirmation hint when SOUL.md changes.
    pub boundary_confirmation_hint: bool,
}

impl Default for SoulGovernanceSettings {
    /// Defaults: a ten-minute window, a threshold of three soul-changing
    /// turns, and the boundary confirmation hint enabled.
    fn default() -> Self {
        const WINDOW_SECS: u64 = 600;
        const CHANGE_THRESHOLD: usize = 3;

        Self {
            frequent_change_window_secs: WINDOW_SECS,
            frequent_change_threshold: CHANGE_THRESHOLD,
            boundary_confirmation_hint: true,
        }
    }
}
89
/// The agent loop is the core processing engine.
///
/// It pulls inbound messages off the message bus, processes each one, and
/// publishes any resulting response back to the bus. It can optionally
/// listen on a runtime control channel at the same time.
pub struct AgentLoop {
    /// Message bus: source of inbound messages and sink for responses.
    bus: MessageBus,
    /// LLM backend; also supplies the default model name when none is given.
    provider: Arc<dyn LLMProvider>,
    /// Workspace root. Not read in this file; the `allow` silences the
    /// unused-field lint.
    #[allow(dead_code)]
    workspace: PathBuf,
    /// Resolved model name. Not read in this file; the `allow` silences the
    /// unused-field lint.
    #[allow(dead_code)]
    model: String,
    /// Iteration cap; the constructors default it to 20.
    max_iterations: usize,
    /// Initialised from `consolidation::DEFAULT_MEMORY_WINDOW`.
    memory_window: usize,
    /// Context builder, configured with the soul context settings.
    context: ContextBuilder,
    /// Session manager rooted at the workspace path.
    sessions: SessionManager,
    /// Registry of tools available to the agent.
    tools: ToolRegistry,
    /// Shared subagent manager; the spawn tool holds a clone of it.
    subagent_manager: Arc<SubagentManager>,
    /// Optional runtime control channel. `run` sets this to `None` once the
    /// channel closes.
    runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
    /// Starts empty. NOTE(review): presumably maintained by the runtime
    /// control handling; confirm in `loop_runtime_control`.
    cancelled_sessions: HashSet<String>,
    /// Copied from `ToolConfig::notify_on_soul_change` (`true` in `new`).
    notify_on_soul_change: bool,
    /// Window and threshold used by `is_frequent_soul_change_turn`.
    soul_governance: SoulGovernanceSettings,
    /// Timestamps of recent soul-changing turns, oldest first.
    soul_change_turns: VecDeque<Instant>,
    /// Shared file manager, exposed through `file_manager()`.
    file_manager: Arc<FileManager>,
}
111
112impl AgentLoop {
113    /// Create a new agent loop
114    pub async fn new(
115        bus: MessageBus,
116        provider: Arc<dyn LLMProvider>,
117        workspace: PathBuf,
118        model: Option<String>,
119        max_iterations: Option<usize>,
120    ) -> Result<Self, Box<dyn std::error::Error>> {
121        let model = model.unwrap_or_else(|| provider.get_default_model());
122        let mut context = ContextBuilder::with_skills(workspace.clone(), None);
123        context.set_soul_settings(SoulContextSettings::default());
124        let sessions = SessionManager::new(workspace.clone());
125        let tools = ToolRegistry::new();
126
127        let subagent_manager = Arc::new(SubagentManager::new(
128            provider.clone(),
129            workspace.clone(),
130            bus.clone(),
131            Some(model.clone()),
132            NetworkToolConfig::default(),
133            None,
134            false,
135        ));
136
137        // Initialize file manager for attachment handling
138        let storage_path = dirs::data_local_dir()
139            .map(|p| p.join("agent-diva").join("files"))
140            .unwrap_or_else(|| PathBuf::from(".agent-diva/files"));
141        let file_config = FileConfig::with_path(&storage_path);
142        let file_manager = Arc::new(FileManager::new(file_config).await?);
143
144        Ok(Self {
145            bus,
146            provider,
147            workspace,
148            model,
149            max_iterations: max_iterations.unwrap_or(20),
150            memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
151            context,
152            sessions,
153            tools,
154            subagent_manager,
155            runtime_control_rx: None,
156            cancelled_sessions: HashSet::new(),
157            notify_on_soul_change: true,
158            soul_governance: SoulGovernanceSettings::default(),
159            soul_change_turns: VecDeque::new(),
160            file_manager,
161        })
162    }
163
164    /// Get the file manager
165    pub fn file_manager(&self) -> Arc<FileManager> {
166        self.file_manager.clone()
167    }
168
169    /// Create a new agent loop with tool configuration
170    #[allow(clippy::too_many_arguments)]
171    pub async fn with_tools(
172        bus: MessageBus,
173        provider: Arc<dyn LLMProvider>,
174        workspace: PathBuf,
175        model: Option<String>,
176        max_iterations: Option<usize>,
177        tool_config: ToolConfig,
178        runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
179        file_manager: Arc<FileManager>,
180    ) -> Result<Self, Box<dyn std::error::Error>> {
181        let model = model.unwrap_or_else(|| provider.get_default_model());
182        let mut context = ContextBuilder::with_skills(workspace.clone(), None);
183        context.set_soul_settings(tool_config.soul_context.clone());
184        let sessions = SessionManager::new(workspace.clone());
185        let mut tools = ToolRegistry::new();
186
187        let subagent_manager = Arc::new(SubagentManager::new(
188            provider.clone(),
189            workspace.clone(),
190            bus.clone(),
191            Some(model.clone()),
192            tool_config.network.clone(),
193            Some(tool_config.exec_timeout),
194            tool_config.restrict_to_workspace,
195        ));
196
197        // Register spawn tool
198        let sm = subagent_manager.clone();
199        tools.register(Arc::new(SpawnTool::new(
200            move |task, label, channel, chat_id| {
201                let sm = sm.clone();
202                async move {
203                    sm.spawn(task, label, channel, chat_id)
204                        .await
205                        .map_err(|e| ToolError::ExecutionFailed(e.to_string()))
206                }
207            },
208        )));
209
210        // Register file system tools with SecurityPolicy
211        let security_config = if tool_config.restrict_to_workspace {
212            SecurityConfig {
213                level: SecurityLevel::Standard,
214                workspace_only: true,
215                ..SecurityConfig::default()
216            }
217        } else {
218            SecurityConfig::default()
219        };
220        let security = Arc::new(SecurityPolicy::with_config(
221            workspace.clone(),
222            security_config,
223        ));
224        tools.register(Arc::new(ReadFileTool::new(security.clone())));
225        tools.register(Arc::new(WriteFileTool::new(security.clone())));
226        tools.register(Arc::new(EditFileTool::new(security.clone())));
227        tools.register(Arc::new(ListDirTool::new(security)));
228
229        // Register shell tool
230        tools.register(Arc::new(ExecTool::with_config(
231            tool_config.exec_timeout,
232            Some(workspace.clone()),
233            tool_config.restrict_to_workspace,
234        )));
235
236        // Register web tools
237        Self::register_web_tools(&mut tools, &tool_config.network);
238
239        // Register MCP tools discovered from configured servers
240        for mcp_tool in load_mcp_tools_sync(&tool_config.mcp_servers) {
241            tools.register(mcp_tool);
242        }
243
244        // Register cron tool when scheduling is configured
245        if let Some(cron_service) = tool_config.cron_service.clone() {
246            tools.register(Arc::new(CronTool::new(cron_service)));
247        }
248
249        Ok(Self {
250            bus,
251            provider,
252            workspace,
253            model,
254            max_iterations: max_iterations.unwrap_or(20),
255            memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
256            context,
257            sessions,
258            tools,
259            subagent_manager,
260            runtime_control_rx,
261            cancelled_sessions: HashSet::new(),
262            notify_on_soul_change: tool_config.notify_on_soul_change,
263            soul_governance: tool_config.soul_governance,
264            soul_change_turns: VecDeque::new(),
265            file_manager,
266        })
267    }
268
269    /// Run the agent loop, processing messages from the bus
270    pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
271        info!("Agent loop started");
272
273        // Take the inbound receiver
274        let Some(mut inbound_rx) = self.bus.take_inbound_receiver().await else {
275            error!("Failed to take inbound receiver");
276            return Err("Inbound receiver already taken".into());
277        };
278
279        loop {
280            if let Some(control_rx) = self.runtime_control_rx.as_mut() {
281                tokio::select! {
282                    control = control_rx.recv() => {
283                        match control {
284                            Some(cmd) => self.handle_runtime_control_command(cmd).await,
285                            None => {
286                                info!("Runtime control channel closed");
287                                self.runtime_control_rx = None;
288                            }
289                        }
290                    }
291                    maybe_msg = inbound_rx.recv() => {
292                        match maybe_msg {
293                            Some(msg) => self.handle_inbound(msg).await,
294                            None => {
295                                info!("Message bus closed, stopping agent loop");
296                                break;
297                            }
298                        }
299                    }
300                }
301            } else {
302                match tokio::time::timeout(std::time::Duration::from_secs(1), inbound_rx.recv())
303                    .await
304                {
305                    Ok(Some(msg)) => self.handle_inbound(msg).await,
306                    Ok(None) => {
307                        info!("Message bus closed, stopping agent loop");
308                        break;
309                    }
310                    Err(_) => continue,
311                }
312            }
313        }
314
315        info!("Agent loop stopped");
316        Ok(())
317    }
318
319    async fn handle_inbound(&mut self, msg: InboundMessage) {
320        debug!("Received message from {}:{}", msg.channel, msg.chat_id);
321        let event_msg = msg.clone();
322        match self.process_inbound_message(msg, None).await {
323            Ok(Some(response)) => {
324                if let Err(e) = self.bus.publish_outbound(response) {
325                    error!("Failed to publish response: {}", e);
326                }
327            }
328            Ok(None) => debug!("No response needed"),
329            Err(e) => {
330                let error_message = format!("Failed to process message: {}", e);
331                let ctx = ErrorContext::new("handle_inbound", &error_message)
332                    .with_metadata("channel", event_msg.channel.clone())
333                    .with_metadata("chat_id", event_msg.chat_id.clone())
334                    .with_metadata("sender_id", event_msg.sender_id.clone());
335                error!("{}", ctx.to_detailed_string());
336                self.emit_error_event(&event_msg, None, error_message);
337            }
338        }
339    }
340
341    /// Process a single inbound message
342    pub async fn process_inbound_message(
343        &mut self,
344        msg: InboundMessage,
345        event_tx: Option<&mpsc::UnboundedSender<AgentEvent>>,
346    ) -> Result<Option<OutboundMessage>, Box<dyn std::error::Error>> {
347        let trace_id = Uuid::new_v4().to_string();
348        use tracing::Instrument;
349        let span = tracing::info_span!("AgentSpan", trace_id = %trace_id);
350
351        self.process_inbound_message_inner(msg, event_tx, trace_id)
352            .instrument(span)
353            .await
354    }
355
356    /// Process a message directly (for CLI or testing)
357    pub async fn process_direct(
358        &mut self,
359        content: impl Into<String>,
360        _session_key: impl Into<String>,
361        channel: impl Into<String>,
362        chat_id: impl Into<String>,
363    ) -> Result<String, Box<dyn std::error::Error>> {
364        let content = content.into();
365        let channel = channel.into();
366        let chat_id = chat_id.into();
367
368        let msg = InboundMessage::new(channel, "user", chat_id, content);
369
370        let response = self.process_inbound_message(msg, None).await?;
371        Ok(response
372            .map(|r| {
373                let content = r.content;
374                if let Some(reasoning) = r.reasoning_content {
375                    if !reasoning.is_empty() {
376                        return format!("<think>\n{}\n</think>\n\n{}", reasoning, content);
377                    }
378                }
379                content
380            })
381            .unwrap_or_default())
382    }
383
384    /// Process a message directly and emit streaming events for UI consumers.
385    pub async fn process_direct_stream(
386        &mut self,
387        content: impl Into<String>,
388        _session_key: impl Into<String>,
389        channel: impl Into<String>,
390        chat_id: impl Into<String>,
391        event_tx: mpsc::UnboundedSender<AgentEvent>,
392    ) -> Result<String, Box<dyn std::error::Error>> {
393        let content = content.into();
394        let channel = channel.into();
395        let chat_id = chat_id.into();
396
397        let msg = InboundMessage::new(channel, "user", chat_id, content);
398
399        match self.process_inbound_message(msg, Some(&event_tx)).await {
400            Ok(response) => Ok(response.map(|r| r.content).unwrap_or_default()),
401            Err(err) => {
402                let _ = event_tx.send(AgentEvent::Error {
403                    message: err.to_string(),
404                });
405                Err(err)
406            }
407        }
408    }
409
410    fn is_frequent_soul_change_turn(&mut self) -> bool {
411        let window = Duration::from_secs(self.soul_governance.frequent_change_window_secs.max(1));
412        let now = Instant::now();
413        self.soul_change_turns.push_back(now);
414        while let Some(front) = self.soul_change_turns.front().copied() {
415            if now.duration_since(front) > window {
416                self.soul_change_turns.pop_front();
417            } else {
418                break;
419            }
420        }
421        self.soul_change_turns.len() >= self.soul_governance.frequent_change_threshold.max(1)
422    }
423}
424
425#[cfg(test)]
426mod tests {
427    use super::*;
428    use agent_diva_providers::{
429        LLMResponse, LiteLLMClient, Message, ProviderError, ProviderEventStream, ProviderResult,
430    };
431    use async_trait::async_trait;
432    use futures::stream;
433    use tokio::time::{timeout, Duration};
434
435    struct FailingStreamProvider;
436
437    #[async_trait]
438    impl LLMProvider for FailingStreamProvider {
439        async fn chat(
440            &self,
441            _messages: Vec<Message>,
442            _tools: Option<Vec<serde_json::Value>>,
443            _model: Option<String>,
444            _max_tokens: i32,
445            _temperature: f64,
446        ) -> ProviderResult<LLMResponse> {
447            Err(ProviderError::ApiError(
448                "chat should not be used".to_string(),
449            ))
450        }
451
452        async fn chat_stream(
453            &self,
454            _messages: Vec<Message>,
455            _tools: Option<Vec<serde_json::Value>>,
456            _model: Option<String>,
457            _max_tokens: i32,
458            _temperature: f64,
459        ) -> ProviderResult<ProviderEventStream> {
460            Ok(Box::pin(stream::iter(vec![Err(ProviderError::ApiError(
461                "simulated stream failure".to_string(),
462            ))])))
463        }
464
465        fn get_default_model(&self) -> String {
466            "test-model".to_string()
467        }
468    }
469
470    #[tokio::test]
471    async fn test_agent_loop_creation() {
472        let bus = MessageBus::new();
473        let provider = Arc::new(LiteLLMClient::default());
474        let workspace = PathBuf::from("/tmp/test");
475        let agent = AgentLoop::new(bus, provider, workspace, None, None);
476        assert_eq!(agent.max_iterations, 20);
477    }
478
479    #[tokio::test]
480    async fn test_process_direct() {
481        let bus = MessageBus::new();
482        let provider = Arc::new(LiteLLMClient::default());
483        let temp_dir = tempfile::tempdir().unwrap();
484        let workspace = temp_dir.path().to_path_buf();
485
486        let mut agent = AgentLoop::new(bus, provider, workspace, None, Some(1));
487
488        // This will fail to connect to LLM, but tests the structure
489        let result = agent
490            .process_direct("Hello", "cli:test", "cli", "test")
491            .await;
492
493        // We expect an error since we don't have a real LLM connection
494        assert!(result.is_err());
495    }
496
497    #[test]
498    fn test_soul_governance_defaults_are_non_zero() {
499        let cfg = SoulGovernanceSettings::default();
500        assert!(cfg.frequent_change_window_secs > 0);
501        assert!(cfg.frequent_change_threshold > 0);
502    }
503
504    #[tokio::test]
505    async fn test_handle_inbound_emits_error_event_on_provider_failure() {
506        let bus = MessageBus::new();
507        let mut event_rx = bus.subscribe_events();
508        let provider = Arc::new(FailingStreamProvider);
509        let temp_dir = tempfile::tempdir().unwrap();
510        let workspace = temp_dir.path().to_path_buf();
511
512        let mut agent = AgentLoop::new(bus.clone(), provider, workspace, None, Some(1));
513        let msg = InboundMessage::new("gui", "user", "chat-1", "Hello");
514
515        agent.handle_inbound(msg).await;
516
517        let error_event = timeout(Duration::from_secs(1), async {
518            loop {
519                let bus_event = event_rx.recv().await.unwrap();
520                if let AgentEvent::Error { message } = bus_event.event {
521                    break (bus_event.channel, bus_event.chat_id, message);
522                }
523            }
524        })
525        .await
526        .expect("timed out waiting for error event");
527
528        assert_eq!(error_event.0, "gui");
529        assert_eq!(error_event.1, "chat-1");
530        assert!(error_event.2.contains("simulated stream failure"));
531    }
532}