Skip to main content

agent_diva_agent/
agent_loop.rs

1//! Agent loop: the core processing engine
2
3use agent_diva_core::bus::{AgentEvent, InboundMessage, MessageBus, OutboundMessage};
4use agent_diva_core::config::MCPServerConfig;
5use agent_diva_core::cron::CronService;
6use agent_diva_core::error_context::ErrorContext;
7use agent_diva_core::session::SessionManager;
8use agent_diva_files::{FileConfig, FileManager};
9use agent_diva_providers::LLMProvider;
10use agent_diva_tooling::{ToolError, ToolRegistry};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16use tracing::{debug, error, info};
17use uuid::Uuid;
18
19use crate::consolidation;
20use crate::context::{ContextBuilder, SoulContextSettings};
21use crate::runtime_control::RuntimeControlCommand;
22use crate::subagent::SubagentManager;
23use crate::tool_assembly::{SubagentSpawner, ToolAssembly};
24use crate::tool_config::builtin::BuiltInToolsConfig;
25use crate::tool_config::network::NetworkToolConfig;
26
27mod loop_runtime_control;
28mod loop_tools;
29mod loop_turn;
30
/// Configuration for tool setup.
///
/// Read by the [`AgentLoop`] constructors: it supplies the settings forwarded to the
/// subagent manager and tool assembly, the soul context settings installed on the
/// context builder, and the soul notification/governance options copied onto the loop.
#[derive(Clone)]
pub struct ToolConfig {
    /// Built-in tool toggles.
    pub builtin: BuiltInToolsConfig,
    /// Network tool runtime config.
    pub network: NetworkToolConfig,
    /// Shell execution timeout in seconds.
    pub exec_timeout: u64,
    /// Whether to restrict file access to the workspace.
    pub restrict_to_workspace: bool,
    /// Configured MCP servers, keyed by a string.
    // NOTE(review): the key is presumably the server name — confirm against the config loader.
    pub mcp_servers: HashMap<String, MCPServerConfig>,
    /// Optional cron service for scheduling tools; `None` leaves it unset on the tool assembly.
    pub cron_service: Option<Arc<CronService>>,
    /// Soul context settings installed on the context builder.
    pub soul_context: SoulContextSettings,
    /// Whether to append transparent notifications on soul updates.
    pub notify_on_soul_change: bool,
    /// Governance behavior for soul evolution transparency.
    pub soul_governance: SoulGovernanceSettings,
}
53
54impl Default for ToolConfig {
55    fn default() -> Self {
56        Self {
57            builtin: BuiltInToolsConfig::default(),
58            network: NetworkToolConfig::default(),
59            exec_timeout: 60,
60            restrict_to_workspace: false,
61            mcp_servers: HashMap::new(),
62            cron_service: None,
63            soul_context: SoulContextSettings::default(),
64            notify_on_soul_change: true,
65            soul_governance: SoulGovernanceSettings::default(),
66        }
67    }
68}
69
/// Soft-governance knobs applied at runtime to soul evolution.
#[derive(Clone, Debug)]
pub struct SoulGovernanceSettings {
    /// Length, in seconds, of the rolling window used for "frequent changes" hints.
    pub frequent_change_window_secs: u64,
    /// Number of soul-changing turns that must fall inside the window to trigger hints.
    pub frequent_change_threshold: usize,
    /// Whether a confirmation hint is added when SOUL.md changes.
    pub boundary_confirmation_hint: bool,
}

