reasonkit-core 0.1.8

The Reasoning Engine — Auditable Reasoning for Production AI | Rust-Native | Turn Prompts into Protocols
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
//! # Agency Module
//!
//! Autonomous agency and background task management for the ARF platform.
//! This module enables the system to spawn and manage autonomous agents for long-running tasks.

use crate::arf::types::*;
use crate::error::Result;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sled::Db;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio::task::JoinHandle;

/// Autonomous agent for background task execution.
///
/// One record per agent is kept in the manager's registry. Every field except
/// `handle` is serialized; `handle` is skipped by serde and falls back to its
/// default (`None`) when a record is deserialized.
#[derive(Debug, Serialize, Deserialize)]
pub struct AutonomousAgent {
    /// Identifier of the agent; `spawn_agent` generates it as `agent_<uuid>`.
    id: String,
    /// The task this agent was created to perform.
    task_type: AgentTaskType,
    /// Current lifecycle state.
    status: AgentStatus,
    /// Progress indicator; starts at `0.0` and is set to `1.0` on completion.
    progress: f64,
    /// Findings accumulated by the task so far.
    findings: Vec<AgentFinding>,
    /// UTC timestamp taken when the agent record was created.
    start_time: chrono::DateTime<chrono::Utc>,
    /// Handle of the Tokio task executing this agent, if one has been recorded.
    /// `terminate_agent` aborts it when present.
    #[serde(skip, default)]
    handle: Option<JoinHandle<Result<()>>>,
}

/// Types of autonomous tasks.
///
/// Each variant carries a single string whose meaning depends on the task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AgentTaskType {
    /// Research a specific topic; the payload is the topic text.
    Research(String),
    /// Collect data from a source; the payload is passed to an HTTP GET.
    DataCollection(String),
    /// Analyze existing data; the payload is the text to analyze.
    Analysis(String),
    /// Monitor a target for changes; the payload is passed to an HTTP GET.
    Monitoring(String),
    /// Synthesize information; the payload is the topic used to build the
    /// `synthesis_<topic>` database key.
    Synthesis(String),
}

/// Agent execution status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AgentStatus {
    /// Initial state assigned when the agent is registered.
    Idle,
    /// The agent's task is being executed.
    Running,
    /// The task finished without error.
    Completed,
    /// The task returned an error; the payload is that error's display text.
    Failed(String),
    /// The agent was stopped through `terminate_agent`.
    Terminated,
}

/// Findings from autonomous agent work.
///
/// A single observation recorded by a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentFinding {
    /// UTC time at which the finding was recorded.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Short label for the producing task kind (e.g. `"research"`, `"analysis"`).
    pub category: String,
    /// Human-readable content of the finding.
    pub content: String,
    /// Confidence score. Tasks in this module use values between 0.5 and 0.9;
    /// no range is enforced by the type.
    pub confidence: f64,
    /// Where the finding came from (a URL or a fixed label such as `"web_search"`).
    pub source: String,
}

/// Agency manager for coordinating autonomous agents.
///
/// Owns the agent registry, the task channel feeding the background
/// dispatcher, and the shared resources (database, HTTP client) tasks use.
pub struct AgencyManager {
    /// Registry of all known agents, keyed by agent id.
    agents: Arc<RwLock<HashMap<String, AutonomousAgent>>>,
    /// Sending half of the task channel; `spawn_agent` pushes tasks here.
    task_queue: mpsc::UnboundedSender<AgentTask>,
    /// Receiving half of the task channel. Wrapped in `Option` so the
    /// dispatcher can take ownership of it exactly once.
    task_receiver: Arc<RwLock<Option<mpsc::UnboundedReceiver<AgentTask>>>>,
    /// sled database used to persist agent state and read synthesis data.
    database: Arc<Db>,
    /// HTTP client cloned into each executing task.
    http_client: Client,
    /// Limit consulted by `spawn_agent` before registering a new agent.
    max_concurrent_agents: usize,
}

/// Unit of work sent over the task channel from `spawn_agent` to the dispatcher.
#[derive(Debug)]
struct AgentTask {
    /// Id of the registry entry this task belongs to.
    agent_id: String,
    /// What the agent should do.
    task_type: AgentTaskType,
    /// Requested priority. Stored but not read anywhere in this module
    /// (hence the leading underscore).
    _priority: TaskPriority,
}

/// Priority requested for an agent task.
///
/// NOTE(review): the dispatcher in this module handles tasks in channel
/// order and does not consult the priority — confirm whether ordering by
/// priority is planned.
#[derive(Debug, Clone)]
pub enum TaskPriority {
    /// Lowest priority.
    Low,
    /// Default priority.
    Normal,
    /// Elevated priority.
    High,
    /// Highest priority.
    Critical,
}

