Skip to main content

ditto_os/server/
mod.rs

1use axum::{
2    extract::{Path, State, WebSocketUpgrade},
3    http::StatusCode,
4    response::Json,
5    routing::{get, post},
6    Router,
7};
8use futures_util::{SinkExt, StreamExt};
9use serde::{Deserialize, Serialize};
10use std::net::SocketAddr;
11use std::sync::Arc;
12use tracing::{debug, error, info, warn};
13
14use crate::agent::AgentManager;
15use crate::browser::BrowserManager;
16use crate::security::{AuthManager, RateLimiter};
17use crate::skills::SkillEngine;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ServerConfig {
21    pub host: String,
22    pub port: u16,
23    pub enable_websocket: bool,
24    pub enable_cors: bool,
25    pub max_request_size_mb: usize,
26}
27
28impl Default for ServerConfig {
29    fn default() -> Self {
30        Self {
31            host: "0.0.0.0".to_string(),
32            port: 8080,
33            enable_websocket: true,
34            enable_cors: true,
35            max_request_size_mb: 10,
36        }
37    }
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ApiResponse<T> {
42    pub success: bool,
43    pub data: Option<T>,
44    pub error: Option<String>,
45    pub timestamp: chrono::DateTime<chrono::Utc>,
46}
47
48impl<T> ApiResponse<T> {
49    pub fn success(data: T) -> Self {
50        Self {
51            success: true,
52            data: Some(data),
53            error: None,
54            timestamp: chrono::Utc::now(),
55        }
56    }
57
58    pub fn error(error: String) -> Self {
59        Self {
60            success: false,
61            data: None,
62            error: Some(error),
63            timestamp: chrono::Utc::now(),
64        }
65    }
66}
67
68pub struct DittoServer {
69    config: ServerConfig,
70    agent_manager: Arc<AgentManager>,
71    browser_manager: Arc<BrowserManager>,
72    skill_engine: Arc<SkillEngine>,
73    auth_manager: Arc<AuthManager>,
74    rate_limiter: Arc<RateLimiter>,
75}
76
77impl DittoServer {
78    pub fn new(config: ServerConfig) -> Self {
79        let browser_manager = Arc::new(BrowserManager::new());
80        let skill_engine = Arc::new(SkillEngine::new());
81        let agent_manager = Arc::new(AgentManager::new(
82            Arc::clone(&browser_manager),
83            Arc::clone(&skill_engine),
84        ));
85        let auth_manager = Arc::new(AuthManager::new("ditto-secret-key-2024".to_string()));
86        let rate_limiter = Arc::new(RateLimiter::new(crate::security::RateLimitConfig::default()));
87
88        Self {
89            config,
90            agent_manager,
91            browser_manager,
92            skill_engine,
93            auth_manager,
94            rate_limiter,
95        }
96    }
97
98    pub async fn initialize(&self) -> Result<(), anyhow::Error> {
99        info!("Initializing Ditto server...");
100
101        // Initialize browser manager
102        self.browser_manager.init_browsers().await?;
103
104        // Initialize default agent
105        self.agent_manager.init_default_agent().await?;
106
107        // Initialize default skills
108        self.skill_engine.init_default_skills().await?;
109
110        // Initialize auth manager (note: this creates a default admin agent, not the same as agent manager)
111        self.auth_manager.init_default_agent().await?;
112
113        info!("Ditto server initialized successfully");
114        Ok(())
115    }
116
117    pub fn router(&self) -> Router {
118        Router::new()
119            // Agent endpoints
120            .route("/api/v1/agents", post(create_agent))
121            .route("/api/v1/agents", get(list_agents))
122            .route("/api/v1/agents/:agent_id", get(get_agent))
123            // .route("/api/v1/agents/:agent_id/sessions", post(create_session))
124            // .route("/api/v1/agents/:agent_id/sessions", get(list_sessions))
125            .route(
126                "/api/v1/agents/:agent_id/sessions/:session_id",
127                get(get_session),
128            )
129            // .route("/api/v1/agents/:agent_id/sessions/:session_id", delete(delete_session))
130            // .route("/api/v1/agents/:agent_id/sessions/:session_id/execute", post(execute_action))
131            // Skill endpoints
132            .route("/api/v1/skills", get(list_skills))
133            .route("/api/v1/skills", post(create_skill))
134            .route("/api/v1/skills/:skill_id", get(get_skill))
135            .route("/api/v1/skills/:skill_id/execute", post(execute_skill))
136            // .route("/api/v1/executions", get(list_executions))
137            // .route("/api/v1/executions/:execution_id", get(get_execution))
138            // System endpoints
139            .route("/api/v1/health", get(get_health_check))
140            // .route("/api/v1/stats", get(get_stats))
141            // WebSocket endpoint
142            .route("/ws/agents/:agent_id", get(websocket_handler))
143            .with_state(Arc::new(self.clone()))
144    }
145
146    pub async fn start(&self) -> Result<(), anyhow::Error> {
147        let addr: SocketAddr = format!("{}:{}", self.config.host, self.config.port)
148            .parse()
149            .map_err(|e| anyhow::anyhow!("Invalid address: {}", e))?;
150        info!("Starting Ditto server on {}", addr);
151
152        let app = self.router();
153
154        let listener = tokio::net::TcpListener::bind(addr)
155            .await
156            .map_err(|e| anyhow::anyhow!("Failed to bind to address: {}", e))?;
157
158        axum::serve(listener, app)
159            .await
160            .map_err(|e| anyhow::anyhow!("Server failed to start: {}", e))
161    }
162
163    #[allow(dead_code)]
164    async fn authenticate_request(
165        &self,
166        headers: axum::http::HeaderMap,
167    ) -> Result<String, StatusCode> {
168        let token = headers
169            .get("authorization")
170            .and_then(|h| h.to_str().ok())
171            .and_then(|t| t.strip_prefix("Bearer "));
172
173        let token = match token {
174            Some(t) => t,
175            None => return Err(StatusCode::UNAUTHORIZED),
176        };
177
178        match self.auth_manager.validate_token(token).await {
179            Ok(agent_id) => Ok(agent_id),
180            Err(_) => Err(StatusCode::UNAUTHORIZED),
181        }
182    }
183
184    #[allow(dead_code)]
185    async fn check_rate_limit(
186        &self,
187        agent_id: &str,
188        ip: std::net::IpAddr,
189    ) -> Result<(), StatusCode> {
190        match self.rate_limiter.check_agent_request(agent_id, ip).await {
191            Ok(_) => Ok(()),
192            Err(e) => {
193                warn!("Rate limit exceeded for agent {}: {}", agent_id, e);
194                Err(StatusCode::TOO_MANY_REQUESTS)
195            }
196        }
197    }
198}
199
200impl Clone for DittoServer {
201    fn clone(&self) -> Self {
202        Self {
203            config: self.config.clone(),
204            agent_manager: Arc::clone(&self.agent_manager),
205            browser_manager: Arc::clone(&self.browser_manager),
206            skill_engine: Arc::clone(&self.skill_engine),
207            auth_manager: Arc::clone(&self.auth_manager),
208            rate_limiter: Arc::clone(&self.rate_limiter),
209        }
210    }
211}
212
213// HTTP Handlers
214async fn create_agent(
215    State(_server): State<Arc<DittoServer>>,
216    Json(agent_data): Json<serde_json::Value>,
217) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
218    // Parse agent data (simplified)
219    let agent_id = agent_data
220        .get("id")
221        .and_then(|v| v.as_str())
222        .unwrap_or("unknown");
223
224    let response = ApiResponse::success(serde_json::json!({
225        "agent_id": agent_id,
226        "status": "created"
227    }));
228
229    Ok(Json(response))
230}
231
232async fn list_agents(
233    State(server): State<Arc<DittoServer>>,
234) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
235    let agents = server.agent_manager.list_agents(None).await;
236
237    let agent_data: Vec<serde_json::Value> = agents
238        .into_iter()
239        .map(|agent| serde_json::to_value(agent).unwrap_or_default())
240        .collect();
241
242    Ok(Json(ApiResponse::success(agent_data)))
243}
244
245async fn get_agent(
246    State(server): State<Arc<DittoServer>>,
247    Path(agent_id): Path<String>,
248) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
249    match server.agent_manager.get_agent(&agent_id).await {
250        Some(agent) => {
251            let agent_data = serde_json::to_value(agent).unwrap_or_default();
252            Ok(Json(ApiResponse::success(agent_data)))
253        }
254        None => Ok(Json(ApiResponse::error("Agent not found".to_string()))),
255    }
256}
257
258#[allow(dead_code)]
259async fn update_agent(
260    State(_server): State<Arc<DittoServer>>,
261    Path(_agent_id): Path<String>,
262    Json(_updates): Json<serde_json::Value>,
263) -> Result<Json<ApiResponse<()>>, StatusCode> {
264    // Simplified - just return success
265    Ok(Json(ApiResponse::success(())))
266}
267
268#[allow(dead_code)]
269async fn create_agent_session(
270    State(_server): State<Arc<DittoServer>>,
271    Path(_agent_id): Path<String>,
272    Json(session_data): Json<serde_json::Value>,
273) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
274    let browser_type = session_data
275        .get("browser_type")
276        .and_then(|v| v.as_str())
277        .map(|s| s.to_string());
278
279    // Simplified - just return a mock session ID
280    let session_id = format!("session_{}", uuid::Uuid::new_v4());
281    let response = serde_json::json!({
282        "session_id": session_id,
283        "browser_type": browser_type.unwrap_or("chrome".to_string()),
284        "status": "created"
285    });
286
287    Ok(Json(ApiResponse::success(response)))
288}
289
290#[allow(dead_code)]
291async fn list_agent_sessions(
292    State(server): State<Arc<DittoServer>>,
293    Path(agent_id): Path<String>,
294) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
295    let sessions = server.agent_manager.list_sessions(Some(&agent_id)).await;
296
297    let session_data: Vec<serde_json::Value> = sessions
298        .into_iter()
299        .map(|session| serde_json::to_value(session).unwrap_or_default())
300        .collect();
301
302    Ok(Json(ApiResponse::success(session_data)))
303}
304
305async fn get_session(
306    State(server): State<Arc<DittoServer>>,
307    Path(session_id): Path<String>,
308) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
309    match server.agent_manager.get_session(&session_id).await {
310        Some(session) => {
311            let session_data = serde_json::to_value(session).unwrap_or_default();
312            Ok(Json(ApiResponse::success(session_data)))
313        }
314        None => Ok(Json(ApiResponse::error("Session not found".to_string()))),
315    }
316}
317
318#[allow(dead_code)]
319async fn close_session(
320    State(server): State<Arc<DittoServer>>,
321    Path(session_id): Path<String>,
322) -> Result<Json<ApiResponse<()>>, StatusCode> {
323    match server.agent_manager.close_session(&session_id).await {
324        Ok(_) => Ok(Json(ApiResponse::success(()))),
325        Err(e) => Ok(Json(ApiResponse::error(format!(
326            "Failed to close session: {}",
327            e
328        )))),
329    }
330}
331
332#[allow(dead_code)]
333async fn execute_command(
334    State(server): State<Arc<DittoServer>>,
335    Path(session_id): Path<String>,
336    Json(command): Json<serde_json::Value>,
337) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
338    let action = command
339        .get("action")
340        .and_then(|v| v.as_str())
341        .unwrap_or("unknown");
342
343    let parameters = command
344        .get("parameters")
345        .and_then(|v| v.as_object())
346        .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
347        .unwrap_or_default();
348
349    match server
350        .agent_manager
351        .execute_browser_command(&session_id, action.to_string(), parameters)
352        .await
353    {
354        Ok(result) => {
355            let result_data = serde_json::to_value(result).unwrap_or_default();
356            Ok(Json(ApiResponse::success(result_data)))
357        }
358        Err(e) => Ok(Json(ApiResponse::error(format!(
359            "Command execution failed: {}",
360            e
361        )))),
362    }
363}
364
365async fn create_skill(
366    State(_server): State<Arc<DittoServer>>,
367    Json(skill_data): Json<serde_json::Value>,
368) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
369    // Simplified skill creation
370    let skill_id = skill_data
371        .get("id")
372        .and_then(|v| v.as_str())
373        .unwrap_or("unknown");
374
375    let response = serde_json::json!({
376        "skill_id": skill_id,
377        "status": "created"
378    });
379
380    Ok(Json(ApiResponse::success(response)))
381}
382
383async fn list_skills(
384    State(server): State<Arc<DittoServer>>,
385) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
386    let skills = server.skill_engine.list_skills(None, None).await;
387
388    let skill_data: Vec<serde_json::Value> = skills
389        .into_iter()
390        .map(|skill| serde_json::to_value(skill).unwrap_or_default())
391        .collect();
392
393    Ok(Json(ApiResponse::success(skill_data)))
394}
395
396async fn get_skill(
397    State(server): State<Arc<DittoServer>>,
398    Path(skill_id): Path<String>,
399) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
400    match server.skill_engine.get_skill(&skill_id).await {
401        Some(skill) => {
402            let skill_data = serde_json::to_value(skill).unwrap_or_default();
403            Ok(Json(ApiResponse::success(skill_data)))
404        }
405        None => Ok(Json(ApiResponse::error("Skill not found".to_string()))),
406    }
407}
408
409#[allow(dead_code)]
410async fn search_skills(
411    State(server): State<Arc<DittoServer>>,
412    Path(query): Path<String>,
413) -> Result<Json<ApiResponse<Vec<serde_json::Value>>>, StatusCode> {
414    let skills = server.skill_engine.search_skills(&query).await;
415
416    let skill_data: Vec<serde_json::Value> = skills
417        .into_iter()
418        .map(|skill| serde_json::to_value(skill).unwrap_or_default())
419        .collect();
420
421    Ok(Json(ApiResponse::success(skill_data)))
422}
423
424async fn execute_skill(
425    State(server): State<Arc<DittoServer>>,
426    Path(skill_id): Path<String>,
427    Json(execution_data): Json<serde_json::Value>,
428) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
429    let agent_id = execution_data
430        .get("agent_id")
431        .and_then(|v| v.as_str())
432        .unwrap_or("admin");
433
434    let parameters = execution_data
435        .get("parameters")
436        .and_then(|v| v.as_object())
437        .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
438        .unwrap_or_default();
439
440    match server
441        .agent_manager
442        .execute_skill(agent_id, &skill_id, parameters)
443        .await
444    {
445        Ok(execution_id) => {
446            let response = serde_json::json!({
447                "execution_id": execution_id,
448                "skill_id": skill_id,
449                "status": "started"
450            });
451            Ok(Json(ApiResponse::success(response)))
452        }
453        Err(e) => Ok(Json(ApiResponse::error(format!(
454            "Skill execution failed: {}",
455            e
456        )))),
457    }
458}
459
460#[allow(dead_code)]
461async fn get_system_stats(
462    State(server): State<Arc<DittoServer>>,
463) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
464    let agent_stats = server.agent_manager.get_agent_stats().await;
465    let session_stats = server
466        .browser_manager
467        .get_session_stats()
468        .await
469        .map_err(|e| {
470            error!("Failed to get session stats: {}", e);
471            StatusCode::INTERNAL_SERVER_ERROR
472        })?;
473    let skill_stats = server.skill_engine.get_skill_stats().await;
474    let rate_limit_stats = server.rate_limiter.get_rate_limit_stats().await;
475    let stats = serde_json::json!({
476        "agents": agent_stats,
477        "sessions": session_stats,
478        "skills": skill_stats,
479        "rate_limits": rate_limit_stats,
480        "server": {
481            "version": "0.1.0",
482            "uptime": "0s", // Would calculate actual uptime
483            "memory_usage": "0MB" // Would get actual memory usage
484        }
485    });
486
487    Ok(Json(ApiResponse::success(stats)))
488}
489
490async fn get_health_check(
491    State(_server): State<Arc<DittoServer>>,
492) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
493    let health = serde_json::json!({
494        "status": "healthy",
495        "timestamp": chrono::Utc::now().to_rfc3339(),
496        "components": {
497            "browser_manager": "healthy",
498            "agent_manager": "healthy",
499            "skill_engine": "healthy",
500            "auth_manager": "healthy",
501            "rate_limiter": "healthy"
502        }
503    });
504
505    Ok(Json(ApiResponse::success(health)))
506}
507
508async fn websocket_handler(
509    State(server): State<Arc<DittoServer>>,
510    Path(agent_id): Path<String>,
511    ws: WebSocketUpgrade,
512) -> axum::response::Response {
513    ws.on_upgrade(move |socket| handle_websocket(socket, server, agent_id))
514}
515
516async fn handle_websocket(
517    socket: axum::extract::ws::WebSocket,
518    server: Arc<DittoServer>,
519    agent_id: String,
520) {
521    info!("Agent {} connected via WebSocket", agent_id);
522
523    let (mut sender, mut receiver) = socket.split();
524
525    while let Some(msg) = receiver.next().await {
526        match msg {
527            Ok(axum::extract::ws::Message::Text(text)) => {
528                debug!(
529                    "Received WebSocket message from agent {}: {}",
530                    agent_id, text
531                );
532
533                // Parse and handle the message
534                if let Ok(message) = serde_json::from_str::<serde_json::Value>(&text) {
535                    let response = handle_websocket_message(&server, &agent_id, message).await;
536
537                    if let Ok(response_text) = serde_json::to_string(&response) {
538                        if let Err(e) = sender
539                            .send(axum::extract::ws::Message::Text(response_text))
540                            .await
541                        {
542                            error!("Failed to send WebSocket response: {}", e);
543                            break;
544                        }
545                    }
546                } else {
547                    warn!("Invalid JSON received from agent {}", agent_id);
548                }
549            }
550            Ok(axum::extract::ws::Message::Close(_)) => {
551                info!("Agent {} disconnected", agent_id);
552                break;
553            }
554            Err(e) => {
555                error!("WebSocket error for agent {}: {}", agent_id, e);
556                break;
557            }
558            _ => {}
559        }
560    }
561}
562
563async fn handle_websocket_message(
564    server: &DittoServer,
565    agent_id: &str,
566    message: serde_json::Value,
567) -> ApiResponse<serde_json::Value> {
568    let message_type = message
569        .get("type")
570        .and_then(|v| v.as_str())
571        .unwrap_or("unknown");
572
573    match message_type {
574        "ping" => ApiResponse::success(serde_json::json!({"type": "pong"})),
575        "create_session" => {
576            if let Ok(session_id) = server.agent_manager.create_session(agent_id, None).await {
577                ApiResponse::success(serde_json::json!({
578                    "type": "session_created",
579                    "session_id": session_id
580                }))
581            } else {
582                ApiResponse::error("Failed to create session".to_string())
583            }
584        }
585        "execute_command" => {
586            if let Some(session_id) = message.get("session_id").and_then(|v| v.as_str()) {
587                if let Some(action) = message.get("action").and_then(|v| v.as_str()) {
588                    let parameters = message
589                        .get("parameters")
590                        .and_then(|v| v.as_object())
591                        .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
592                        .unwrap_or_default();
593
594                    match server
595                        .agent_manager
596                        .execute_browser_command(session_id, action.to_string(), parameters)
597                        .await
598                    {
599                        Ok(result) => {
600                            let result_data = serde_json::to_value(result).unwrap_or_default();
601                            ApiResponse::success(serde_json::json!({
602                                "type": "command_result",
603                                "result": result_data
604                            }))
605                        }
606                        Err(e) => ApiResponse::error(format!("Command execution failed: {}", e)),
607                    }
608                } else {
609                    ApiResponse::error("Missing action parameter".to_string())
610                }
611            } else {
612                ApiResponse::error("Missing session_id parameter".to_string())
613            }
614        }
615        _ => ApiResponse::error(format!("Unknown message type: {}", message_type)),
616    }
617}