impl Default for SoulGovernanceSettings {
    /// Defaults: a 600-second window, a threshold of 3 turns, confirmation hint enabled.
    fn default() -> Self {
        let frequent_change_window_secs = 600;
        let frequent_change_threshold = 3;
        let boundary_confirmation_hint = true;

        Self {
            frequent_change_window_secs,
            frequent_change_threshold,
            boundary_confirmation_hint,
        }
    }
}
90
/// The agent loop is the core processing engine.
///
/// [`AgentLoop::run`] drives it from inbound bus messages; the `process_direct*`
/// methods drive it from a single caller-supplied message.
pub struct AgentLoop {
    /// Message bus: source of inbound messages and sink for outbound responses.
    bus: MessageBus,
    /// LLM provider; also supplies the default model when none is given.
    provider: Arc<dyn LLMProvider>,
    /// Workspace root handed to the context builder, session manager and tool assembly.
    #[allow(dead_code)]
    workspace: PathBuf,
    /// Model name resolved at construction (explicit argument or the provider's default).
    #[allow(dead_code)]
    model: String,
    /// Iteration cap; the constructors default it to 20.
    // NOTE(review): presumably bounds LLM/tool iterations per turn — confirm in `loop_turn`.
    max_iterations: usize,
    /// Memory window, initialised from `consolidation::DEFAULT_MEMORY_WINDOW`.
    memory_window: usize,
    /// Context builder created for the workspace and configured with the soul settings.
    context: ContextBuilder,
    /// Session manager rooted at the workspace.
    sessions: SessionManager,
    /// Tool configuration this loop was built with.
    tool_config: ToolConfig,
    /// Tool registry available to the loop.
    tools: ToolRegistry,
    /// Shared subagent manager; `with_tools` also exposes it to tools via a spawner adapter.
    subagent_manager: Arc<SubagentManager>,
    /// Optional receiver of runtime control commands; `run` clears it once the channel closes.
    runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
    /// Starts empty.
    // NOTE(review): presumably holds keys of cancelled sessions, maintained by the
    // runtime-control submodule — confirm.
    cancelled_sessions: HashSet<String>,
    /// Copied from [`ToolConfig::notify_on_soul_change`] (defaults to `true` in `new`).
    notify_on_soul_change: bool,
    /// Soul governance settings copied from the tool configuration.
    soul_governance: SoulGovernanceSettings,
    /// Timestamps of recent soul-changing turns, pruned by `is_frequent_soul_change_turn`.
    soul_change_turns: VecDeque<Instant>,
    /// Shared file manager; `new` creates it for attachment handling.
    file_manager: Arc<FileManager>,
}
113
/// A pre-built tool registry paired with the [`ToolConfig`] that accompanies it.
///
/// Accepted by [`AgentLoop::with_toolset`], which installs `registry` as the loop's
/// tools as-is and uses `config` for the subagent manager and soul settings.
pub struct AgentLoopToolSet {
    /// Tool registry the loop will use without further assembly.
    pub registry: ToolRegistry,
    /// Tool configuration stored on the loop alongside the registry.
    pub config: ToolConfig,
}
118
119struct SubagentManagerSpawner {
120    manager: Arc<SubagentManager>,
121}
122
123#[async_trait::async_trait]
124impl SubagentSpawner for SubagentManagerSpawner {
125    async fn spawn(
126        &self,
127        task: String,
128        label: Option<String>,
129        channel: String,
130        chat_id: String,
131    ) -> Result<String, ToolError> {
132        self.manager
133            .spawn(task, label, channel, chat_id)
134            .await
135            .map_err(|e| ToolError::ExecutionFailed(e.to_string()))
136    }
137}
138
139impl AgentLoop {
140    /// Create a new agent loop
141    pub async fn new(
142        bus: MessageBus,
143        provider: Arc<dyn LLMProvider>,
144        workspace: PathBuf,
145        model: Option<String>,
146        max_iterations: Option<usize>,
147    ) -> Result<Self, Box<dyn std::error::Error>> {
148        let model = model.unwrap_or_else(|| provider.get_default_model());
149        let mut context = ContextBuilder::with_skills(workspace.clone(), None);
150        context.set_soul_settings(SoulContextSettings::default());
151        let sessions = SessionManager::new(workspace.clone());
152        let tools = ToolRegistry::new();
153
154        let subagent_manager = Arc::new(SubagentManager::new(
155            provider.clone(),
156            workspace.clone(),
157            bus.clone(),
158            Some(model.clone()),
159            BuiltInToolsConfig::default().for_subagent(),
160            NetworkToolConfig::default(),
161            None,
162            false,
163            HashMap::new(),
164        ));
165
166        // Initialize file manager for attachment handling
167        let storage_path = dirs::data_local_dir()
168            .map(|p| p.join("agent-diva").join("files"))
169            .unwrap_or_else(|| PathBuf::from(".agent-diva/files"));
170        let file_config = FileConfig::with_path(&storage_path);
171        let file_manager = Arc::new(FileManager::new(file_config).await?);
172
173        Ok(Self {
174            bus,
175            provider,
176            workspace,
177            model,
178            max_iterations: max_iterations.unwrap_or(20),
179            memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
180            context,
181            sessions,
182            tool_config: ToolConfig::default(),
183            tools,
184            subagent_manager,
185            runtime_control_rx: None,
186            cancelled_sessions: HashSet::new(),
187            notify_on_soul_change: true,
188            soul_governance: SoulGovernanceSettings::default(),
189            soul_change_turns: VecDeque::new(),
190            file_manager,
191        })
192    }
193
194    /// Get the file manager
195    pub fn file_manager(&self) -> Arc<FileManager> {
196        self.file_manager.clone()
197    }
198
199    /// Create a new agent loop with tool configuration
200    #[allow(clippy::too_many_arguments)]
201    pub async fn with_tools(
202        bus: MessageBus,
203        provider: Arc<dyn LLMProvider>,
204        workspace: PathBuf,
205        model: Option<String>,
206        max_iterations: Option<usize>,
207        tool_config: ToolConfig,
208        runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
209        file_manager: Arc<FileManager>,
210    ) -> Result<Self, Box<dyn std::error::Error>> {
211        let model = model.unwrap_or_else(|| provider.get_default_model());
212        let mut context = ContextBuilder::with_skills(workspace.clone(), None);
213        context.set_soul_settings(tool_config.soul_context.clone());
214        let sessions = SessionManager::new(workspace.clone());
215
216        let subagent_manager = Arc::new(SubagentManager::new(
217            provider.clone(),
218            workspace.clone(),
219            bus.clone(),
220            Some(model.clone()),
221            tool_config.builtin.for_subagent(),
222            tool_config.network.clone(),
223            Some(tool_config.exec_timeout),
224            tool_config.restrict_to_workspace,
225            tool_config.mcp_servers.clone(),
226        ));
227
228        let spawner = Arc::new(SubagentManagerSpawner {
229            manager: subagent_manager.clone(),
230        });
231        let tools = ToolAssembly::new(workspace.clone())
232            .builtin(tool_config.builtin.clone())
233            .with_network_config(tool_config.network.clone())
234            .with_exec_timeout(tool_config.exec_timeout)
235            .restrict_to_workspace(tool_config.restrict_to_workspace)
236            .mcp_servers(tool_config.mcp_servers.clone())
237            .with_subagent_spawner(spawner)
238            .with_file_manager(file_manager.clone())
239            .with_tools(Vec::new())
240            .build();
241
242        let mut agent = Self {
243            bus,
244            provider,
245            workspace,
246            model,
247            max_iterations: max_iterations.unwrap_or(20),
248            memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
249            context,
250            sessions,
251            tool_config: tool_config.clone(),
252            tools,
253            subagent_manager,
254            runtime_control_rx,
255            cancelled_sessions: HashSet::new(),
256            notify_on_soul_change: tool_config.notify_on_soul_change,
257            soul_governance: tool_config.soul_governance,
258            soul_change_turns: VecDeque::new(),
259            file_manager,
260        };
261
262        if let Some(cron_service) = agent.tool_config.cron_service.clone() {
263            agent.tools = ToolAssembly::new(agent.workspace.clone())
264                .builtin(agent.tool_config.builtin.clone())
265                .with_network_config(agent.tool_config.network.clone())
266                .with_exec_timeout(agent.tool_config.exec_timeout)
267                .restrict_to_workspace(agent.tool_config.restrict_to_workspace)
268                .mcp_servers(agent.tool_config.mcp_servers.clone())
269                .with_subagent_spawner(Arc::new(SubagentManagerSpawner {
270                    manager: agent.subagent_manager.clone(),
271                }))
272                .with_cron_service(cron_service)
273                .with_file_manager(agent.file_manager.clone())
274                .build();
275        }
276
277        Ok(agent)
278    }
279
280    #[allow(clippy::too_many_arguments)]
281    pub async fn with_toolset(
282        bus: MessageBus,
283        provider: Arc<dyn LLMProvider>,
284        workspace: PathBuf,
285        model: Option<String>,
286        max_iterations: Option<usize>,
287        toolset: AgentLoopToolSet,
288        runtime_control_rx: Option<mpsc::UnboundedReceiver<RuntimeControlCommand>>,
289        file_manager: Arc<FileManager>,
290    ) -> Result<Self, Box<dyn std::error::Error>> {
291        let model = model.unwrap_or_else(|| provider.get_default_model());
292        let mut context = ContextBuilder::with_skills(workspace.clone(), None);
293        context.set_soul_settings(toolset.config.soul_context.clone());
294        let sessions = SessionManager::new(workspace.clone());
295        let subagent_manager = Arc::new(SubagentManager::new(
296            provider.clone(),
297            workspace.clone(),
298            bus.clone(),
299            Some(model.clone()),
300            toolset.config.builtin.for_subagent(),
301            toolset.config.network.clone(),
302            Some(toolset.config.exec_timeout),
303            toolset.config.restrict_to_workspace,
304            toolset.config.mcp_servers.clone(),
305        ));
306
307        Ok(Self {
308            bus,
309            provider,
310            workspace,
311            model,
312            max_iterations: max_iterations.unwrap_or(20),
313            memory_window: consolidation::DEFAULT_MEMORY_WINDOW,
314            context,
315            sessions,
316            tool_config: toolset.config.clone(),
317            tools: toolset.registry,
318            subagent_manager,
319            runtime_control_rx,
320            cancelled_sessions: HashSet::new(),
321            notify_on_soul_change: toolset.config.notify_on_soul_change,
322            soul_governance: toolset.config.soul_governance,
323            soul_change_turns: VecDeque::new(),
324            file_manager,
325        })
326    }
327
328    /// Run the agent loop, processing messages from the bus
329    pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
330        info!("Agent loop started");
331
332        // Take the inbound receiver
333        let Some(mut inbound_rx) = self.bus.take_inbound_receiver().await else {
334            error!("Failed to take inbound receiver");
335            return Err("Inbound receiver already taken".into());
336        };
337
338        loop {
339            if let Some(control_rx) = self.runtime_control_rx.as_mut() {
340                tokio::select! {
341                    control = control_rx.recv() => {
342                        match control {
343                            Some(cmd) => self.handle_runtime_control_command(cmd).await,
344                            None => {
345                                info!("Runtime control channel closed");
346                                self.runtime_control_rx = None;
347                            }
348                        }
349                    }
350                    maybe_msg = inbound_rx.recv() => {
351                        match maybe_msg {
352                            Some(msg) => self.handle_inbound(msg).await,
353                            None => {
354                                info!("Message bus closed, stopping agent loop");
355                                break;
356                            }
357                        }
358                    }
359                }
360            } else {
361                match tokio::time::timeout(std::time::Duration::from_secs(1), inbound_rx.recv())
362                    .await
363                {
364                    Ok(Some(msg)) => self.handle_inbound(msg).await,
365                    Ok(None) => {
366                        info!("Message bus closed, stopping agent loop");
367                        break;
368                    }
369                    Err(_) => continue,
370                }
371            }
372        }
373
374        info!("Agent loop stopped");
375        Ok(())
376    }
377
378    async fn handle_inbound(&mut self, msg: InboundMessage) {
379        debug!("Received message from {}:{}", msg.channel, msg.chat_id);
380        let event_msg = msg.clone();
381        match self.process_inbound_message(msg, None).await {
382            Ok(Some(response)) => {
383                if let Err(e) = self.bus.publish_outbound(response) {
384                    error!("Failed to publish response: {}", e);
385                }
386            }
387            Ok(None) => debug!("No response needed"),
388            Err(e) => {
389                let error_message = format!("Failed to process message: {}", e);
390                let ctx = ErrorContext::new("handle_inbound", &error_message)
391                    .with_metadata("channel", event_msg.channel.clone())
392                    .with_metadata("chat_id", event_msg.chat_id.clone())
393                    .with_metadata("sender_id", event_msg.sender_id.clone());
394                error!("{}", ctx.to_detailed_string());
395                self.emit_error_event(&event_msg, None, error_message);
396            }
397        }
398    }
399
400    /// Process a single inbound message
401    pub async fn process_inbound_message(
402        &mut self,
403        msg: InboundMessage,
404        event_tx: Option<&mpsc::UnboundedSender<AgentEvent>>,
405    ) -> Result<Option<OutboundMessage>, Box<dyn std::error::Error>> {
406        let trace_id = Uuid::new_v4().to_string();
407        use tracing::Instrument;
408        let span = tracing::info_span!("AgentSpan", trace_id = %trace_id);
409
410        self.process_inbound_message_inner(msg, event_tx, trace_id)
411            .instrument(span)
412            .await
413    }
414
415    /// Process a message directly (for CLI or testing)
416    pub async fn process_direct(
417        &mut self,
418        content: impl Into<String>,
419        _session_key: impl Into<String>,
420        channel: impl Into<String>,
421        chat_id: impl Into<String>,
422    ) -> Result<String, Box<dyn std::error::Error>> {
423        let content = content.into();
424        let channel = channel.into();
425        let chat_id = chat_id.into();
426
427        let msg = InboundMessage::new(channel, "user", chat_id, content);
428
429        let response = self.process_inbound_message(msg, None).await?;
430        Ok(response
431            .map(|r| {
432                let content = r.content;
433                if let Some(reasoning) = r.reasoning_content {
434                    if !reasoning.is_empty() {
435                        return format!("<think>\n{}\n</think>\n\n{}", reasoning, content);
436                    }
437                }
438                content
439            })
440            .unwrap_or_default())
441    }
442
443    /// Process a message directly and emit streaming events for UI consumers.
444    pub async fn process_direct_stream(
445        &mut self,
446        content: impl Into<String>,
447        _session_key: impl Into<String>,
448        channel: impl Into<String>,
449        chat_id: impl Into<String>,
450        event_tx: mpsc::UnboundedSender<AgentEvent>,
451    ) -> Result<String, Box<dyn std::error::Error>> {
452        let content = content.into();
453        let channel = channel.into();
454        let chat_id = chat_id.into();
455
456        let msg = InboundMessage::new(channel, "user", chat_id, content);
457
458        match self.process_inbound_message(msg, Some(&event_tx)).await {
459            Ok(response) => Ok(response.map(|r| r.content).unwrap_or_default()),
460            Err(err) => {
461                let _ = event_tx.send(AgentEvent::Error {
462                    message: err.to_string(),
463                });
464                Err(err)
465            }
466        }
467    }
468
469    fn is_frequent_soul_change_turn(&mut self) -> bool {
470        let window = Duration::from_secs(self.soul_governance.frequent_change_window_secs.max(1));
471        let now = Instant::now();
472        self.soul_change_turns.push_back(now);
473        while let Some(front) = self.soul_change_turns.front().copied() {
474            if now.duration_since(front) > window {
475                self.soul_change_turns.pop_front();
476            } else {
477                break;
478            }
479        }
480        self.soul_change_turns.len() >= self.soul_governance.frequent_change_threshold.max(1)
481    }
482}
483
484#[cfg(test)]
485mod tests {
486    use super::*;
487    use agent_diva_providers::{
488        LLMResponse, LiteLLMClient, Message, ProviderError, ProviderEventStream, ProviderResult,
489    };
490    use async_trait::async_trait;
491    use futures::stream;
492    use tokio::time::{timeout, Duration};
493
494    struct FailingStreamProvider;
495
496    #[async_trait]
497    impl LLMProvider for FailingStreamProvider {
498        async fn chat(
499            &self,
500            _messages: Vec<Message>,
501            _tools: Option<Vec<serde_json::Value>>,
502            _model: Option<String>,
503            _max_tokens: i32,
504            _temperature: f64,
505        ) -> ProviderResult<LLMResponse> {
506            Err(ProviderError::ApiError(
507                "chat should not be used".to_string(),
508            ))
509        }
510
511        async fn chat_stream(
512            &self,
513            _messages: Vec<Message>,
514            _tools: Option<Vec<serde_json::Value>>,
515            _model: Option<String>,
516            _max_tokens: i32,
517            _temperature: f64,
518        ) -> ProviderResult<ProviderEventStream> {
519            Ok(Box::pin(stream::iter(vec![Err(ProviderError::ApiError(
520                "simulated stream failure".to_string(),
521            ))])))
522        }
523
524        fn get_default_model(&self) -> String {
525            "test-model".to_string()
526        }
527    }
528
529    #[tokio::test]
530    async fn test_agent_loop_creation() {
531        let bus = MessageBus::new();
532        let provider = Arc::new(LiteLLMClient::default());
533        let workspace = PathBuf::from("/tmp/test");
534        let agent = AgentLoop::new(bus, provider, workspace, None, None)
535            .await
536            .unwrap();
537        assert_eq!(agent.max_iterations, 20);
538    }
539
540    #[tokio::test]
541    async fn test_process_direct() {
542        let bus = MessageBus::new();
543        let provider = Arc::new(LiteLLMClient::default());
544        let temp_dir = tempfile::tempdir().unwrap();
545        let workspace = temp_dir.path().to_path_buf();
546
547        let mut agent = AgentLoop::new(bus, provider, workspace, None, Some(1))
548            .await
549            .unwrap();
550
551        // This will fail to connect to LLM, but tests the structure
552        let result = agent
553            .process_direct("Hello", "cli:test", "cli", "test")
554            .await;
555
556        // We expect an error since we don't have a real LLM connection
557        assert!(result.is_err());
558    }
559
560    #[test]
561    fn test_soul_governance_defaults_are_non_zero() {
562        let cfg = SoulGovernanceSettings::default();
563        assert!(cfg.frequent_change_window_secs > 0);
564        assert!(cfg.frequent_change_threshold > 0);
565    }
566
567    #[tokio::test]
568    async fn test_handle_inbound_emits_error_event_on_provider_failure() {
569        let bus = MessageBus::new();
570        let mut event_rx = bus.subscribe_events();
571        let provider = Arc::new(FailingStreamProvider);
572        let temp_dir = tempfile::tempdir().unwrap();
573        let workspace = temp_dir.path().to_path_buf();
574
575        let mut agent = AgentLoop::new(bus.clone(), provider, workspace, None, Some(1))
576            .await
577            .unwrap();
578        let msg = InboundMessage::new("gui", "user", "chat-1", "Hello");
579
580        agent.handle_inbound(msg).await;
581
582        let error_event = timeout(Duration::from_secs(1), async {
583            loop {
584                let bus_event = event_rx.recv().await.unwrap();
585                if let AgentEvent::Error { message } = bus_event.event {
586                    break (bus_event.channel, bus_event.chat_id, message);
587                }
588            }
589        })
590        .await
591        .expect("timed out waiting for error event");
592
593        assert_eq!(error_event.0, "gui");
594        assert_eq!(error_event.1, "chat-1");
595        assert!(error_event.2.contains("simulated stream failure"));
596    }
597}