impl AgencyManager {
    /// Build an agency manager backed by the sled database at `database_path`.
    ///
    /// `max_concurrent` is stored as the agent limit checked by `spawn_agent`.
    /// The background dispatcher is started (via `tokio::spawn`) before the
    /// manager is returned.
    ///
    /// # Errors
    ///
    /// Propagates the error from `sled::open` when the database cannot be opened.
    pub async fn new(database_path: &str, max_concurrent: usize) -> Result<Self> {
        let db = sled::open(database_path)?;
        let (sender, receiver) = mpsc::unbounded_channel();

        // The receiver is parked in an `Option` so the dispatcher can claim it.
        let registry = Arc::new(RwLock::new(HashMap::new()));
        let parked_receiver = Arc::new(RwLock::new(Some(receiver)));

        let this = Self {
            agents: registry,
            task_queue: sender,
            task_receiver: parked_receiver,
            database: Arc::new(db),
            http_client: Client::new(),
            max_concurrent_agents: max_concurrent,
        };

        // Kick off the background loop that consumes queued tasks.
        this.start_task_dispatcher();

        Ok(this)
    }

    /// Spawn a new autonomous agent and queue its task for execution.
    ///
    /// A fresh id of the form `agent_<uuid>` is generated, the agent is
    /// registered with status [`AgentStatus::Idle`], and an [`AgentTask`] is
    /// sent to the dispatcher.
    ///
    /// The concurrency limit counts agents that are `Idle` (queued) or
    /// `Running`: every queued task is dispatched without further gating, so
    /// queued agents already represent committed concurrent work.
    ///
    /// # Errors
    ///
    /// * `"Maximum concurrent agents reached"` when the number of active
    ///   agents is already at `max_concurrent_agents`.
    /// * The channel send error if the dispatcher's receiver has been dropped;
    ///   in that case no agent is registered.
    pub async fn spawn_agent(
        &self,
        task_type: AgentTaskType,
        priority: TaskPriority,
    ) -> Result<String> {
        let agent_id = format!("agent_{}", uuid::Uuid::new_v4().simple());

        // A single write lock covers both the limit check and the insert, so
        // two concurrent callers cannot both pass the check and overshoot the
        // limit (the previous read-then-write sequence allowed that).
        let mut agents = self.agents.write().await;

        let active_count = agents
            .values()
            .filter(|a| matches!(a.status, AgentStatus::Idle | AgentStatus::Running))
            .count();

        if active_count >= self.max_concurrent_agents {
            return Err(ArfError::engine("Maximum concurrent agents reached"));
        }

        let agent = AutonomousAgent {
            id: agent_id.clone(),
            task_type: task_type.clone(),
            status: AgentStatus::Idle,
            progress: 0.0,
            findings: Vec::new(),
            start_time: chrono::Utc::now(),
            handle: None,
        };

        // Queue the task before registering the agent: if the send fails we
        // return early and leave no orphaned `Idle` entry behind. The task
        // cannot observe the registry without the entry, because executing it
        // requires the registry lock that is still held here.
        let task = AgentTask {
            agent_id: agent_id.clone(),
            task_type,
            _priority: priority,
        };
        self.task_queue.send(task)?;

        // Add agent to registry
        agents.insert(agent_id.clone(), agent);
        drop(agents);

        tracing::info!("Spawned autonomous agent: {}", agent_id);

        Ok(agent_id)
    }

    /// Look up the current status of the agent identified by `agent_id`.
    ///
    /// # Errors
    ///
    /// Returns an `"Agent not found"` engine error when no such agent is
    /// registered.
    pub async fn get_agent_status(&self, agent_id: &str) -> Result<AgentStatus> {
        let registry = self.agents.read().await;
        match registry.get(agent_id) {
            Some(agent) => Ok(agent.status.clone()),
            None => Err(ArfError::engine("Agent not found")),
        }
    }

    /// Return a copy of the findings recorded so far for `agent_id`.
    ///
    /// # Errors
    ///
    /// Returns an `"Agent not found"` engine error when no such agent is
    /// registered.
    pub async fn get_agent_findings(&self, agent_id: &str) -> Result<Vec<AgentFinding>> {
        let registry = self.agents.read().await;
        match registry.get(agent_id) {
            Some(agent) => Ok(agent.findings.clone()),
            None => Err(ArfError::engine("Agent not found")),
        }
    }

