1use axum::{
2 extract::{Path, State, WebSocketUpgrade},
3 http::StatusCode,
4 response::Json,
5 routing::{get, post},
6 Router,
7};
8use futures_util::{SinkExt, StreamExt};
9use serde::{Deserialize, Serialize};
10use std::net::SocketAddr;
11use std::sync::Arc;
12use tracing::{debug, error, info, warn};
13
14use crate::agent::AgentManager;
15use crate::browser::BrowserManager;
16use crate::security::{AuthManager, RateLimiter};
17use crate::skills::SkillEngine;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ServerConfig {
21 pub host: String,
22 pub port: u16,
23 pub enable_websocket: bool,
24 pub enable_cors: bool,
25 pub max_request_size_mb: usize,
26}
27
28impl Default for ServerConfig {
29 fn default() -> Self {
30 Self {
31 host: "0.0.0.0".to_string(),
32 port: 8080,
33 enable_websocket: true,
34 enable_cors: true,
35 max_request_size_mb: 10,
36 }
37 }
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ApiResponse<T> {
42 pub success: bool,
43 pub data: Option<T>,
44 pub error: Option<String>,
45 pub timestamp: chrono::DateTime<chrono::Utc>,
46}
47
48impl<T> ApiResponse<T> {
49 pub fn success(data: T) -> Self {
50 Self {
51 success: true,
52 data: Some(data),
53 error: None,
54 timestamp: chrono::Utc::now(),
55 }
56 }
57
58 pub fn error(error: String) -> Self {
59 Self {
60 success: false,
61 data: None,
62 error: Some(error),
63 timestamp: chrono::Utc::now(),
64 }
65 }
66}
67
68pub struct DittoServer {
69 config: ServerConfig,
70 agent_manager: Arc<AgentManager>,
71 browser_manager: Arc<BrowserManager>,
72 skill_engine: Arc<SkillEngine>,
73 auth_manager: Arc<AuthManager>,
74 rate_limiter: Arc<RateLimiter>,
75}
76
77impl DittoServer {
78 pub fn new(config: ServerConfig) -> Self {
79 let browser_manager = Arc::new(BrowserManager::new());
80 let skill_engine = Arc::new(SkillEngine::new());
81 let agent_manager = Arc::new(AgentManager::new(
82 Arc::clone(&browser_manager),
83 Arc::clone(&skill_engine),
84 ));
85 let auth_manager = Arc::new(AuthManager::new("ditto-secret-key-2024".to_string()));
86 let rate_limiter = Arc::new(RateLimiter::new(crate::security::RateLimitConfig::default()));
87
88 Self {
89 config,
90 agent_manager,
91 browser_manager,
92 skill_engine,
93 auth_manager,
94 rate_limiter,
95 }
96 }
97
98 pub async fn initialize(&self) -> Result<(), anyhow::Error> {
99 info!("Initializing Ditto server...");
100
101 self.browser_manager.init_browsers().await?;
103
104 self.agent_manager.init_default_agent().await?;
106
107 self.skill_engine.init_default_skills().await?;
109
110 self.auth_manager.init_default_agent().await?;
112
113 info!("Ditto server initialized successfully");
114 Ok(())
115 }
116
117 pub fn router(&self) -> Router {
118 Router::new()
119 .route("/api/v1/agents", post(create_agent))
121 .route("/api/v1/agents", get(list_agents))
122 .route("/api/v1/agents/:agent_id", get(get_agent))
123 .route(
126 "/api/v1/agents/:agent_id/sessions/:session_id",
127 get(get_session),
128 )
129 .route("/api/v1/skills", get(list_skills))
133 .route("/api/v1/skills", post(create_skill))
134 .route("/api/v1/skills/:skill_id", get(get_skill))
135 .route("/api/v1/skills/:skill_id/execute", post(execute_skill))
136 .route("/api/v1/health", get(get_health_check))
140 .route("/ws/agents/:agent_id", get(websocket_handler))
143 .with_state(Arc::new(self.clone()))
144 }
145
146 pub async fn start(&self) -> Result<(), anyhow::Error> {
147 let addr: SocketAddr = format!("{}:{}", self.config.host, self.config.port)
148 .parse()
149 .map_err(|e| anyhow::anyhow!("Invalid address: {}", e))?;
150 info!("Starting Ditto server on {}", addr);
151
152 let app = self.router();
153
154 let listener = tokio::net::TcpListener::bind(addr)
155 .await
156 .map_err(|e| anyhow::anyhow!("Failed to bind to address: {}", e))?;
157
158 axum::serve(listener, app)
159 .await
160 .map_err(|e| anyhow::anyhow!("Server failed to start: {}", e))
161 }
162
163 #[allow(dead_code)]
164 async fn authenticate_request(
165 &self,
166 headers: axum::http::HeaderMap,
167 ) -> Result<String, StatusCode> {
168 let token = headers
169 .get("authorization")
170 .and_then(|h| h.to_str().ok())
171 .and_then(|t| t.strip_prefix("Bearer "));
172
173 let token = match token {
174 Some(t) => t,
175 None => return Err(StatusCode::UNAUTHORIZED),
176 };
177
178 match self.auth_manager.validate_token(token).await {
179 Ok(agent_id) => Ok(agent_id),
180 Err(_) => Err(StatusCode::UNAUTHORIZED),
181 }
182 }
183
184 #[allow(dead_code)]
185 async fn check_rate_limit(
186 &self,
187 agent_id: &str,
188 ip: std::net::IpAddr,
189 ) -> Result<(), StatusCode> {
190 match self.rate_limiter.check_agent_request(agent_id, ip).await {
191 Ok(_) => Ok(()),
192 Err(e) => {
193 warn!("Rate limit exceeded for agent {}: {}", agent_id, e);
194 Err(StatusCode::TOO_MANY_REQUESTS)
195 }
196 }
197 }
198}
199
200impl Clone for DittoServer {
201 fn clone(&self) -> Self {
202 Self {
203 config: self.config.clone(),
204 agent_manager: Arc::clone(&self.agent_manager),
205 browser_manager: Arc::clone(&self.browser_manager),
206 skill_engine: Arc::clone(&self.skill_engine),
207 auth_manager: Arc::clone(&self.auth_manager),
208 rate_limiter: Arc::clone(&self.rate_limiter),
209 }
210 }
211}
212
213async fn create_agent(
215 State(_server): State<Arc<DittoServer>>,
216 Json(agent_data): Json<serde_json::Value>,
217) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
218 let agent_id = agent_data
220 .get("id")
221 .and_then(|v| v.as_str())
222 .unwrap_or("unknown");
223
224 let response = ApiResponse::success(serde_json::json!({
225 "agent_id": agent_id,
226 "status": "created"
227 }));
228
229 Ok(Json(response))
230}
231
232async fn list_agents(
233 State(server): State<Arc<DittoServer>>,
234) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
235 let agents = server.agent_manager.list_agents(None).await;
236
237 let agent_data: Vec<serde_json::Value> = agents
238 .into_iter()
239 .map(|agent| serde_json::to_value(agent).unwrap_or_default())
240 .collect();
241
242 Ok(Json(ApiResponse::success(agent_data)))
243}
244
245async fn get_agent(
246 State(server): State<Arc<DittoServer>>,
247 Path(agent_id): Path<String>,
248) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
249 match server.agent_manager.get_agent(&agent_id).await {
250 Some(agent) => {
251 let agent_data = serde_json::to_value(agent).unwrap_or_default();
252 Ok(Json(ApiResponse::success(agent_data)))
253 }
254 None => Ok(Json(ApiResponse::error("Agent not found".to_string()))),
255 }
256}
257
258#[allow(dead_code)]
259async fn update_agent(
260 State(_server): State<Arc<DittoServer>>,
261 Path(_agent_id): Path<String>,
262 Json(_updates): Json<serde_json::Value>,
263) -> Result<Json<ApiResponse<()>>, StatusCode> {
264 Ok(Json(ApiResponse::success(())))
266}
267
268#[allow(dead_code)]
269async fn create_agent_session(
270 State(_server): State<Arc<DittoServer>>,
271 Path(_agent_id): Path<String>,
272 Json(session_data): Json<serde_json::Value>,
273) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
274 let browser_type = session_data
275 .get("browser_type")
276 .and_then(|v| v.as_str())
277 .map(|s| s.to_string());
278
279 let session_id = format!("session_{}", uuid::Uuid::new_v4());
281 let response = serde_json::json!({
282 "session_id": session_id,
283 "browser_type": browser_type.unwrap_or("chrome".to_string()),
284 "status": "created"
285 });
286
287 Ok(Json(ApiResponse::success(response)))
288}
289
290#[allow(dead_code)]
291async fn list_agent_sessions(
292 State(server): State<Arc<DittoServer>>,
293 Path(agent_id): Path<String>,
294) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
295 let sessions = server.agent_manager.list_sessions(Some(&agent_id)).await;
296
297 let session_data: Vec<serde_json::Value> = sessions
298 .into_iter()
299 .map(|session| serde_json::to_value(session).unwrap_or_default())
300 .collect();
301
302 Ok(Json(ApiResponse::success(session_data)))
303}
304
305async fn get_session(
306 State(server): State<Arc<DittoServer>>,
307 Path(session_id): Path<String>,
308) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
309 match server.agent_manager.get_session(&session_id).await {
310 Some(session) => {
311 let session_data = serde_json::to_value(session).unwrap_or_default();
312 Ok(Json(ApiResponse::success(session_data)))
313 }
314 None => Ok(Json(ApiResponse::error("Session not found".to_string()))),
315 }
316}
317
318#[allow(dead_code)]
319async fn close_session(
320 State(server): State<Arc<DittoServer>>,
321 Path(session_id): Path<String>,
322) -> Result<Json<ApiResponse<()>>, StatusCode> {
323 match server.agent_manager.close_session(&session_id).await {
324 Ok(_) => Ok(Json(ApiResponse::success(()))),
325 Err(e) => Ok(Json(ApiResponse::error(format!(
326 "Failed to close session: {}",
327 e
328 )))),
329 }
330}
331
332#[allow(dead_code)]
333async fn execute_command(
334 State(server): State<Arc<DittoServer>>,
335 Path(session_id): Path<String>,
336 Json(command): Json<serde_json::Value>,
337) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
338 let action = command
339 .get("action")
340 .and_then(|v| v.as_str())
341 .unwrap_or("unknown");
342
343 let parameters = command
344 .get("parameters")
345 .and_then(|v| v.as_object())
346 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
347 .unwrap_or_default();
348
349 match server
350 .agent_manager
351 .execute_browser_command(&session_id, action.to_string(), parameters)
352 .await
353 {
354 Ok(result) => {
355 let result_data = serde_json::to_value(result).unwrap_or_default();
356 Ok(Json(ApiResponse::success(result_data)))
357 }
358 Err(e) => Ok(Json(ApiResponse::error(format!(
359 "Command execution failed: {}",
360 e
361 )))),
362 }
363}
364
365async fn create_skill(
366 State(_server): State<Arc<DittoServer>>,
367 Json(skill_data): Json<serde_json::Value>,
368) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
369 let skill_id = skill_data
371 .get("id")
372 .and_then(|v| v.as_str())
373 .unwrap_or("unknown");
374
375 let response = serde_json::json!({
376 "skill_id": skill_id,
377 "status": "created"
378 });
379
380 Ok(Json(ApiResponse::success(response)))
381}
382
383async fn list_skills(
384 State(server): State<Arc<DittoServer>>,
385) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
386 let skills = server.skill_engine.list_skills(None, None).await;
387
388 let skill_data: Vec<serde_json::Value> = skills
389 .into_iter()
390 .map(|skill| serde_json::to_value(skill).unwrap_or_default())
391 .collect();
392
393 Ok(Json(ApiResponse::success(skill_data)))
394}
395
396async fn get_skill(
397 State(server): State<Arc<DittoServer>>,
398 Path(skill_id): Path<String>,
399) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
400 match server.skill_engine.get_skill(&skill_id).await {
401 Some(skill) => {
402 let skill_data = serde_json::to_value(skill).unwrap_or_default();
403 Ok(Json(ApiResponse::success(skill_data)))
404 }
405 None => Ok(Json(ApiResponse::error("Skill not found".to_string()))),
406 }
407}
408
409#[allow(dead_code)]
410async fn search_skills(
411 State(server): State<Arc<DittoServer>>,
412 Path(query): Path<String>,
413) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
414 let skills = server.skill_engine.search_skills(&query).await;
415
416 let skill_data: Vec<serde_json::Value> = skills
417 .into_iter()
418 .map(|skill| serde_json::to_value(skill).unwrap_or_default())
419 .collect();
420
421 Ok(Json(ApiResponse::success(skill_data)))
422}
423
424async fn execute_skill(
425 State(server): State<Arc<DittoServer>>,
426 Path(skill_id): Path<String>,
427 Json(execution_data): Json<serde_json::Value>,
428) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
429 let agent_id = execution_data
430 .get("agent_id")
431 .and_then(|v| v.as_str())
432 .unwrap_or("admin");
433
434 let parameters = execution_data
435 .get("parameters")
436 .and_then(|v| v.as_object())
437 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
438 .unwrap_or_default();
439
440 match server
441 .agent_manager
442 .execute_skill(agent_id, &skill_id, parameters)
443 .await
444 {
445 Ok(execution_id) => {
446 let response = serde_json::json!({
447 "execution_id": execution_id,
448 "skill_id": skill_id,
449 "status": "started"
450 });
451 Ok(Json(ApiResponse::success(response)))
452 }
453 Err(e) => Ok(Json(ApiResponse::error(format!(
454 "Skill execution failed: {}",
455 e
456 )))),
457 }
458}
459
460#[allow(dead_code)]
461async fn get_system_stats(
462 State(server): State<Arc<DittoServer>>,
463) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
464 let agent_stats = server.agent_manager.get_agent_stats().await;
465 let session_stats = server
466 .browser_manager
467 .get_session_stats()
468 .await
469 .map_err(|e| {
470 error!("Failed to get session stats: {}", e);
471 StatusCode::INTERNAL_SERVER_ERROR
472 })?;
473 let skill_stats = server.skill_engine.get_skill_stats().await;
474 let rate_limit_stats = server.rate_limiter.get_rate_limit_stats().await;
475 let stats = serde_json::json!({
476 "agents": agent_stats,
477 "sessions": session_stats,
478 "skills": skill_stats,
479 "rate_limits": rate_limit_stats,
480 "server": {
481 "version": "0.1.0",
482 "uptime": "0s", "memory_usage": "0MB" }
485 });
486
487 Ok(Json(ApiResponse::success(stats)))
488}
489
490async fn get_health_check(
491 State(_server): State<Arc<DittoServer>>,
492) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
493 let health = serde_json::json!({
494 "status": "healthy",
495 "timestamp": chrono::Utc::now().to_rfc3339(),
496 "components": {
497 "browser_manager": "healthy",
498 "agent_manager": "healthy",
499 "skill_engine": "healthy",
500 "auth_manager": "healthy",
501 "rate_limiter": "healthy"
502 }
503 });
504
505 Ok(Json(ApiResponse::success(health)))
506}
507
508async fn websocket_handler(
509 State(server): State<Arc<DittoServer>>,
510 Path(agent_id): Path<String>,
511 ws: WebSocketUpgrade,
512) -> axum::response::Response {
513 ws.on_upgrade(move |socket| handle_websocket(socket, server, agent_id))
514}
515
516async fn handle_websocket(
517 socket: axum::extract::ws::WebSocket,
518 server: Arc<DittoServer>,
519 agent_id: String,
520) {
521 info!("Agent {} connected via WebSocket", agent_id);
522
523 let (mut sender, mut receiver) = socket.split();
524
525 while let Some(msg) = receiver.next().await {
526 match msg {
527 Ok(axum::extract::ws::Message::Text(text)) => {
528 debug!(
529 "Received WebSocket message from agent {}: {}",
530 agent_id, text
531 );
532
533 if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
535 let response = handle_websocket_message(&server, &agent_id, message).await;
536
537 if let Ok(response_text) = serde_json::to_string(&response) {
538 if let Err(e) = sender
539 .send(axum::extract::ws::Message::Text(response_text))
540 .await
541 {
542 error!("Failed to send WebSocket response: {}", e);
543 break;
544 }
545 }
546 } else {
547 warn!("Invalid JSON received from agent {}", agent_id);
548 }
549 }
550 Ok(axum::extract::ws::Message::Close(_)) => {
551 info!("Agent {} disconnected", agent_id);
552 break;
553 }
554 Err(e) => {
555 error!("WebSocket error for agent {}: {}", agent_id, e);
556 break;
557 }
558 _ => {}
559 }
560 }
561}
562
563async fn handle_websocket_message(
564 server: &DittoServer,
565 agent_id: &str,
566 message: serde_json::Value,
567) -> ApiResponse<serde_json::Value> {
568 let message_type = message
569 .get("type")
570 .and_then(|v| v.as_str())
571 .unwrap_or("unknown");
572
573 match message_type {
574 "ping" => ApiResponse::success(serde_json::json!({"type": "pong"})),
575 "create_session" => {
576 if let Ok(session_id) = server.agent_manager.create_session(agent_id, None).await {
577 ApiResponse::success(serde_json::json!({
578 "type": "session_created",
579 "session_id": session_id
580 }))
581 } else {
582 ApiResponse::error("Failed to create session".to_string())
583 }
584 }
585 "execute_command" => {
586 if let Some(session_id) = message.get("session_id").and_then(|v| v.as_str()) {
587 if let Some(action) = message.get("action").and_then(|v| v.as_str()) {
588 let parameters = message
589 .get("parameters")
590 .and_then(|v| v.as_object())
591 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
592 .unwrap_or_default();
593
594 match server
595 .agent_manager
596 .execute_browser_command(session_id, action.to_string(), parameters)
597 .await
598 {
599 Ok(result) => {
600 let result_data = serde_json::to_value(result).unwrap_or_default();
601 ApiResponse::success(serde_json::json!({
602 "type": "command_result",
603 "result": result_data
604 }))
605 }
606 Err(e) => ApiResponse::error(format!("Command execution failed: {}", e)),
607 }
608 } else {
609 ApiResponse::error("Missing action parameter".to_string())
610 }
611 } else {
612 ApiResponse::error("Missing session_id parameter".to_string())
613 }
614 }
615 _ => ApiResponse::error(format!("Unknown message type: {}", message_type)),
616 }
617}