1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use tracing::{error, info, warn};
6use uuid::Uuid;
7
8use crate::browser::BrowserManager;
9use crate::skills::SkillEngine;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Agent {
13 pub id: String,
14 pub name: String,
15 pub description: String,
16 pub api_key: String,
17 pub capabilities: Vec<AgentCapability>,
18 pub max_sessions: u32,
19 pub current_sessions: u32,
20 pub status: AgentStatus,
21 pub created_at: chrono::DateTime<chrono::Utc>,
22 pub last_activity: chrono::DateTime<chrono::Utc>,
23 pub metadata: HashMap<String, String>,
24 pub preferences: AgentPreferences,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub enum AgentCapability {
29 BrowserControl,
30 SkillExecution,
31 Screenshot,
32 JavaScript,
33 FileUpload,
34 NetworkAccess,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub enum AgentStatus {
39 Active,
40 Inactive,
41 Suspended,
42 Banned,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct AgentPreferences {
47 pub default_browser: String,
48 pub headless: bool,
49 pub viewport_width: u32,
50 pub viewport_height: u32,
51 pub timeout_seconds: u32,
52 pub auto_cleanup: bool,
53}
54
55impl Default for AgentPreferences {
56 fn default() -> Self {
57 Self {
58 default_browser: "chrome".to_string(),
59 headless: true,
60 viewport_width: 1920,
61 viewport_height: 1080,
62 timeout_seconds: 30,
63 auto_cleanup: true,
64 }
65 }
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct AgentSession {
70 pub id: String,
71 pub agent_id: String,
72 pub browser_session_id: String,
73 pub created_at: chrono::DateTime<chrono::Utc>,
74 pub last_activity: chrono::DateTime<chrono::Utc>,
75 pub status: SessionStatus,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
79pub enum SessionStatus {
80 Active,
81 Idle,
82 Error,
83 Closed,
84}
85
86pub struct AgentManager {
87 agents: Arc<RwLock<HashMap<String, Agent>>>,
88 sessions: Arc<RwLock<HashMap<String, AgentSession>>>,
89 browser_manager: Arc<BrowserManager>,
90 skill_engine: Arc<SkillEngine>,
91}
92
93impl AgentManager {
94 pub fn new(browser_manager: Arc<BrowserManager>, skill_engine: Arc<SkillEngine>) -> Self {
95 Self {
96 agents: Arc::new(RwLock::new(HashMap::new())),
97 sessions: Arc::new(RwLock::new(HashMap::new())),
98 browser_manager,
99 skill_engine,
100 }
101 }
102
103 pub async fn register_agent(&self, mut agent: Agent) -> Result<String, anyhow::Error> {
104 if agent.api_key.is_empty() {
106 agent.api_key = format!("sk-{}", Uuid::new_v4().to_string().replace("-", ""));
107 }
108
109 let mut agents = self.agents.write().await;
110
111 if agents.contains_key(&agent.id) {
112 return Err(anyhow::anyhow!("Agent with ID {} already exists", agent.id));
113 }
114
115 agents.insert(agent.id.clone(), agent.clone());
116 info!("Registered agent: {} (ID: {})", agent.name, agent.id);
117
118 Ok(agent.id)
119 }
120
121 pub async fn authenticate_agent(&self, api_key: &str) -> Result<String, anyhow::Error> {
122 let agents = self.agents.read().await;
123
124 let agent = agents
125 .values()
126 .find(|a| a.api_key == api_key && a.status == AgentStatus::Active)
127 .ok_or_else(|| anyhow::anyhow!("Invalid or inactive API key"))?;
128
129 let agent_id = agent.id.clone();
131 let agent_name = agent.name.clone();
132 drop(agents);
133
134 self.update_agent_activity(&agent_id).await;
136
137 info!("Agent authenticated: {}", agent_name);
138 Ok(agent_id)
139 }
140
141 pub async fn get_agent(&self, agent_id: &str) -> Option<Agent> {
142 let agents = self.agents.read().await;
143 agents.get(agent_id).cloned()
144 }
145
146 pub async fn list_agents(&self, status: Option<AgentStatus>) -> Vec<Agent> {
147 let agents = self.agents.read().await;
148
149 agents
150 .values()
151 .filter(|agent| status.as_ref().is_none_or(|s| agent.status == *s))
152 .cloned()
153 .collect()
154 }
155
156 pub async fn update_agent(
157 &self,
158 agent_id: &str,
159 updates: AgentUpdate,
160 ) -> Result<(), anyhow::Error> {
161 let mut agents = self.agents.write().await;
162
163 if let Some(agent) = agents.get_mut(agent_id) {
164 if let Some(name) = updates.name {
165 agent.name = name;
166 }
167 if let Some(description) = updates.description {
168 agent.description = description;
169 }
170 if let Some(capabilities) = updates.capabilities {
171 agent.capabilities = capabilities;
172 }
173 if let Some(max_sessions) = updates.max_sessions {
174 agent.max_sessions = max_sessions;
175 }
176 if let Some(status) = updates.status {
177 agent.status = status;
178 }
179 if let Some(metadata) = updates.metadata {
180 agent.metadata = metadata;
181 }
182 if let Some(preferences) = updates.preferences {
183 agent.preferences = preferences;
184 }
185
186 info!("Updated agent: {}", agent_id);
187 Ok(())
188 } else {
189 Err(anyhow::anyhow!("Agent not found: {}", agent_id))
190 }
191 }
192
193 pub async fn suspend_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
194 self.update_agent_status(agent_id, AgentStatus::Suspended)
195 .await
196 }
197
198 pub async fn activate_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
199 self.update_agent_status(agent_id, AgentStatus::Active)
200 .await
201 }
202
203 pub async fn ban_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
204 self.close_all_agent_sessions(agent_id).await?;
206
207 self.update_agent_status(agent_id, AgentStatus::Banned)
209 .await
210 }
211
212 pub async fn create_session(
213 &self,
214 agent_id: &str,
215 browser_type: Option<String>,
216 ) -> Result<String, anyhow::Error> {
217 let agent = {
218 let agents = self.agents.read().await;
219 agents
220 .get(agent_id)
221 .cloned()
222 .ok_or_else(|| anyhow::anyhow!("Agent not found: {}", agent_id))?
223 };
224
225 if agent.status != AgentStatus::Active {
226 return Err(anyhow::anyhow!("Agent {} is not active", agent_id));
227 }
228
229 if agent.current_sessions >= agent.max_sessions {
231 return Err(anyhow::anyhow!(
232 "Agent {} has reached maximum session limit",
233 agent_id
234 ));
235 }
236
237 let browser_type_str =
239 browser_type.unwrap_or_else(|| agent.preferences.default_browser.clone());
240 let browser_type = match browser_type_str.as_str() {
241 "chrome" => crate::browser::BrowserType::Chrome,
242 "firefox" => crate::browser::BrowserType::Firefox,
243 "safari" => crate::browser::BrowserType::Safari,
244 "edge" => crate::browser::BrowserType::Edge,
245 _ => crate::browser::BrowserType::Chrome,
246 };
247
248 let browser_session_id = self
249 .browser_manager
250 .create_session(agent_id.to_string(), browser_type)
251 .await?;
252
253 let session_id = Uuid::new_v4().to_string();
255 let session = AgentSession {
256 id: session_id.clone(),
257 agent_id: agent_id.to_string(),
258 browser_session_id: browser_session_id.clone(),
259 created_at: chrono::Utc::now(),
260 last_activity: chrono::Utc::now(),
261 status: SessionStatus::Active,
262 };
263
264 {
265 let mut sessions = self.sessions.write().await;
266 sessions.insert(session_id.clone(), session);
267 }
268
269 self.increment_agent_sessions(agent_id).await;
271
272 info!("Created session {} for agent {}", session_id, agent_id);
273 Ok(session_id)
274 }
275
276 pub async fn close_session(&self, session_id: &str) -> Result<(), anyhow::Error> {
277 let session = {
278 let sessions = self.sessions.read().await;
279 sessions
280 .get(session_id)
281 .cloned()
282 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?
283 };
284
285 if let Err(e) = self
287 .browser_manager
288 .close_session(&session.browser_session_id)
289 .await
290 {
291 warn!(
292 "Failed to close browser session {}: {}",
293 session.browser_session_id, e
294 );
295 }
296
297 {
299 let mut sessions = self.sessions.write().await;
300 sessions.remove(session_id);
301 }
302
303 self.decrement_agent_sessions(&session.agent_id).await;
305
306 info!(
307 "Closed session {} for agent {}",
308 session_id, session.agent_id
309 );
310 Ok(())
311 }
312
313 pub async fn get_session(&self, session_id: &str) -> Option<AgentSession> {
314 let sessions = self.sessions.read().await;
315 sessions.get(session_id).cloned()
316 }
317
318 pub async fn list_sessions(&self, agent_id: Option<&str>) -> Vec<AgentSession> {
319 let sessions = self.sessions.read().await;
320
321 sessions
322 .values()
323 .filter(|session| agent_id.is_none_or(|id| session.agent_id == id))
324 .cloned()
325 .collect()
326 }
327
328 pub async fn execute_browser_command(
329 &self,
330 session_id: &str,
331 action: String,
332 parameters: HashMap<String, serde_json::Value>,
333 ) -> Result<crate::browser::CommandResult, anyhow::Error> {
334 let session = {
335 let sessions = self.sessions.read().await;
336 sessions
337 .get(session_id)
338 .cloned()
339 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?
340 };
341
342 if session.status != SessionStatus::Active {
343 return Err(anyhow::anyhow!("Session {} is not active", session_id));
344 }
345
346 let browser_action = self.action_to_browser_action(&action, parameters)?;
348
349 let command = crate::browser::BrowserCommand {
350 action: browser_action,
351 parameters: HashMap::new(),
352 timeout_ms: 30000,
353 };
354
355 let result = self
356 .browser_manager
357 .execute_command(&session.browser_session_id, command)
358 .await?;
359
360 self.update_session_activity(session_id).await;
362
363 Ok(result)
364 }
365
366 pub async fn execute_skill(
367 &self,
368 agent_id: &str,
369 skill_id: &str,
370 parameters: HashMap<String, serde_json::Value>,
371 ) -> Result<String, anyhow::Error> {
372 let session_id = self.create_session(agent_id, None).await?;
374
375 let execution_id = self
377 .skill_engine
378 .execute_skill(
379 skill_id,
380 agent_id.to_string(),
381 session_id.clone(),
382 parameters,
383 )
384 .await?;
385
386 let skill_engine = Arc::clone(&self.skill_engine);
388 let browser_manager = Arc::clone(&self.browser_manager);
389 let session_id_clone = session_id.clone();
390 let execution_id_clone = execution_id.clone();
391
392 tokio::spawn(async move {
393 let mut attempts = 0;
395 while attempts < 300 {
396 if let Some(execution) = skill_engine.get_execution(&execution_id_clone).await {
398 match execution.status {
399 crate::skills::ExecutionStatus::Completed
400 | crate::skills::ExecutionStatus::Failed
401 | crate::skills::ExecutionStatus::Cancelled => {
402 break;
403 }
404 _ => {}
405 }
406 }
407 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
408 attempts += 1;
409 }
410
411 if let Err(e) = browser_manager.close_session(&session_id_clone).await {
413 warn!("Failed to cleanup session after skill execution: {}", e);
414 }
415 });
416
417 info!(
418 "Started skill execution {} for agent {}",
419 execution_id, agent_id
420 );
421 Ok(execution_id)
422 }
423
424 pub async fn get_agent_stats(&self) -> AgentStats {
425 let agents = self.agents.read().await;
426 let sessions = self.sessions.read().await;
427
428 let total_agents = agents.len();
429 let active_agents = agents
430 .values()
431 .filter(|a| a.status == AgentStatus::Active)
432 .count();
433 let suspended_agents = agents
434 .values()
435 .filter(|a| a.status == AgentStatus::Suspended)
436 .count();
437 let banned_agents = agents
438 .values()
439 .filter(|a| a.status == AgentStatus::Banned)
440 .count();
441
442 let total_sessions = sessions.len();
443 let active_sessions = sessions
444 .values()
445 .filter(|s| s.status == SessionStatus::Active)
446 .count();
447
448 AgentStats {
449 total_agents,
450 active_agents,
451 suspended_agents,
452 banned_agents,
453 total_sessions,
454 active_sessions,
455 }
456 }
457
458 async fn update_agent_activity(&self, agent_id: &str) {
459 let mut agents = self.agents.write().await;
460 if let Some(agent) = agents.get_mut(agent_id) {
461 agent.last_activity = chrono::Utc::now();
462 }
463 }
464
465 async fn update_session_activity(&self, session_id: &str) {
466 let mut sessions = self.sessions.write().await;
467 if let Some(session) = sessions.get_mut(session_id) {
468 session.last_activity = chrono::Utc::now();
469 }
470 }
471
472 async fn update_agent_status(
473 &self,
474 agent_id: &str,
475 status: AgentStatus,
476 ) -> Result<(), anyhow::Error> {
477 let mut agents = self.agents.write().await;
478 if let Some(agent) = agents.get_mut(agent_id) {
479 let status_clone = status.clone();
480 agent.status = status;
481 info!("Updated agent {} status to {:?}", agent_id, status_clone);
482 Ok(())
483 } else {
484 Err(anyhow::anyhow!("Agent {} not found", agent_id))
485 }
486 }
487
488 async fn increment_agent_sessions(&self, agent_id: &str) {
489 let mut agents = self.agents.write().await;
490 if let Some(agent) = agents.get_mut(agent_id) {
491 agent.current_sessions += 1;
492 }
493 }
494
495 async fn decrement_agent_sessions(&self, agent_id: &str) {
496 let mut agents = self.agents.write().await;
497 if let Some(agent) = agents.get_mut(agent_id) {
498 if agent.current_sessions > 0 {
499 agent.current_sessions -= 1;
500 }
501 }
502 }
503
504 async fn close_all_agent_sessions(&self, agent_id: &str) -> Result<(), anyhow::Error> {
505 let session_ids: Vec<String> = {
506 let sessions = self.sessions.read().await;
507 sessions
508 .values()
509 .filter(|s| s.agent_id == agent_id && s.status == SessionStatus::Active)
510 .map(|s| s.id.clone())
511 .collect()
512 };
513
514 for session_id in session_ids {
515 if let Err(e) = self.close_session(&session_id).await {
516 error!(
517 "Failed to close session {} for agent {}: {}",
518 session_id, agent_id, e
519 );
520 }
521 }
522
523 Ok(())
524 }
525
526 fn action_to_browser_action(
527 &self,
528 action: &str,
529 parameters: HashMap<String, serde_json::Value>,
530 ) -> Result<crate::browser::BrowserAction, anyhow::Error> {
531 match action {
532 "navigate" => {
533 let url = parameters
534 .get("url")
535 .and_then(|v| v.as_str())
536 .ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter for navigate"))?;
537 Ok(crate::browser::BrowserAction::Navigate {
538 url: url.to_string(),
539 })
540 }
541 "click" => {
542 let selector = parameters
543 .get("selector")
544 .and_then(|v| v.as_str())
545 .ok_or_else(|| anyhow::anyhow!("Missing 'selector' parameter for click"))?;
546 Ok(crate::browser::BrowserAction::Click {
547 selector: selector.to_string(),
548 })
549 }
550 "type" => {
551 let selector = parameters
552 .get("selector")
553 .and_then(|v| v.as_str())
554 .ok_or_else(|| anyhow::anyhow!("Missing 'selector' parameter for type"))?;
555 let text = parameters
556 .get("text")
557 .and_then(|v| v.as_str())
558 .ok_or_else(|| anyhow::anyhow!("Missing 'text' parameter for type"))?;
559 Ok(crate::browser::BrowserAction::Type {
560 selector: selector.to_string(),
561 text: text.to_string(),
562 })
563 }
564 "screenshot" => {
565 let path = parameters
566 .get("path")
567 .and_then(|v| v.as_str())
568 .map(|s| s.to_string());
569 Ok(crate::browser::BrowserAction::Screenshot { path })
570 }
571 "execute_script" => {
572 let script = parameters
573 .get("script")
574 .and_then(|v| v.as_str())
575 .ok_or_else(|| {
576 anyhow::anyhow!("Missing 'script' parameter for execute_script")
577 })?;
578 Ok(crate::browser::BrowserAction::ExecuteScript {
579 script: script.to_string(),
580 })
581 }
582 "get_title" => Ok(crate::browser::BrowserAction::GetTitle {}),
583 "get_url" => Ok(crate::browser::BrowserAction::GetUrl {}),
584 "refresh" => Ok(crate::browser::BrowserAction::Refresh {}),
585 "back" => Ok(crate::browser::BrowserAction::Back {}),
586 "forward" => Ok(crate::browser::BrowserAction::Forward {}),
587 _ => Err(anyhow::anyhow!("Unknown action: {}", action)),
588 }
589 }
590
591 pub async fn init_default_agent(&self) -> Result<String, anyhow::Error> {
593 let admin_agent = Agent {
594 id: "admin".to_string(),
595 name: "Default Admin Agent".to_string(),
596 description: "Default administrator agent with full capabilities".to_string(),
597 api_key: "sk-ditto-admin-2024".to_string(),
598 capabilities: vec![
599 AgentCapability::BrowserControl,
600 AgentCapability::SkillExecution,
601 AgentCapability::Screenshot,
602 AgentCapability::JavaScript,
603 AgentCapability::FileUpload,
604 AgentCapability::NetworkAccess,
605 ],
606 max_sessions: 10,
607 current_sessions: 0,
608 status: AgentStatus::Active,
609 created_at: chrono::Utc::now(),
610 last_activity: chrono::Utc::now(),
611 metadata: HashMap::new(),
612 preferences: AgentPreferences::default(),
613 };
614
615 let agent_id = admin_agent.id.clone();
616 self.register_agent(admin_agent).await?;
617 info!("Initialized default admin agent");
618
619 Ok(agent_id)
620 }
621}
622
623#[derive(Debug, Clone)]
624pub struct AgentUpdate {
625 pub name: Option<String>,
626 pub description: Option<String>,
627 pub capabilities: Option<Vec<AgentCapability>>,
628 pub max_sessions: Option<u32>,
629 pub status: Option<AgentStatus>,
630 pub metadata: Option<HashMap<String, String>>,
631 pub preferences: Option<AgentPreferences>,
632}
633
634#[derive(Debug, Clone, Serialize, Deserialize)]
635pub struct AgentStats {
636 pub total_agents: usize,
637 pub active_agents: usize,
638 pub suspended_agents: usize,
639 pub banned_agents: usize,
640 pub total_sessions: usize,
641 pub active_sessions: usize,
642}