    /// Mark the agent as terminated, abort its task if a handle is recorded,
    /// and persist the resulting state.
    ///
    /// An unknown `agent_id` is not treated as an error: the call is a no-op
    /// and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while saving the agent's state.
    pub async fn terminate_agent(&self, agent_id: &str) -> Result<()> {
        let mut registry = self.agents.write().await;

        // Nothing to do for ids that are not registered.
        let Some(agent) = registry.get_mut(agent_id) else {
            return Ok(());
        };

        agent.status = AgentStatus::Terminated;

        // Taking the handle leaves `None` behind, so a later call will not
        // try to abort the same task again.
        if let Some(task_handle) = agent.handle.take() {
            task_handle.abort();
        }

        // Persist the final state; its outcome is the outcome of this call.
        self.save_agent_state(agent).await
    }

    /// Snapshot every registered agent as an `(id, status)` pair.
    ///
    /// The order follows the registry's hash-map iteration order and is
    /// therefore unspecified.
    pub async fn list_agents(&self) -> Vec<(String, AgentStatus)> {
        let registry = self.agents.read().await;

        let mut listing = Vec::with_capacity(registry.len());
        for (agent_id, agent) in registry.iter() {
            listing.push((agent_id.clone(), agent.status.clone()));
        }
        listing
    }

    /// Start the task dispatcher.
    ///
    /// Spawns a background Tokio task that takes ownership of the channel
    /// receiver and, for every queued [`AgentTask`], spawns a further task
    /// running `execute_agent_task`. The loop ends when all senders are
    /// dropped. If the receiver was already taken, a warning is logged and
    /// nothing is started.
    ///
    /// The `JoinHandle` of each execution task is stored on the matching
    /// agent record so that `terminate_agent` can actually abort it
    /// (previously the handle was dropped and `handle` stayed `None`).
    fn start_task_dispatcher(&self) {
        let agents = Arc::clone(&self.agents);
        let database = Arc::clone(&self.database);
        let http_client = self.http_client.clone();
        let task_receiver = Arc::clone(&self.task_receiver);

        tokio::spawn(async move {
            let receiver = {
                let mut guard = task_receiver.write().await;
                guard.take()
            };

            let Some(mut receiver) = receiver else {
                tracing::warn!("Agency task receiver already taken; dispatcher not started");
                return;
            };

            while let Some(task) = receiver.recv().await {
                let agent_id = task.agent_id.clone();
                let agents_clone = Arc::clone(&agents);
                let db_clone = Arc::clone(&database);
                let client_clone = http_client.clone();

                // Take the registry lock *before* spawning: the execution task
                // needs this lock as its first step, so the handle is
                // guaranteed to be recorded before the task touches the agent.
                let mut registry = agents.write().await;

                let handle = tokio::spawn(async move {
                    let outcome =
                        Self::execute_agent_task(task, agents_clone, db_clone, client_clone).await;
                    if let Err(e) = &outcome {
                        tracing::error!("Agent task execution failed: {}", e);
                    }
                    outcome
                });

                // If the agent is unknown the handle is simply dropped, which
                // detaches the task (it will fail on its own lookup).
                if let Some(agent) = registry.get_mut(&agent_id) {
                    agent.handle = Some(handle);
                }
                // `registry` is released here, before the next `recv().await`.
            }
        });
    }

    /// Execute an agent task
    async fn execute_agent_task(
        task: AgentTask,
        agents: Arc<RwLock<HashMap<String, AutonomousAgent>>>,
        database: Arc<Db>,
        http_client: Client,
    ) -> Result<()> {
        // Get agent
        let mut agents_lock = agents.write().await;
        let agent = agents_lock
            .get_mut(&task.agent_id)
            .ok_or_else(|| ArfError::engine("Agent not found during execution"))?;

        // Update status
        agent.status = AgentStatus::Running;

        // Execute based on task type
        let result = match &task.task_type {
            AgentTaskType::Research(topic) => {
                Self::execute_research_task(agent, topic, &http_client).await
            }
            AgentTaskType::DataCollection(source) => {
                Self::execute_data_collection_task(agent, source, &http_client).await
            }
            AgentTaskType::Analysis(data) => Self::execute_analysis_task(agent, data).await,
            AgentTaskType::Monitoring(target) => {
                Self::execute_monitoring_task(agent, target, &http_client).await
            }
            AgentTaskType::Synthesis(topic) => {
                Self::execute_synthesis_task(agent, topic, &database).await
            }
        };

        // Update final status
        match result {
            Ok(_) => {
                agent.status = AgentStatus::Completed;
                agent.progress = 1.0;
            }
            Err(e) => {
                agent.status = AgentStatus::Failed(e.to_string());
            }
        }

        // Save final state
        Self::save_agent_state_static(agent, &database).await?;

        Ok(())
    }

