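//! Agent management (`ditto_os/agent/mod.rs`).
//!
//! Defines the agent and session data model together with the `AgentManager`, which registers
//! agents, authenticates them by API key, and manages the browser sessions they own.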

use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use uuid::Uuid;

use crate::browser::BrowserManager;
use crate::skills::SkillEngine;

11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Agent {
13    pub id: String,
14    pub name: String,
15    pub description: String,
16    pub api_key: String,
17    pub capabilities: Vec<AgentCapability>,
18    pub max_sessions: u32,
19    pub current_sessions: u32,
20    pub status: AgentStatus,
21    pub created_at: chrono::DateTime<chrono::Utc>,
22    pub last_activity: chrono::DateTime<chrono::Utc>,
23    pub metadata: HashMap<String, String>,
24    pub preferences: AgentPreferences,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub enum AgentCapability {
29    BrowserControl,
30    SkillExecution,
31    Screenshot,
32    JavaScript,
33    FileUpload,
34    NetworkAccess,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub enum AgentStatus {
39    Active,
40    Inactive,
41    Suspended,
42    Banned,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct AgentPreferences {
47    pub default_browser: String,
48    pub headless: bool,
49    pub viewport_width: u32,
50    pub viewport_height: u32,
51    pub timeout_seconds: u32,
52    pub auto_cleanup: bool,
53}
54
55impl Default for AgentPreferences {
56    fn default() -> Self {
57        Self {
58            default_browser: "chrome".to_string(),
59            headless: true,
60            viewport_width: 1920,
61            viewport_height: 1080,
62            timeout_seconds: 30,
63            auto_cleanup: true,
64        }
65    }
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct AgentSession {
70    pub id: String,
71    pub agent_id: String,
72    pub browser_session_id: String,
73    pub created_at: chrono::DateTime<chrono::Utc>,
74    pub last_activity: chrono::DateTime<chrono::Utc>,
75    pub status: SessionStatus,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
79pub enum SessionStatus {
80    Active,
81    Idle,
82    Error,
83    Closed,
84}
85
86pub struct AgentManager {
87    agents: Arc<RwLock<HashMap<String, Agent>>>,
88    sessions: Arc<RwLock<HashMap<String, AgentSession>>>,
89    browser_manager: Arc<BrowserManager>,
90    skill_engine: Arc<SkillEngine>,
91}
92
93impl AgentManager {
94    pub fn new(browser_manager: Arc<BrowserManager>, skill_engine: Arc<SkillEngine>) -> Self {
95        Self {
96            agents: Arc::new(RwLock::new(HashMap::new())),
97            sessions: Arc::new(RwLock::new(HashMap::new())),
98            browser_manager,
99            skill_engine,
100        }
101    }
102
103    pub async fn register_agent(&self, mut agent: Agent) -> Result<String, anyhow::Error> {
104        // Generate API key if not provided
105        if agent.api_key.is_empty() {
106            agent.api_key = format!("sk-{}", Uuid::new_v4().to_string().replace("-", ""));
107        }
108
109        let mut agents = self.agents.write().await;
110
111        if agents.contains_key(&agent.id) {
112            return Err(anyhow::anyhow!("Agent with ID {} already exists", agent.id));
113        }
114
115        agents.insert(agent.id.clone(), agent.clone());
116        info!("Registered agent: {} (ID: {})", agent.name, agent.id);
117
118        Ok(agent.id)
119    }
120
121    pub async fn authenticate_agent(&self, api_key: &str) -> Result<String, anyhow::Error> {
122        let agents = self.agents.read().await;
123
124        let agent = agents
125            .values()
126            .find(|a| a.api_key == api_key && a.status == AgentStatus::Active)
127            .ok_or_else(|| anyhow::anyhow!("Invalid or inactive API key"))?;
128
129        // Clone needed data before dropping the lock
130        let agent_id = agent.id.clone();
131        let agent_name = agent.name.clone();
132        drop(agents);
133
134        // Update last activity
135        self.update_agent_activity(&agent_id).await;
136
137        info!("Agent authenticated: {}", agent_name);
138        Ok(agent_id)
139    }
140
141    pub async fn get_agent(&self, agent_id: &str) -> Option<Agent> {
142        let agents = self.agents.read().await;
143        agents.get(agent_id).cloned()
144    }
145
146    pub async fn list_agents(&self, status: Option<AgentStatus>) -> Vec<Agent> {
147        let agents = self.agents.read().await;
148
149        agents
150            .values()
151            .filter(|agent| status.as_ref().is_none_or(|s| agent.status == *s))
152            .cloned()
153            .collect()
154    }
155
156    pub async fn update_agent(
157        &self,
158        agent_id: &str,
159        updates: AgentUpdate,
160    ) -> Result<(), anyhow::Error> {
161        let mut agents = self.agents.write().await;
162
163        if let Some(agent) = agents.get_mut(agent_id) {
164            if let Some(name) = updates.name {
165                agent.name = name;
166            }
167            if let Some(description) = updates.description {
168                agent.description = description;
169            }
170            if let Some(capabilities) = updates.capabilities {
171                agent.capabilities = capabilities;
172            }
173            if let Some(max_sessions) = updates.max_sessions {
174                agent.max_sessions = max_sessions;
175            }
176            if let Some(status) = updates.status {
177                agent.status = status;
178            }
179            if let Some(metadata) = updates.metadata {
180                agent.metadata = metadata;
181            }
182            if let Some(preferences) = updates.preferences {
183                agent.preferences = preferences;
184            }
185
186            info!("Updated agent: {}", agent_id);
187            Ok(())
188        } else {
189            Err(anyhow::anyhow!("Agent not found: {}", agent_id))
190        }
191    }
192
193    pub async fn suspend_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
194        self.update_agent_status(agent_id, AgentStatus::Suspended)
195            .await
196    }
197
198    pub async fn activate_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
199        self.update_agent_status(agent_id, AgentStatus::Active)
200            .await
201    }
202
203    pub async fn ban_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
204        // Close all sessions for the agent
205        self.close_all_agent_sessions(agent_id).await?;
206
207        // Update status
208        self.update_agent_status(agent_id, AgentStatus::Banned)
209            .await
210    }
211
212    pub async fn create_session(
213        &self,
214        agent_id: &str,
215        browser_type: Option<String>,
216    ) -> Result<String, anyhow::Error> {
217        let agent = {
218            let agents = self.agents.read().await;
219            agents
220                .get(agent_id)
221                .cloned()
222                .ok_or_else(|| anyhow::anyhow!("Agent not found: {}", agent_id))?
223        };
224
225        if agent.status != AgentStatus::Active {
226            return Err(anyhow::anyhow!("Agent {} is not active", agent_id));
227        }
228
229        // Check session limit
230        if agent.current_sessions >= agent.max_sessions {
231            return Err(anyhow::anyhow!(
232                "Agent {} has reached maximum session limit",
233                agent_id
234            ));
235        }
236
237        // Create browser session
238        let browser_type_str =
239            browser_type.unwrap_or_else(|| agent.preferences.default_browser.clone());
240        let browser_type = match browser_type_str.as_str() {
241            "chrome" => crate::browser::BrowserType::Chrome,
242            "firefox" => crate::browser::BrowserType::Firefox,
243            "safari" => crate::browser::BrowserType::Safari,
244            "edge" => crate::browser::BrowserType::Edge,
245            _ => crate::browser::BrowserType::Chrome,
246        };
247
248        let browser_session_id = self
249            .browser_manager
250            .create_session(agent_id.to_string(), browser_type)
251            .await?;
252
253        // Create agent session
254        let session_id = Uuid::new_v4().to_string();
255        let session = AgentSession {
256            id: session_id.clone(),
257            agent_id: agent_id.to_string(),
258            browser_session_id: browser_session_id.clone(),
259            created_at: chrono::Utc::now(),
260            last_activity: chrono::Utc::now(),
261            status: SessionStatus::Active,
262        };
263
264        {
265            let mut sessions = self.sessions.write().await;
266            sessions.insert(session_id.clone(), session);
267        }
268
269        // Update agent session count
270        self.increment_agent_sessions(agent_id).await;
271
272        info!("Created session {} for agent {}", session_id, agent_id);
273        Ok(session_id)
274    }
275
276    pub async fn close_session(&self, session_id: &str) -> Result<(), anyhow::Error> {
277        let session = {
278            let sessions = self.sessions.read().await;
279            sessions
280                .get(session_id)
281                .cloned()
282                .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?
283        };
284
285        // Close browser session
286        if let Err(e) = self
287            .browser_manager
288            .close_session(&session.browser_session_id)
289            .await
290        {
291            warn!(
292                "Failed to close browser session {}: {}",
293                session.browser_session_id, e
294            );
295        }
296
297        // Remove agent session
298        {
299            let mut sessions = self.sessions.write().await;
300            sessions.remove(session_id);
301        }
302
303        // Update agent session count
304        self.decrement_agent_sessions(&session.agent_id).await;
305
306        info!(
307            "Closed session {} for agent {}",
308            session_id, session.agent_id
309        );
310        Ok(())
311    }
312
313    pub async fn get_session(&self, session_id: &str) -> Option<AgentSession> {
314        let sessions = self.sessions.read().await;
315        sessions.get(session_id).cloned()
316    }
317
318    pub async fn list_sessions(&self, agent_id: Option<&str>) -> Vec<AgentSession> {
319        let sessions = self.sessions.read().await;
320
321        sessions
322            .values()
323            .filter(|session| agent_id.is_none_or(|id| session.agent_id == id))
324            .cloned()
325            .collect()
326    }
327
328    pub async fn execute_browser_command(
329        &self,
330        session_id: &str,
331        action: String,
332        parameters: HashMap<String, serde_json::Value>,
333    ) -> Result<crate::browser::CommandResult, anyhow::Error> {
334        let session = {
335            let sessions = self.sessions.read().await;
336            sessions
337                .get(session_id)
338                .cloned()
339                .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?
340        };
341
342        if session.status != SessionStatus::Active {
343            return Err(anyhow::anyhow!("Session {} is not active", session_id));
344        }
345
346        // Convert action to BrowserAction
347        let browser_action = self.action_to_browser_action(&action, parameters)?;
348
349        let command = crate::browser::BrowserCommand {
350            action: browser_action,
351            parameters: HashMap::new(),
352            timeout_ms: 30000,
353        };
354
355        let result = self
356            .browser_manager
357            .execute_command(&session.browser_session_id, command)
358            .await?;
359
360        // Update session activity
361        self.update_session_activity(session_id).await;
362
363        Ok(result)
364    }
365
366    pub async fn execute_skill(
367        &self,
368        agent_id: &str,
369        skill_id: &str,
370        parameters: HashMap<String, serde_json::Value>,
371    ) -> Result<String, anyhow::Error> {
372        // Create a temporary session for skill execution
373        let session_id = self.create_session(agent_id, None).await?;
374
375        // Execute the skill
376        let execution_id = self
377            .skill_engine
378            .execute_skill(
379                skill_id,
380                agent_id.to_string(),
381                session_id.clone(),
382                parameters,
383            )
384            .await?;
385
386        // Clean up the session after skill execution (in background)
387        let skill_engine = Arc::clone(&self.skill_engine);
388        let browser_manager = Arc::clone(&self.browser_manager);
389        let session_id_clone = session_id.clone();
390        let execution_id_clone = execution_id.clone();
391
392        tokio::spawn(async move {
393            // Wait for skill execution to complete
394            let mut attempts = 0;
395            while attempts < 300 {
396                // 5 minutes max
397                if let Some(execution) = skill_engine.get_execution(&execution_id_clone).await {
398                    match execution.status {
399                        crate::skills::ExecutionStatus::Completed
400                        | crate::skills::ExecutionStatus::Failed
401                        | crate::skills::ExecutionStatus::Cancelled => {
402                            break;
403                        }
404                        _ => {}
405                    }
406                }
407                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
408                attempts += 1;
409            }
410
411            // Close the session using browser manager directly
412            if let Err(e) = browser_manager.close_session(&session_id_clone).await {
413                warn!("Failed to cleanup session after skill execution: {}", e);
414            }
415        });
416
417        info!(
418            "Started skill execution {} for agent {}",
419            execution_id, agent_id
420        );
421        Ok(execution_id)
422    }
423
424    pub async fn get_agent_stats(&self) -> AgentStats {
425        let agents = self.agents.read().await;
426        let sessions = self.sessions.read().await;
427
428        let total_agents = agents.len();
429        let active_agents = agents
430            .values()
431            .filter(|a| a.status == AgentStatus::Active)
432            .count();
433        let suspended_agents = agents
434            .values()
435            .filter(|a| a.status == AgentStatus::Suspended)
436            .count();
437        let banned_agents = agents
438            .values()
439            .filter(|a| a.status == AgentStatus::Banned)
440            .count();
441
442        let total_sessions = sessions.len();
443        let active_sessions = sessions
444            .values()
445            .filter(|s| s.status == SessionStatus::Active)
446            .count();
447
448        AgentStats {
449            total_agents,
450            active_agents,
451            suspended_agents,
452            banned_agents,
453            total_sessions,
454            active_sessions,
455        }
456    }
457
458    async fn update_agent_activity(&self, agent_id: &str) {
459        let mut agents = self.agents.write().await;
460        if let Some(agent) = agents.get_mut(agent_id) {
461            agent.last_activity = chrono::Utc::now();
462        }
463    }
464
465    async fn update_session_activity(&self, session_id: &str) {
466        let mut sessions = self.sessions.write().await;
467        if let Some(session) = sessions.get_mut(session_id) {
468            session.last_activity = chrono::Utc::now();
469        }
470    }
471
472    async fn update_agent_status(
473        &self,
474        agent_id: &str,
475        status: AgentStatus,
476    ) -> Result<(), anyhow::Error> {
477        let mut agents = self.agents.write().await;
478        if let Some(agent) = agents.get_mut(agent_id) {
479            let status_clone = status.clone();
480            agent.status = status;
481            info!("Updated agent {} status to {:?}", agent_id, status_clone);
482            Ok(())
483        } else {
484            Err(anyhow::anyhow!("Agent {} not found", agent_id))
485        }
486    }
487
488    async fn increment_agent_sessions(&self, agent_id: &str) {
489        let mut agents = self.agents.write().await;
490        if let Some(agent) = agents.get_mut(agent_id) {
491            agent.current_sessions += 1;
492        }
493    }
494
495    async fn decrement_agent_sessions(&self, agent_id: &str) {
496        let mut agents = self.agents.write().await;
497        if let Some(agent) = agents.get_mut(agent_id) {
498            if agent.current_sessions > 0 {
499                agent.current_sessions -= 1;
500            }
501        }
502    }
503
504    async fn close_all_agent_sessions(&self, agent_id: &str) -> Result<(), anyhow::Error> {
505        let session_ids: Vec<String> = {
506            let sessions = self.sessions.read().await;
507            sessions
508                .values()
509                .filter(|s| s.agent_id == agent_id && s.status == SessionStatus::Active)
510                .map(|s| s.id.clone())
511                .collect()
512        };
513
514        for session_id in session_ids {
515            if let Err(e) = self.close_session(&session_id).await {
516                error!(
517                    "Failed to close session {} for agent {}: {}",
518                    session_id, agent_id, e
519                );
520            }
521        }
522
523        Ok(())
524    }
525
526    fn action_to_browser_action(
527        &self,
528        action: &str,
529        parameters: HashMap<String, serde_json::Value>,
530    ) -> Result<crate::browser::BrowserAction, anyhow::Error> {
531        match action {
532            "navigate" => {
533                let url = parameters
534                    .get("url")
535                    .and_then(|v| v.as_str())
536                    .ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter for navigate"))?;
537                Ok(crate::browser::BrowserAction::Navigate {
538                    url: url.to_string(),
539                })
540            }
541            "click" => {
542                let selector = parameters
543                    .get("selector")
544                    .and_then(|v| v.as_str())
545                    .ok_or_else(|| anyhow::anyhow!("Missing 'selector' parameter for click"))?;
546                Ok(crate::browser::BrowserAction::Click {
547                    selector: selector.to_string(),
548                })
549            }
550            "type" => {
551                let selector = parameters
552                    .get("selector")
553                    .and_then(|v| v.as_str())
554                    .ok_or_else(|| anyhow::anyhow!("Missing 'selector' parameter for type"))?;
555                let text = parameters
556                    .get("text")
557                    .and_then(|v| v.as_str())
558                    .ok_or_else(|| anyhow::anyhow!("Missing 'text' parameter for type"))?;
559                Ok(crate::browser::BrowserAction::Type {
560                    selector: selector.to_string(),
561                    text: text.to_string(),
562                })
563            }
564            "screenshot" => {
565                let path = parameters
566                    .get("path")
567                    .and_then(|v| v.as_str())
568                    .map(|s| s.to_string());
569                Ok(crate::browser::BrowserAction::Screenshot { path })
570            }
571            "execute_script" => {
572                let script = parameters
573                    .get("script")
574                    .and_then(|v| v.as_str())
575                    .ok_or_else(|| {
576                        anyhow::anyhow!("Missing 'script' parameter for execute_script")
577                    })?;
578                Ok(crate::browser::BrowserAction::ExecuteScript {
579                    script: script.to_string(),
580                })
581            }
582            "get_title" => Ok(crate::browser::BrowserAction::GetTitle {}),
583            "get_url" => Ok(crate::browser::BrowserAction::GetUrl {}),
584            "refresh" => Ok(crate::browser::BrowserAction::Refresh {}),
585            "back" => Ok(crate::browser::BrowserAction::Back {}),
586            "forward" => Ok(crate::browser::BrowserAction::Forward {}),
587            _ => Err(anyhow::anyhow!("Unknown action: {}", action)),
588        }
589    }
590
591    // Initialize with default admin agent
592    pub async fn init_default_agent(&self) -> Result<String, anyhow::Error> {
593        let admin_agent = Agent {
594            id: "admin".to_string(),
595            name: "Default Admin Agent".to_string(),
596            description: "Default administrator agent with full capabilities".to_string(),
597            api_key: "sk-ditto-admin-2024".to_string(),
598            capabilities: vec![
599                AgentCapability::BrowserControl,
600                AgentCapability::SkillExecution,
601                AgentCapability::Screenshot,
602                AgentCapability::JavaScript,
603                AgentCapability::FileUpload,
604                AgentCapability::NetworkAccess,
605            ],
606            max_sessions: 10,
607            current_sessions: 0,
608            status: AgentStatus::Active,
609            created_at: chrono::Utc::now(),
610            last_activity: chrono::Utc::now(),
611            metadata: HashMap::new(),
612            preferences: AgentPreferences::default(),
613        };
614
615        let agent_id = admin_agent.id.clone();
616        self.register_agent(admin_agent).await?;
617        info!("Initialized default admin agent");
618
619        Ok(agent_id)
620    }
621}
622
623#[derive(Debug, Clone)]
624pub struct AgentUpdate {
625    pub name: Option<String>,
626    pub description: Option<String>,
627    pub capabilities: Option<Vec<AgentCapability>>,
628    pub max_sessions: Option<u32>,
629    pub status: Option<AgentStatus>,
630    pub metadata: Option<HashMap<String, String>>,
631    pub preferences: Option<AgentPreferences>,
632}
633
634#[derive(Debug, Clone, Serialize, Deserialize)]
635pub struct AgentStats {
636    pub total_agents: usize,
637    pub active_agents: usize,
638    pub suspended_agents: usize,
639    pub banned_agents: usize,
640    pub total_sessions: usize,
641    pub active_sessions: usize,
642}