use axum::{
extract::{Path, State, WebSocketUpgrade},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use crate::agent::AgentManager;
use crate::browser::BrowserManager;
use crate::security::{AuthManager, RateLimiter};
use crate::skills::SkillEngine;
/// Settings for the HTTP/WebSocket server.
///
/// Derives `Serialize`/`Deserialize`, so values can be converted to and from
/// any serde-supported format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
/// Host part of the listen address; `DittoServer::start` combines it with `port` as `host:port`.
pub host: String,
/// Port part of the listen address.
pub port: u16,
/// Presumably toggles the WebSocket endpoint -- TODO confirm where this flag is enforced.
pub enable_websocket: bool,
/// Presumably toggles CORS handling -- TODO confirm where this flag is enforced.
pub enable_cors: bool,
/// Presumably the request body size cap, in megabytes -- TODO confirm where it is enforced.
pub max_request_size_mb: usize,
}
/// Default settings: host `0.0.0.0`, port `8080`, both feature flags enabled,
/// and a request size setting of `10`.
impl Default for ServerConfig {
    fn default() -> Self {
        let host = String::from("0.0.0.0");
        Self {
            host,
            port: 8080,
            enable_websocket: true,
            enable_cors: true,
            max_request_size_mb: 10,
        }
    }
}
/// Response envelope used by the HTTP and WebSocket handlers in this file.
///
/// The field descriptions below reflect what the `success` / `error`
/// constructors set; the fields are public, so other code may build values
/// differently.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiResponse<T> {
/// `true` for responses built with `ApiResponse::success`, `false` for `ApiResponse::error`.
pub success: bool,
/// Payload; `Some` on success responses and `None` on error responses.
pub data: Option<T>,
/// Error message; `Some` on error responses and `None` on success responses.
pub error: Option<String>,
/// UTC time captured when the response value was constructed.
pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl<T> ApiResponse<T> {
    /// Builds a successful envelope carrying `data`, stamped with the current UTC time.
    pub fn success(data: T) -> Self {
        Self::stamped(Some(data), None)
    }

    /// Builds a failed envelope carrying the message `error`, stamped with the current UTC time.
    pub fn error(error: String) -> Self {
        Self::stamped(None, Some(error))
    }

    /// Shared constructor: the `success` flag is set exactly when no error message is given.
    fn stamped(data: Option<T>, error: Option<String>) -> Self {
        let success = error.is_none();
        let timestamp = chrono::Utc::now();
        Self {
            success,
            data,
            error,
            timestamp,
        }
    }
}
/// Server state: the configuration plus shared (`Arc`) handles to the managers
/// that the route handlers in this file call into.
pub struct DittoServer {
/// Listen address and feature settings.
config: ServerConfig,
/// Agent manager; handlers use it for agent/session lookups, browser commands and skill execution.
agent_manager: Arc<AgentManager>,
/// Browser manager; initialised at startup and queried for session statistics.
browser_manager: Arc<BrowserManager>,
/// Skill engine; handlers use it to list, look up and search skills.
skill_engine: Arc<SkillEngine>,
/// Auth manager; used by `authenticate_request` to validate bearer tokens.
auth_manager: Arc<AuthManager>,
/// Rate limiter; used by `check_rate_limit` and for rate-limit statistics.
rate_limiter: Arc<RateLimiter>,
}
impl DittoServer {
pub fn new(config: ServerConfig) -> Self {
let browser_manager = Arc::new(BrowserManager::new());
let skill_engine = Arc::new(SkillEngine::new());
let agent_manager = Arc::new(AgentManager::new(
Arc::clone(&browser_manager),
Arc::clone(&skill_engine),
));
let auth_manager = Arc::new(AuthManager::new("ditto-secret-key-2024".to_string()));
let rate_limiter = Arc::new(RateLimiter::new(crate::security::RateLimitConfig::default()));
Self {
config,
agent_manager,
browser_manager,
skill_engine,
auth_manager,
rate_limiter,
}
}
pub async fn initialize(&self) -> Result<(), anyhow::Error> {
info!("Initializing Ditto server...");
self.browser_manager.init_browsers().await?;
self.agent_manager.init_default_agent().await?;
self.skill_engine.init_default_skills().await?;
self.auth_manager.init_default_agent().await?;
info!("Ditto server initialized successfully");
Ok(())
}
pub fn router(&self) -> Router {
Router::new()
.route("/api/v1/agents", post(create_agent))
.route("/api/v1/agents", get(list_agents))
.route("/api/v1/agents/:agent_id", get(get_agent))
.route(
"/api/v1/agents/:agent_id/sessions/:session_id",
get(get_session),
)
.route("/api/v1/skills", get(list_skills))
.route("/api/v1/skills", post(create_skill))
.route("/api/v1/skills/:skill_id", get(get_skill))
.route("/api/v1/skills/:skill_id/execute", post(execute_skill))
.route("/api/v1/health", get(get_health_check))
.route("/ws/agents/:agent_id", get(websocket_handler))
.with_state(Arc::new(self.clone()))
}
pub async fn start(&self) -> Result<(), anyhow::Error> {
let addr: SocketAddr = format!("{}:{}", self.config.host, self.config.port)
.parse()
.map_err(|e| anyhow::anyhow!("Invalid address: {}", e))?;
info!("Starting Ditto server on {}", addr);
let app = self.router();
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_err(|e| anyhow::anyhow!("Failed to bind to address: {}", e))?;
axum::serve(listener, app)
.await
.map_err(|e| anyhow::anyhow!("Server failed to start: {}", e))
}
#[allow(dead_code)]
async fn authenticate_request(
&self,
headers: axum::http::HeaderMap,
) -> Result<String, StatusCode> {
let token = headers
.get("authorization")
.and_then(|h| h.to_str().ok())
.and_then(|t| t.strip_prefix("Bearer "));
let token = match token {
Some(t) => t,
None => return Err(StatusCode::UNAUTHORIZED),
};
match self.auth_manager.validate_token(token).await {
Ok(agent_id) => Ok(agent_id),
Err(_) => Err(StatusCode::UNAUTHORIZED),
}
}
#[allow(dead_code)]
async fn check_rate_limit(
&self,
agent_id: &str,
ip: std::net::IpAddr,
) -> Result<(), StatusCode> {
match self.rate_limiter.check_agent_request(agent_id, ip).await {
Ok(_) => Ok(()),
Err(e) => {
warn!("Rate limit exceeded for agent {}: {}", agent_id, e);
Err(StatusCode::TOO_MANY_REQUESTS)
}
}
}
}
/// Cloning copies the configuration and bumps the reference count of every
/// manager handle; the managers themselves are shared, not duplicated.
impl Clone for DittoServer {
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field without cloning it becomes a compile error.
        let Self {
            config,
            agent_manager,
            browser_manager,
            skill_engine,
            auth_manager,
            rate_limiter,
        } = self;
        Self {
            config: config.clone(),
            agent_manager: Arc::clone(agent_manager),
            browser_manager: Arc::clone(browser_manager),
            skill_engine: Arc::clone(skill_engine),
            auth_manager: Arc::clone(auth_manager),
            rate_limiter: Arc::clone(rate_limiter),
        }
    }
}
/// `POST /api/v1/agents` handler.
///
/// Currently a stub: it never touches the server state and only echoes back
/// the string `id` from the request body (or `"unknown"` when `id` is missing
/// or not a string) with status `"created"`.
async fn create_agent(
    State(_server): State<Arc<DittoServer>>,
    Json(agent_data): Json<serde_json::Value>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
    let agent_id = match agent_data.get("id") {
        Some(serde_json::Value::String(id)) => id.as_str(),
        _ => "unknown",
    };
    let payload = serde_json::json!({
        "agent_id": agent_id,
        "status": "created"
    });
    Ok(Json(ApiResponse::success(payload)))
}
/// `GET /api/v1/agents` handler: returns every agent reported by the agent
/// manager, each converted to JSON.
///
/// An agent that fails to serialize is emitted as the default JSON value
/// rather than failing the whole request.
async fn list_agents(
    State(server): State<Arc<DittoServer>>,
) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
    let mut agent_data: Vec<serde_json::Value> = Vec::new();
    for agent in server.agent_manager.list_agents(None).await {
        agent_data.push(serde_json::to_value(agent).unwrap_or_default());
    }
    Ok(Json(ApiResponse::success(agent_data)))
}
async fn get_agent(
State(server): State<Arc<DittoServer>>,
Path(agent_id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
match server.agent_manager.get_agent(&agent_id).await {
Some(agent) => {
let agent_data = serde_json::to_value(agent).unwrap_or_default();
Ok(Json(ApiResponse::success(agent_data)))
}
None => Ok(Json(ApiResponse::error("Agent not found".to_string()))),
}
}
/// Agent update handler (not wired into the router in this file).
///
/// Currently a stub: every argument is ignored and an empty success envelope
/// is returned.
#[allow(dead_code)]
async fn update_agent(
    State(_server): State<Arc<DittoServer>>,
    Path(_agent_id): Path<String>,
    Json(_updates): Json<serde_json::Value>,
) -> Result<Json<ApiResponse<()>>, StatusCode> {
    let acknowledgement = ApiResponse::success(());
    Ok(Json(acknowledgement))
}
/// Session creation handler (not wired into the router in this file).
///
/// Currently a stub: it ignores the server state and the agent id, generates a
/// fresh `session_<uuid>` identifier, and echoes it back together with the
/// requested `browser_type` (defaulting to `"chrome"` when the field is
/// missing or not a string).
#[allow(dead_code)]
async fn create_agent_session(
    State(_server): State<Arc<DittoServer>>,
    Path(_agent_id): Path<String>,
    Json(session_data): Json<serde_json::Value>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
    let browser_type = session_data
        .get("browser_type")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("chrome");
    let session_id = format!("session_{}", uuid::Uuid::new_v4());
    Ok(Json(ApiResponse::success(serde_json::json!({
        "session_id": session_id,
        "browser_type": browser_type,
        "status": "created"
    }))))
}
/// Session listing handler (not wired into the router in this file).
///
/// Returns the sessions the agent manager reports for `agent_id`, each
/// converted to JSON; a session that fails to serialize becomes the default
/// JSON value.
#[allow(dead_code)]
async fn list_agent_sessions(
    State(server): State<Arc<DittoServer>>,
    Path(agent_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
    let mut session_data: Vec<serde_json::Value> = Vec::new();
    for session in server.agent_manager.list_sessions(Some(&agent_id)).await {
        session_data.push(serde_json::to_value(session).unwrap_or_default());
    }
    Ok(Json(ApiResponse::success(session_data)))
}
/// `GET /api/v1/agents/:agent_id/sessions/:session_id` handler.
///
/// The route has two path parameters, so both must be extracted as a tuple:
/// extracting a single `Path<String>` from a two-parameter route is rejected
/// by axum at request time and the handler would never run.
///
/// Responds with the session as JSON, or with a `"Session not found"` error
/// envelope when the agent manager has no such session.
async fn get_session(
    State(server): State<Arc<DittoServer>>,
    Path((_agent_id, session_id)): Path<(String, String)>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
    // NOTE(review): the agent id from the URL is not checked against the
    // session's owner -- confirm whether that check is needed.
    match server.agent_manager.get_session(&session_id).await {
        Some(session) => {
            let session_data = serde_json::to_value(session).unwrap_or_default();
            Ok(Json(ApiResponse::success(session_data)))
        }
        None => Ok(Json(ApiResponse::error("Session not found".to_string()))),
    }
}
#[allow(dead_code)]
async fn close_session(
State(server): State<Arc<DittoServer>>,
Path(session_id): Path<String>,
) -> Result<Json<ApiResponse<()>>, StatusCode> {
match server.agent_manager.close_session(&session_id).await {
Ok(_) => Ok(Json(ApiResponse::success(()))),
Err(e) => Ok(Json(ApiResponse::error(format!(
"Failed to close session: {}",
e
)))),
}
}
#[allow(dead_code)]
async fn execute_command(
State(server): State<Arc<DittoServer>>,
Path(session_id): Path<String>,
Json(command): Json<serde_json::Value>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
let action = command
.get("action")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let parameters = command
.get("parameters")
.and_then(|v| v.as_object())
.map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
.unwrap_or_default();
match server
.agent_manager
.execute_browser_command(&session_id, action.to_string(), parameters)
.await
{
Ok(result) => {
let result_data = serde_json::to_value(result).unwrap_or_default();
Ok(Json(ApiResponse::success(result_data)))
}
Err(e) => Ok(Json(ApiResponse::error(format!(
"Command execution failed: {}",
e
)))),
}
}
/// `POST /api/v1/skills` handler.
///
/// Currently a stub: it never touches the server state and only echoes back
/// the string `id` from the request body (or `"unknown"` when `id` is missing
/// or not a string) with status `"created"`.
async fn create_skill(
    State(_server): State<Arc<DittoServer>>,
    Json(skill_data): Json<serde_json::Value>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
    let skill_id = match skill_data.get("id") {
        Some(serde_json::Value::String(id)) => id.as_str(),
        _ => "unknown",
    };
    Ok(Json(ApiResponse::success(serde_json::json!({
        "skill_id": skill_id,
        "status": "created"
    }))))
}
/// `GET /api/v1/skills` handler: returns every skill reported by the skill
/// engine (called with both optional arguments as `None`), each converted to
/// JSON.
///
/// A skill that fails to serialize is emitted as the default JSON value.
async fn list_skills(
    State(server): State<Arc<DittoServer>>,
) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
    let mut skill_data: Vec<serde_json::Value> = Vec::new();
    for skill in server.skill_engine.list_skills(None, None).await {
        skill_data.push(serde_json::to_value(skill).unwrap_or_default());
    }
    Ok(Json(ApiResponse::success(skill_data)))
}
async fn get_skill(
State(server): State<Arc<DittoServer>>,
Path(skill_id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
match server.skill_engine.get_skill(&skill_id).await {
Some(skill) => {
let skill_data = serde_json::to_value(skill).unwrap_or_default();
Ok(Json(ApiResponse::success(skill_data)))
}
None => Ok(Json(ApiResponse::error("Skill not found".to_string()))),
}
}
/// Skill search handler (not wired into the router in this file).
///
/// Passes the path segment `query` to the skill engine's search and returns
/// the matches converted to JSON; a skill that fails to serialize becomes the
/// default JSON value.
#[allow(dead_code)]
async fn search_skills(
    State(server): State<Arc<DittoServer>>,
    Path(query): Path<String>,
) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
    let mut skill_data: Vec<serde_json::Value> = Vec::new();
    for skill in server.skill_engine.search_skills(&query).await {
        skill_data.push(serde_json::to_value(skill).unwrap_or_default());
    }
    Ok(Json(ApiResponse::success(skill_data)))
}
async fn execute_skill(
State(server): State<Arc<DittoServer>>,
Path(skill_id): Path<String>,
Json(execution_data): Json<serde_json::Value>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
let agent_id = execution_data
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("admin");
let parameters = execution_data
.get("parameters")
.and_then(|v| v.as_object())
.map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
.unwrap_or_default();
match server
.agent_manager
.execute_skill(agent_id, &skill_id, parameters)
.await
{
Ok(execution_id) => {
let response = serde_json::json!({
"execution_id": execution_id,
"skill_id": skill_id,
"status": "started"
});
Ok(Json(ApiResponse::success(response)))
}
Err(e) => Ok(Json(ApiResponse::error(format!(
"Skill execution failed: {}",
e
)))),
}
}
/// System statistics handler (not wired into the router in this file).
///
/// Collects statistics from the agent manager, browser manager, skill engine
/// and rate limiter, in that order, and returns them in one JSON document.
/// The `server` section contains fixed placeholder values (`"0.1.0"`, `"0s"`,
/// `"0MB"`), not measured ones.
///
/// Returns `500 Internal Server Error` (after logging) when the browser
/// manager cannot provide session statistics.
#[allow(dead_code)]
async fn get_system_stats(
    State(server): State<Arc<DittoServer>>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
    let agent_stats = server.agent_manager.get_agent_stats().await;
    let session_stats = match server.browser_manager.get_session_stats().await {
        Ok(stats) => stats,
        Err(e) => {
            error!("Failed to get session stats: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };
    let skill_stats = server.skill_engine.get_skill_stats().await;
    let rate_limit_stats = server.rate_limiter.get_rate_limit_stats().await;
    let server_section = serde_json::json!({
        "version": "0.1.0",
        "uptime": "0s",
        "memory_usage": "0MB"
    });
    let stats = serde_json::json!({
        "agents": agent_stats,
        "sessions": session_stats,
        "skills": skill_stats,
        "rate_limits": rate_limit_stats,
        "server": server_section
    });
    Ok(Json(ApiResponse::success(stats)))
}
async fn get_health_check(
State(_server): State<Arc<DittoServer>>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
let health = serde_json::json!({
"status": "healthy",
"timestamp": chrono::Utc::now().to_rfc3339(),
"components": {
"browser_manager": "healthy",
"agent_manager": "healthy",
"skill_engine": "healthy",
"auth_manager": "healthy",
"rate_limiter": "healthy"
}
});
Ok(Json(ApiResponse::success(health)))
}
/// `GET /ws/agents/:agent_id` handler: accepts the WebSocket upgrade and hands
/// the resulting socket, the shared server state and the agent id from the
/// path to `handle_websocket`.
async fn websocket_handler(
    State(server): State<Arc<DittoServer>>,
    Path(agent_id): Path<String>,
    ws: WebSocketUpgrade,
) -> axum::response::Response {
    let run_session = move |socket: axum::extract::ws::WebSocket| {
        handle_websocket(socket, server, agent_id)
    };
    ws.on_upgrade(run_session)
}
async fn handle_websocket(
socket: axum::extract::ws::WebSocket,
server: Arc<DittoServer>,
agent_id: String,
) {
info!("Agent {} connected via WebSocket", agent_id);
let (mut sender, mut receiver) = socket.split();
while let Some(msg) = receiver.next().await {
match msg {
Ok(axum::extract::ws::Message::Text(text)) => {
debug!(
"Received WebSocket message from agent {}: {}",
agent_id, text
);
if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
let response = handle_websocket_message(&server, &agent_id, message).await;
if let Ok(response_text) = serde_json::to_string(&response) {
if let Err(e) = sender
.send(axum::extract::ws::Message::Text(response_text))
.await
{
error!("Failed to send WebSocket response: {}", e);
break;
}
}
} else {
warn!("Invalid JSON received from agent {}", agent_id);
}
}
Ok(axum::extract::ws::Message::Close(_)) => {
info!("Agent {} disconnected", agent_id);
break;
}
Err(e) => {
error!("WebSocket error for agent {}: {}", agent_id, e);
break;
}
_ => {}
}
}
}
async fn handle_websocket_message(
server: &DittoServer,
agent_id: &str,
message: serde_json::Value,
) -> ApiResponse<serde_json::Value> {
let message_type = message
.get("type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
match message_type {
"ping" => ApiResponse::success(serde_json::json!({"type": "pong"})),
"create_session" => {
if let Ok(session_id) = server.agent_manager.create_session(agent_id, None).await {
ApiResponse::success(serde_json::json!({
"type": "session_created",
"session_id": session_id
}))
} else {
ApiResponse::error("Failed to create session".to_string())
}
}
"execute_command" => {
if let Some(session_id) = message.get("session_id").and_then(|v| v.as_str()) {
if let Some(action) = message.get("action").and_then(|v| v.as_str()) {
let parameters = message
.get("parameters")
.and_then(|v| v.as_object())
.map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
.unwrap_or_default();
match server
.agent_manager
.execute_browser_command(session_id, action.to_string(), parameters)
.await
{
Ok(result) => {
let result_data = serde_json::to_value(result).unwrap_or_default();
ApiResponse::success(serde_json::json!({
"type": "command_result",
"result": result_data
}))
}
Err(e) => ApiResponse::error(format!("Command execution failed: {}", e)),
}
} else {
ApiResponse::error("Missing action parameter".to_string())
}
} else {
ApiResponse::error("Missing session_id parameter".to_string())
}
}
_ => ApiResponse::error(format!("Unknown message type: {}", message_type)),
}
}