    /// Execute research task.
    ///
    /// Builds the query `"<topic> research latest developments"`, runs it
    /// through `web_search`, and appends every returned finding to the agent.
    /// Each finding advances `progress` by 0.1, capped at 1.0 so a large
    /// result set cannot push progress past "complete".
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `web_search`.
    async fn execute_research_task(
        agent: &mut AutonomousAgent,
        topic: &str,
        http_client: &Client,
    ) -> Result<()> {
        // Simulate research by searching web
        let search_query = format!("{} research latest developments", topic);
        let findings = Self::web_search(http_client, &search_query).await?;

        for finding in findings {
            agent.findings.push(finding);
            // Simulated progress step; clamp so more than ten findings do not
            // yield a progress value above 1.0.
            agent.progress = (agent.progress + 0.1).min(1.0);
        }

        Ok(())
    }

    /// Execute data collection task.
    ///
    /// Issues an HTTP GET to `source` and records the first 1000 characters of
    /// the response body as a `"data_collection"` finding with confidence 0.8,
    /// then sets the agent's progress to 1.0.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the server answers with a
    /// 4xx/5xx status (so error pages are not stored as collected data), or if
    /// the body cannot be read as text.
    async fn execute_data_collection_task(
        agent: &mut AutonomousAgent,
        source: &str,
        http_client: &Client,
    ) -> Result<()> {
        // Collect data from specified source; reject error responses instead
        // of treating their bodies as valid data.
        let response = http_client
            .get(source)
            .send()
            .await?
            .error_for_status()?;
        let content = response.text().await?;

        let finding = AgentFinding {
            timestamp: chrono::Utc::now(),
            category: "data_collection".to_string(),
            content: content.chars().take(1000).collect(), // Limit size
            confidence: 0.8,
            source: source.to_string(),
        };

        agent.findings.push(finding);
        agent.progress = 1.0;

        Ok(())
    }

    /// Run the built-in text analysis over `data`.
    ///
    /// Records one `"analysis"` finding (confidence 0.9) reporting the number
    /// of whitespace-separated words and whether any numeric character is
    /// present, then sets the agent's progress to 1.0. Always succeeds.
    async fn execute_analysis_task(agent: &mut AutonomousAgent, data: &str) -> Result<()> {
        let words = data.split_whitespace().count();
        let contains_numeric = data.chars().any(char::is_numeric);

        agent.findings.push(AgentFinding {
            timestamp: chrono::Utc::now(),
            category: "analysis".to_string(),
            content: format!(
                "Word count: {}, Contains numbers: {}",
                words, contains_numeric
            ),
            confidence: 0.9,
            source: "data_analysis".to_string(),
        });
        agent.progress = 1.0;

        Ok(())
    }

    /// Probe `target` once and record the HTTP status that came back.
    ///
    /// Adds one `"monitoring"` finding whose content is `"Status: <status>"`;
    /// confidence is 0.9 for a success status and 0.5 otherwise. Progress is
    /// then set to 1.0.
    ///
    /// # Errors
    ///
    /// Propagates the error if the request cannot be sent.
    async fn execute_monitoring_task(
        agent: &mut AutonomousAgent,
        target: &str,
        http_client: &Client,
    ) -> Result<()> {
        // Simplified monitoring: a single GET, only the status is inspected.
        let http_status = http_client.get(target).send().await?.status();

        let timestamp = chrono::Utc::now();
        let content = format!("Status: {}", http_status);
        let confidence = match http_status.is_success() {
            true => 0.9,
            false => 0.5,
        };

        agent.findings.push(AgentFinding {
            timestamp,
            category: "monitoring".to_string(),
            content,
            confidence,
            source: target.to_string(),
        });
        agent.progress = 1.0;

        Ok(())
    }

    /// Summarize what the database holds for `topic`.
    ///
    /// Looks up the key `synthesis_<topic>` and records one `"synthesis"`
    /// finding (confidence 0.7) stating either the size of the stored value or
    /// that nothing was found. Progress is then set to 1.0.
    ///
    /// # Errors
    ///
    /// Propagates the error if the database lookup fails.
    async fn execute_synthesis_task(
        agent: &mut AutonomousAgent,
        topic: &str,
        database: &Db,
    ) -> Result<()> {
        let lookup_key = format!("synthesis_{}", topic);

        let summary = match database.get(lookup_key.as_bytes())? {
            Some(stored) => format!("Synthesized data for {}: {} bytes", topic, stored.len()),
            None => format!("No existing data found for synthesis of {}", topic),
        };

        agent.findings.push(AgentFinding {
            timestamp: chrono::Utc::now(),
            category: "synthesis".to_string(),
            content: summary,
            confidence: 0.7,
            source: "knowledge_base".to_string(),
        });
        agent.progress = 1.0;

        Ok(())
    }

