use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::browser::BrowserManager;
use crate::skills::SkillEngine;
/// A registered client of the [`AgentManager`], identified by `id` and
/// authenticated by `api_key`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Agent {
/// Unique key under which the agent is stored; registration rejects duplicates.
pub id: String,
/// Human-readable name, used in log messages.
pub name: String,
/// Free-form description of the agent.
pub description: String,
/// Secret matched by `authenticate_agent`. `register_agent` generates an
/// `sk-…` key when this is empty.
pub api_key: String,
/// Capabilities assigned to the agent.
/// NOTE(review): no method in this file checks these — enforcement
/// presumably happens elsewhere; confirm.
pub capabilities: Vec<AgentCapability>,
/// Upper bound compared against `current_sessions` by `create_session`.
pub max_sessions: u32,
/// Number of sessions currently counted against the agent; adjusted when
/// sessions are created and closed.
pub current_sessions: u32,
/// Lifecycle state; only `Active` agents can authenticate or open sessions.
pub status: AgentStatus,
/// Creation timestamp (UTC).
pub created_at: chrono::DateTime<chrono::Utc>,
/// Timestamp (UTC) refreshed on each successful authentication.
pub last_activity: chrono::DateTime<chrono::Utc>,
/// Arbitrary string key/value pairs; replaced wholesale by `update_agent`.
pub metadata: HashMap<String, String>,
/// Per-agent defaults such as the browser used when none is requested.
pub preferences: AgentPreferences,
}
/// Capability labels that can be attached to an [`Agent`].
///
/// NOTE(review): the variant meanings below are inferred from their names;
/// nothing in this file inspects them — confirm against the enforcing code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AgentCapability {
/// Presumably permits driving a browser session.
BrowserControl,
/// Presumably permits running skills.
SkillExecution,
/// Presumably permits capturing screenshots.
Screenshot,
/// Presumably permits executing scripts in the page.
JavaScript,
/// Presumably permits uploading files.
FileUpload,
/// Presumably permits network access.
NetworkAccess,
}
/// Lifecycle state of an [`Agent`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AgentStatus {
/// The only state in which an agent can authenticate or create sessions.
Active,
/// Not assigned by any method in this file; can be set through `update_agent`.
Inactive,
/// Set by `suspend_agent`; reversible via `activate_agent`.
Suspended,
/// Set by `ban_agent`, which also closes the agent's active sessions.
Banned,
}
/// Per-agent defaults.
///
/// NOTE(review): only `default_browser` is read in this file; the remaining
/// fields are presumably consumed by the browser layer — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentPreferences {
/// Browser name used by `create_session` when the caller passes none
/// ("chrome", "firefox", "safari" or "edge"; anything else maps to Chrome).
pub default_browser: String,
/// Whether the browser should run headless (default `true`).
pub headless: bool,
/// Viewport width (default 1920); presumably pixels.
pub viewport_width: u32,
/// Viewport height (default 1080); presumably pixels.
pub viewport_height: u32,
/// Timeout in seconds (default 30).
pub timeout_seconds: u32,
/// Whether resources should be cleaned up automatically (default `true`).
pub auto_cleanup: bool,
}
impl Default for AgentPreferences {
fn default() -> Self {
Self {
default_browser: "chrome".to_string(),
headless: true,
viewport_width: 1920,
viewport_height: 1080,
timeout_seconds: 30,
auto_cleanup: true,
}
}
}
/// Links an agent to one browser session owned by the `BrowserManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSession {
/// Session identifier (a UUID string generated by `create_session`).
pub id: String,
/// `id` of the [`Agent`] that owns the session.
pub agent_id: String,
/// Identifier returned by `BrowserManager::create_session`; this — not
/// `id` — is what browser-level calls expect.
pub browser_session_id: String,
/// Creation timestamp (UTC).
pub created_at: chrono::DateTime<chrono::Utc>,
/// Timestamp (UTC) refreshed after each successful browser command.
pub last_activity: chrono::DateTime<chrono::Utc>,
/// Current state; commands are only accepted while `Active`.
pub status: SessionStatus,
}
/// State of an [`AgentSession`].
///
/// Only `Active` is ever assigned in this file (at session creation); closed
/// sessions are removed from the manager rather than marked `Closed`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SessionStatus {
/// Session accepts browser commands.
Active,
/// Not assigned in this file.
Idle,
/// Not assigned in this file.
Error,
/// Not assigned in this file.
Closed,
}
/// Registry of agents and their sessions, layered on top of a
/// `BrowserManager` and a `SkillEngine`.
pub struct AgentManager {
// Agents keyed by `Agent::id`.
agents: Arc<RwLock<HashMap<String, Agent>>>,
// Open sessions keyed by `AgentSession::id`.
sessions: Arc<RwLock<HashMap<String, AgentSession>>>,
// Creates, drives and closes the underlying browser sessions.
browser_manager: Arc<BrowserManager>,
// Runs skills and reports their execution status.
skill_engine: Arc<SkillEngine>,
}
impl AgentManager {
/// Builds a manager with no agents and no sessions, sharing the given
/// browser manager and skill engine.
pub fn new(browser_manager: Arc<BrowserManager>, skill_engine: Arc<SkillEngine>) -> Self {
    let agents = Arc::new(RwLock::new(HashMap::new()));
    let sessions = Arc::new(RwLock::new(HashMap::new()));
    Self {
        agents,
        sessions,
        browser_manager,
        skill_engine,
    }
}
/// Adds `agent` to the registry and returns its id.
///
/// When `agent.api_key` is empty, a key of the form `sk-<32 hex chars>` is
/// generated from a fresh UUID before the agent is stored.
///
/// # Errors
/// Fails if an agent with the same id is already registered.
pub async fn register_agent(&self, mut agent: Agent) -> Result<String, anyhow::Error> {
    if agent.api_key.is_empty() {
        let compact_uuid = Uuid::new_v4().to_string().replace("-", "");
        agent.api_key = format!("sk-{}", compact_uuid);
    }
    let mut registry = self.agents.write().await;
    match registry.entry(agent.id.clone()) {
        std::collections::hash_map::Entry::Occupied(_) => {
            Err(anyhow::anyhow!("Agent with ID {} already exists", agent.id))
        }
        std::collections::hash_map::Entry::Vacant(slot) => {
            // Log from the stored value so the agent never has to be cloned.
            let stored = slot.insert(agent);
            info!("Registered agent: {} (ID: {})", stored.name, stored.id);
            Ok(stored.id.clone())
        }
    }
}
/// Resolves an API key to the id of the active agent that owns it and
/// refreshes that agent's `last_activity`.
///
/// # Errors
/// Fails when no agent has this key or the matching agent is not `Active`.
pub async fn authenticate_agent(&self, api_key: &str) -> Result<String, anyhow::Error> {
    // Copy out what is needed so the read lock is released before the
    // activity update takes the write lock.
    let (agent_id, agent_name) = {
        let registry = self.agents.read().await;
        // NOTE(review): `==` on the key is not a constant-time comparison;
        // consider whether timing side channels matter for this deployment.
        let matched = registry.values().find(|candidate| {
            candidate.api_key == api_key && candidate.status == AgentStatus::Active
        });
        match matched {
            Some(agent) => (agent.id.clone(), agent.name.clone()),
            None => return Err(anyhow::anyhow!("Invalid or inactive API key")),
        }
    };
    self.update_agent_activity(&agent_id).await;
    info!("Agent authenticated: {}", agent_name);
    Ok(agent_id)
}
/// Returns a copy of the agent registered under `agent_id`, if any.
pub async fn get_agent(&self, agent_id: &str) -> Option<Agent> {
    self.agents.read().await.get(agent_id).cloned()
}
/// Returns copies of all agents, or only those in `status` when one is given.
pub async fn list_agents(&self, status: Option<AgentStatus>) -> Vec<Agent> {
    let registry = self.agents.read().await;
    match status {
        None => registry.values().cloned().collect(),
        Some(wanted) => registry
            .values()
            .filter(|agent| agent.status == wanted)
            .cloned()
            .collect(),
    }
}
pub async fn update_agent(
&self,
agent_id: &str,
updates: AgentUpdate,
) -> Result<(), anyhow::Error> {
let mut agents = self.agents.write().await;
if let Some(agent) = agents.get_mut(agent_id) {
if let Some(name) = updates.name {
agent.name = name;
}
if let Some(description) = updates.description {
agent.description = description;
}
if let Some(capabilities) = updates.capabilities {
agent.capabilities = capabilities;
}
if let Some(max_sessions) = updates.max_sessions {
agent.max_sessions = max_sessions;
}
if let Some(status) = updates.status {
agent.status = status;
}
if let Some(metadata) = updates.metadata {
agent.metadata = metadata;
}
if let Some(preferences) = updates.preferences {
agent.preferences = preferences;
}
info!("Updated agent: {}", agent_id);
Ok(())
} else {
Err(anyhow::anyhow!("Agent not found: {}", agent_id))
}
}
/// Marks the agent as `Suspended`. Existing sessions are left open.
///
/// # Errors
/// Fails if the agent does not exist.
pub async fn suspend_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
    let target = AgentStatus::Suspended;
    self.update_agent_status(agent_id, target).await
}
/// Marks the agent as `Active`.
///
/// # Errors
/// Fails if the agent does not exist.
pub async fn activate_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
    let target = AgentStatus::Active;
    self.update_agent_status(agent_id, target).await
}
/// Marks the agent as `Banned` and then closes its active sessions.
///
/// The status is changed *first*: `create_session` refuses non-active
/// agents, so flipping the status before closing prevents the agent from
/// opening a fresh session between the two steps.
///
/// # Errors
/// Fails if the agent does not exist. Failures to close individual sessions
/// are logged and do not fail the ban.
pub async fn ban_agent(&self, agent_id: &str) -> Result<(), anyhow::Error> {
    self.update_agent_status(agent_id, AgentStatus::Banned)
        .await?;
    self.close_all_agent_sessions(agent_id).await
}
pub async fn create_session(
&self,
agent_id: &str,
browser_type: Option<String>,
) -> Result<String, anyhow::Error> {
let agent = {
let agents = self.agents.read().await;
agents
.get(agent_id)
.cloned()
.ok_or_else(|| anyhow::anyhow!("Agent not found: {}", agent_id))?
};
if agent.status != AgentStatus::Active {
return Err(anyhow::anyhow!("Agent {} is not active", agent_id));
}
if agent.current_sessions >= agent.max_sessions {
return Err(anyhow::anyhow!(
"Agent {} has reached maximum session limit",
agent_id
));
}
let browser_type_str =
browser_type.unwrap_or_else(|| agent.preferences.default_browser.clone());
let browser_type = match browser_type_str.as_str() {
"chrome" => crate::browser::BrowserType::Chrome,
"firefox" => crate::browser::BrowserType::Firefox,
"safari" => crate::browser::BrowserType::Safari,
"edge" => crate::browser::BrowserType::Edge,
_ => crate::browser::BrowserType::Chrome,
};
let browser_session_id = self
.browser_manager
.create_session(agent_id.to_string(), browser_type)
.await?;
let session_id = Uuid::new_v4().to_string();
let session = AgentSession {
id: session_id.clone(),
agent_id: agent_id.to_string(),
browser_session_id: browser_session_id.clone(),
created_at: chrono::Utc::now(),
last_activity: chrono::Utc::now(),
status: SessionStatus::Active,
};
{
let mut sessions = self.sessions.write().await;
sessions.insert(session_id.clone(), session);
}
self.increment_agent_sessions(agent_id).await;
info!("Created session {} for agent {}", session_id, agent_id);
Ok(session_id)
}
/// Closes an agent session: removes it from the registry, closes the backing
/// browser session and releases the agent's session slot.
///
/// The entry is removed from the map *before* anything else happens. Removal
/// under the write lock is atomic, so when two callers race to close the same
/// session exactly one of them proceeds; the other gets "Session not found"
/// and the agent's session counter is decremented only once.
///
/// A failure to close the browser session is logged and does not fail the
/// call — the agent session is gone either way.
///
/// # Errors
/// Fails if `session_id` is not a known session.
pub async fn close_session(&self, session_id: &str) -> Result<(), anyhow::Error> {
    let session = {
        let mut sessions = self.sessions.write().await;
        sessions
            .remove(session_id)
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?
    };
    if let Err(e) = self
        .browser_manager
        .close_session(&session.browser_session_id)
        .await
    {
        warn!(
            "Failed to close browser session {}: {}",
            session.browser_session_id, e
        );
    }
    self.decrement_agent_sessions(&session.agent_id).await;
    info!(
        "Closed session {} for agent {}",
        session_id, session.agent_id
    );
    Ok(())
}
/// Returns a copy of the session stored under `session_id`, if any.
pub async fn get_session(&self, session_id: &str) -> Option<AgentSession> {
    self.sessions.read().await.get(session_id).cloned()
}
/// Returns copies of all sessions, or only those owned by `agent_id` when
/// one is given.
pub async fn list_sessions(&self, agent_id: Option<&str>) -> Vec<AgentSession> {
    let table = self.sessions.read().await;
    let mut matching = Vec::new();
    for session in table.values() {
        let wanted = match agent_id {
            Some(owner) => session.agent_id == owner,
            None => true,
        };
        if wanted {
            matching.push(session.clone());
        }
    }
    matching
}
/// Translates `action`/`parameters` into a browser command, runs it on the
/// browser session behind `session_id`, and refreshes the session's
/// `last_activity` on success.
///
/// # Errors
/// Fails if the session is unknown or not `Active`, if the action is unknown
/// or lacks a required parameter, or if the browser manager reports an error.
pub async fn execute_browser_command(
    &self,
    session_id: &str,
    action: String,
    parameters: HashMap<String, serde_json::Value>,
) -> Result<crate::browser::CommandResult, anyhow::Error> {
    // Only the browser session id is needed once validation has passed.
    let browser_session_id = {
        let table = self.sessions.read().await;
        let Some(session) = table.get(session_id) else {
            return Err(anyhow::anyhow!("Session not found: {}", session_id));
        };
        if session.status != SessionStatus::Active {
            return Err(anyhow::anyhow!("Session {} is not active", session_id));
        }
        session.browser_session_id.clone()
    };

    let command = crate::browser::BrowserCommand {
        action: self.action_to_browser_action(&action, parameters)?,
        // NOTE(review): the caller's parameters are consumed by the action
        // conversion above, so the command itself carries an empty map.
        parameters: HashMap::new(),
        // NOTE(review): fixed 30 s; the owning agent's
        // `preferences.timeout_seconds` is not consulted here.
        timeout_ms: 30000,
    };

    let outcome = self
        .browser_manager
        .execute_command(&browser_session_id, command)
        .await?;
    self.update_session_activity(session_id).await;
    Ok(outcome)
}
/// Starts `skill_id` for `agent_id` in a dedicated session and returns the
/// execution id immediately.
///
/// A background task polls the execution once per second (at most 300 times)
/// and, once it has completed, failed or been cancelled — or the polling
/// budget is exhausted — tears the session down: the agent session is removed
/// from the registry, the *browser* session behind it is closed, and the
/// agent's session counter is decremented.
///
/// If the skill cannot be started, the freshly created session is closed
/// before the error is returned so that it does not leak.
///
/// # Errors
/// Fails if the session cannot be created or the skill engine refuses to
/// start the execution.
pub async fn execute_skill(
    &self,
    agent_id: &str,
    skill_id: &str,
    parameters: HashMap<String, serde_json::Value>,
) -> Result<String, anyhow::Error> {
    const MAX_POLL_ATTEMPTS: u32 = 300;

    let session_id = self.create_session(agent_id, None).await?;

    let started = self
        .skill_engine
        .execute_skill(
            skill_id,
            agent_id.to_string(),
            session_id.clone(),
            parameters,
        )
        .await;
    if started.is_err() {
        // Nothing will ever poll this session, so release it right away.
        if let Err(close_err) = self.close_session(&session_id).await {
            warn!(
                "Failed to cleanup session {} after skill start failure: {}",
                session_id, close_err
            );
        }
    }
    let execution_id = started?;

    // The task outlives `&self`, so it works on clones of the shared handles.
    let skill_engine = Arc::clone(&self.skill_engine);
    let browser_manager = Arc::clone(&self.browser_manager);
    let sessions = Arc::clone(&self.sessions);
    let agents = Arc::clone(&self.agents);
    let session_id_clone = session_id.clone();
    let execution_id_clone = execution_id.clone();
    tokio::spawn(async move {
        let mut attempts = 0;
        while attempts < MAX_POLL_ATTEMPTS {
            if let Some(execution) = skill_engine.get_execution(&execution_id_clone).await {
                if matches!(
                    execution.status,
                    crate::skills::ExecutionStatus::Completed
                        | crate::skills::ExecutionStatus::Failed
                        | crate::skills::ExecutionStatus::Cancelled
                ) {
                    break;
                }
            }
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            attempts += 1;
        }
        if attempts >= MAX_POLL_ATTEMPTS {
            warn!(
                "Skill execution {} did not finish within {} polls; cleaning up session {}",
                execution_id_clone, MAX_POLL_ATTEMPTS, session_id_clone
            );
        }

        // Remove the agent session first; if it is already gone (closed
        // elsewhere, e.g. by a ban) there is nothing left to release.
        let removed = sessions.write().await.remove(&session_id_clone);
        let Some(session) = removed else {
            return;
        };
        // The browser manager is keyed by the browser session id, not by the
        // agent session id.
        if let Err(e) = browser_manager
            .close_session(&session.browser_session_id)
            .await
        {
            warn!("Failed to cleanup session after skill execution: {}", e);
        }
        let mut registry = agents.write().await;
        if let Some(agent) = registry.get_mut(&session.agent_id) {
            agent.current_sessions = agent.current_sessions.saturating_sub(1);
        }
    });

    info!(
        "Started skill execution {} for agent {}",
        execution_id, agent_id
    );
    Ok(execution_id)
}
/// Returns agent and session counts, taken while holding read locks on both
/// maps.
pub async fn get_agent_stats(&self) -> AgentStats {
    let agents = self.agents.read().await;
    let sessions = self.sessions.read().await;

    // Tally the per-status agent counts in a single pass.
    let (mut active_agents, mut suspended_agents, mut banned_agents) = (0usize, 0usize, 0usize);
    for agent in agents.values() {
        match agent.status {
            AgentStatus::Active => active_agents += 1,
            AgentStatus::Suspended => suspended_agents += 1,
            AgentStatus::Banned => banned_agents += 1,
            // Inactive agents only contribute to `total_agents`.
            AgentStatus::Inactive => {}
        }
    }

    let active_sessions = sessions
        .values()
        .filter(|session| matches!(session.status, SessionStatus::Active))
        .count();

    AgentStats {
        total_agents: agents.len(),
        active_agents,
        suspended_agents,
        banned_agents,
        total_sessions: sessions.len(),
        active_sessions,
    }
}
/// Stamps the agent's `last_activity` with the current UTC time; a no-op for
/// unknown ids.
async fn update_agent_activity(&self, agent_id: &str) {
    let mut registry = self.agents.write().await;
    let Some(agent) = registry.get_mut(agent_id) else {
        return;
    };
    agent.last_activity = chrono::Utc::now();
}
/// Stamps the session's `last_activity` with the current UTC time; a no-op
/// for unknown ids.
async fn update_session_activity(&self, session_id: &str) {
    let mut table = self.sessions.write().await;
    match table.get_mut(session_id) {
        Some(session) => session.last_activity = chrono::Utc::now(),
        None => (),
    }
}
/// Sets the agent's status and logs the change.
///
/// # Errors
/// Fails if no agent is registered under `agent_id`.
async fn update_agent_status(
    &self,
    agent_id: &str,
    status: AgentStatus,
) -> Result<(), anyhow::Error> {
    let mut registry = self.agents.write().await;
    match registry.get_mut(agent_id) {
        Some(agent) => {
            agent.status = status;
            // Log from the stored value; no copy of the status is needed.
            info!("Updated agent {} status to {:?}", agent_id, agent.status);
            Ok(())
        }
        None => Err(anyhow::anyhow!("Agent {} not found", agent_id)),
    }
}
/// Adds one to the agent's `current_sessions`; a no-op for unknown ids.
async fn increment_agent_sessions(&self, agent_id: &str) {
    let mut registry = self.agents.write().await;
    let Some(agent) = registry.get_mut(agent_id) else {
        return;
    };
    agent.current_sessions += 1;
}
/// Subtracts one from the agent's `current_sessions`, never going below
/// zero; a no-op for unknown ids.
async fn decrement_agent_sessions(&self, agent_id: &str) {
    if let Some(agent) = self.agents.write().await.get_mut(agent_id) {
        agent.current_sessions = agent.current_sessions.saturating_sub(1);
    }
}
async fn close_all_agent_sessions(&self, agent_id: &str) -> Result<(), anyhow::Error> {
let session_ids: Vec<String> = {
let sessions = self.sessions.read().await;
sessions
.values()
.filter(|s| s.agent_id == agent_id && s.status == SessionStatus::Active)
.map(|s| s.id.clone())
.collect()
};
for session_id in session_ids {
if let Err(e) = self.close_session(&session_id).await {
error!(
"Failed to close session {} for agent {}: {}",
session_id, agent_id, e
);
}
}
Ok(())
}
fn action_to_browser_action(
&self,
action: &str,
parameters: HashMap<String, serde_json::Value>,
) -> Result<crate::browser::BrowserAction, anyhow::Error> {
match action {
"navigate" => {
let url = parameters
.get("url")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter for navigate"))?;
Ok(crate::browser::BrowserAction::Navigate {
url: url.to_string(),
})
}
"click" => {
let selector = parameters
.get("selector")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'selector' parameter for click"))?;
Ok(crate::browser::BrowserAction::Click {
selector: selector.to_string(),
})
}
"type" => {
let selector = parameters
.get("selector")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'selector' parameter for type"))?;
let text = parameters
.get("text")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'text' parameter for type"))?;
Ok(crate::browser::BrowserAction::Type {
selector: selector.to_string(),
text: text.to_string(),
})
}
"screenshot" => {
let path = parameters
.get("path")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
Ok(crate::browser::BrowserAction::Screenshot { path })
}
"execute_script" => {
let script = parameters
.get("script")
.and_then(|v| v.as_str())
.ok_or_else(|| {
anyhow::anyhow!("Missing 'script' parameter for execute_script")
})?;
Ok(crate::browser::BrowserAction::ExecuteScript {
script: script.to_string(),
})
}
"get_title" => Ok(crate::browser::BrowserAction::GetTitle {}),
"get_url" => Ok(crate::browser::BrowserAction::GetUrl {}),
"refresh" => Ok(crate::browser::BrowserAction::Refresh {}),
"back" => Ok(crate::browser::BrowserAction::Back {}),
"forward" => Ok(crate::browser::BrowserAction::Forward {}),
_ => Err(anyhow::anyhow!("Unknown action: {}", action)),
}
}
pub async fn init_default_agent(&self) -> Result<String, anyhow::Error> {
let admin_agent = Agent {
id: "admin".to_string(),
name: "Default Admin Agent".to_string(),
description: "Default administrator agent with full capabilities".to_string(),
api_key: "sk-ditto-admin-2024".to_string(),
capabilities: vec![
AgentCapability::BrowserControl,
AgentCapability::SkillExecution,
AgentCapability::Screenshot,
AgentCapability::JavaScript,
AgentCapability::FileUpload,
AgentCapability::NetworkAccess,
],
max_sessions: 10,
current_sessions: 0,
status: AgentStatus::Active,
created_at: chrono::Utc::now(),
last_activity: chrono::Utc::now(),
metadata: HashMap::new(),
preferences: AgentPreferences::default(),
};
let agent_id = admin_agent.id.clone();
self.register_agent(admin_agent).await?;
info!("Initialized default admin agent");
Ok(agent_id)
}
}
/// Partial update for an [`Agent`], consumed by `AgentManager::update_agent`.
///
/// Each `Some` field replaces the corresponding agent field wholesale; `None`
/// leaves it unchanged.
#[derive(Debug, Clone)]
pub struct AgentUpdate {
/// New display name.
pub name: Option<String>,
/// New description.
pub description: Option<String>,
/// Replacement capability list (not merged with the existing one).
pub capabilities: Option<Vec<AgentCapability>>,
/// New session limit.
pub max_sessions: Option<u32>,
/// New status. Setting it here does not close any sessions, unlike `ban_agent`.
pub status: Option<AgentStatus>,
/// Replacement metadata map (not merged with the existing one).
pub metadata: Option<HashMap<String, String>>,
/// Replacement preferences.
pub preferences: Option<AgentPreferences>,
}
/// Counts produced by `AgentManager::get_agent_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStats {
/// Number of registered agents, regardless of status.
pub total_agents: usize,
/// Agents whose status is `Active`.
pub active_agents: usize,
/// Agents whose status is `Suspended`.
pub suspended_agents: usize,
/// Agents whose status is `Banned`.
pub banned_agents: usize,
/// Number of sessions currently tracked by the manager.
pub total_sessions: usize,
/// Sessions whose status is `Active`.
pub active_sessions: usize,
}