    /// Placeholder web search.
    ///
    /// No request is made and the HTTP client is unused: a single mock
    /// `"research"` finding (confidence 0.8, source `"web_search"`) mentioning
    /// `query` is returned. Always succeeds.
    async fn web_search(_http_client: &Client, query: &str) -> Result<Vec<AgentFinding>> {
        // Simulation only; a real implementation would query actual sources.
        let mock_finding = AgentFinding {
            timestamp: chrono::Utc::now(),
            category: "research".to_string(),
            content: format!("Latest research on {} shows promising developments", query),
            confidence: 0.8,
            source: "web_search".to_string(),
        };

        Ok(vec![mock_finding])
    }

    /// Save agent state
    async fn save_agent_state(&self, agent: &AutonomousAgent) -> Result<()> {
        Self::save_agent_state_static(agent, &self.database).await
    }

    /// Persist `agent` to `database` without needing a manager instance.
    ///
    /// The agent is serialized as JSON and stored under `agent_<id>`. Ids
    /// produced by `spawn_agent` already begin with `agent_`, so stored keys
    /// look like `agent_agent_<uuid>`.
    ///
    /// # Errors
    ///
    /// Propagates JSON serialization errors and database insert errors.
    async fn save_agent_state_static(agent: &AutonomousAgent, database: &Db) -> Result<()> {
        let storage_key = format!("agent_{}", agent.id);
        let serialized = serde_json::to_string(agent)?;

        // The previous value returned by `insert` is not needed.
        database.insert(storage_key.as_bytes(), serialized.as_bytes())?;
        Ok(())
    }

    /// Summarize the registry: total agents, how many are running, completed
    /// or failed, and the total number of findings across all agents.
    ///
    /// Idle and terminated agents count towards the total only. This method
    /// always returns `Ok`.
    pub async fn get_statistics(&self) -> Result<AgencyStats> {
        let registry = self.agents.read().await;

        let mut stats = AgencyStats {
            total_agents: registry.len(),
            running_agents: 0,
            completed_agents: 0,
            failed_agents: 0,
            total_findings: 0,
        };

        // One pass over the registry tallies every counter.
        for agent in registry.values() {
            stats.total_findings += agent.findings.len();
            match agent.status {
                AgentStatus::Running => stats.running_agents += 1,
                AgentStatus::Completed => stats.completed_agents += 1,
                AgentStatus::Failed(_) => stats.failed_agents += 1,
                AgentStatus::Idle | AgentStatus::Terminated => {}
            }
        }

        Ok(stats)
    }
}

/// Agency statistics.
///
/// Point-in-time summary of the agent registry as produced by
/// `AgencyManager::get_statistics`.
#[derive(Debug, Clone)]
pub struct AgencyStats {
    /// Number of agents in the registry, regardless of status.
    pub total_agents: usize,
    /// Agents whose status is `Running`.
    pub running_agents: usize,
    /// Agents whose status is `Completed`.
    pub completed_agents: usize,
    /// Agents whose status is `Failed(_)`.
    pub failed_agents: usize,
    /// Sum of the findings recorded by all agents.
    pub total_findings: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a manager on a database file inside `dir`, allowing 5 agents.
    /// The caller keeps `dir` alive for the duration of the test.
    async fn open_manager(dir: &TempDir) -> AgencyManager {
        let db_path = dir.path().join("test.db");
        let path_str = db_path.to_str().unwrap();
        AgencyManager::new(path_str, 5).await.unwrap()
    }

    /// A freshly created manager has no agents registered.
    #[tokio::test]
    async fn test_agency_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let manager = open_manager(&temp_dir).await;

        let stats = manager.get_statistics().await.unwrap();
        assert_eq!(stats.total_agents, 0);
    }

    /// Spawning an agent yields a non-empty id and a queryable status.
    #[tokio::test]
    async fn test_agent_spawning() {
        let temp_dir = TempDir::new().unwrap();
        let manager = open_manager(&temp_dir).await;

        let task = AgentTaskType::Research("test topic".to_string());
        let agent_id = manager
            .spawn_agent(task, TaskPriority::Normal)
            .await
            .unwrap();

        assert!(!agent_id.is_empty());

        // Status might be Idle or Running depending on timing
        let status = manager.get_agent_status(&agent_id).await.unwrap();
        assert!(matches!(status, AgentStatus::Idle | AgentStatus::Running));
    }
}