dx_forge/server/
api.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6    extract::ws::{Message, WebSocket, WebSocketUpgrade},
7    extract::State,
8    extract::{Path as AxumPath, Query},
9    http::StatusCode,
10    response::{IntoResponse, Response},
11    routing::{delete, get, post},
12    Json, Router,
13};
14use colored::*;
15use futures::{SinkExt, StreamExt};
16use tower_http::cors::{Any, CorsLayer};
17
18use crate::crdt::Operation;
19use crate::storage::{Blob, Database, OperationLog, R2Config, R2Storage};
20use crate::sync::{SyncManager, SyncMessage, GLOBAL_CLOCK};
21use crate::server::authentication::{AuthManager, LoginRequest, LoginResponse, CreateUserRequest, ChangePasswordRequest};
22use dashmap::DashSet;
23use serde::{Deserialize, Serialize};
24use sha2::{Digest, Sha256};
25use uuid::Uuid;
26
27#[derive(Clone)]
28pub struct AppState {
29    pub oplog: Arc<OperationLog>,
30    pub db: Arc<Database>,
31    pub sync: SyncManager,
32    pub actor_id: String,
33    pub repo_id: String,
34    pub seen: Arc<DashSet<Uuid>>,
35    pub r2: Option<Arc<R2Storage>>, // R2 storage for blobs
36    pub auth: Arc<AuthManager>, // Authentication manager
37}
38
39/// API error type
40#[derive(Debug)]
41pub enum ApiError {
42    Internal(anyhow::Error),
43    NotFound(String),
44    BadRequest(String),
45}
46
47impl IntoResponse for ApiError {
48    fn into_response(self) -> Response {
49        let (status, message) = match self {
50            ApiError::Internal(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
51            ApiError::NotFound(msg) => (StatusCode::NOT_FOUND, msg),
52            ApiError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
53        };
54
55        let body = Json(serde_json::json!({
56            "error": message,
57        }));
58
59        (status, body).into_response()
60    }
61}
62
63impl<E> From<E> for ApiError
64where
65    E: Into<anyhow::Error>,
66{
67    fn from(err: E) -> Self {
68        ApiError::Internal(err.into())
69    }
70}
71
72/// Blob upload request
73#[derive(Debug, Deserialize)]
74pub struct UploadBlobRequest {
75    pub path: String,
76    pub content: String, // Base64 encoded
77}
78
79/// Blob upload response
80#[derive(Debug, Serialize)]
81pub struct UploadBlobResponse {
82    pub hash: String,
83    pub key: String,
84    pub size: u64,
85}
86
87pub async fn serve(port: u16, path: PathBuf) -> Result<()> {
88    // Initialize DB/oplog
89    let forge_path = path.join(".dx/forge");
90    let db = Arc::new(Database::new(&forge_path)?);
91    db.initialize()?;
92    let oplog = Arc::new(OperationLog::new(db.clone()));
93
94    // Load actor/repo identifiers
95    let config_path = forge_path.join("config.json");
96    let default_repo_id = {
97        let mut hasher = Sha256::new();
98        let path_string = forge_path.to_string_lossy().into_owned();
99        hasher.update(path_string.as_bytes());
100        format!("repo-{:x}", hasher.finalize())
101    };
102
103    let (actor_id, repo_id) = if let Ok(bytes) = tokio::fs::read(&config_path).await {
104        if let Ok(cfg) = serde_json::from_slice::<serde_json::Value>(&bytes) {
105            let actor = cfg
106                .get("actor_id")
107                .and_then(|s| s.as_str())
108                .map(|s| s.to_string())
109                .unwrap_or_else(|| whoami::username());
110            let repo = cfg
111                .get("repo_id")
112                .and_then(|s| s.as_str())
113                .map(|s| s.to_string())
114                .unwrap_or_else(|| default_repo_id.clone());
115            (actor, repo)
116        } else {
117            (whoami::username(), default_repo_id.clone())
118        }
119    } else {
120        (whoami::username(), default_repo_id)
121    };
122
123    // Try to load R2 config
124    let r2 = match R2Config::from_env() {
125        Ok(config) => {
126            println!(
127                "{} R2 Bucket: {}",
128                "✓".green(),
129                config.bucket_name.bright_white()
130            );
131            match R2Storage::new(config) {
132                Ok(storage) => {
133                    println!("{} R2 Storage enabled", "✓".green());
134                    Some(Arc::new(storage))
135                }
136                Err(e) => {
137                    println!("{} R2 Storage failed: {}", "⚠".yellow(), e);
138                    None
139                }
140            }
141        }
142        Err(_) => {
143            println!(
144                "{} R2 not configured (set R2_* in .env for blob storage)",
145                "ℹ".blue()
146            );
147            None
148        }
149    };
150
151    let state = AppState {
152        oplog,
153        db,
154        sync: SyncManager::new(),
155        actor_id,
156        repo_id,
157        seen: Arc::new(DashSet::new()),
158        r2,
159        auth: Arc::new(AuthManager::new()),
160    };
161
162    let app = Router::new()
163        // Static files for web UI
164        .route("/", get(serve_index))
165        .route("/static/styles.css", get(serve_styles))
166        .route("/static/app.js", get(serve_app_js))
167        // API routes
168        .route("/health", get(health_check))
169        .route("/ops", get(get_ops))
170        .route("/ws", get(ws_handler))
171        // Authentication endpoints
172        .route("/api/v1/auth/login", post(login))
173        .route("/api/v1/auth/validate", get(validate_session))
174        .route("/api/v1/auth/me", get(get_current_user))
175        .route("/api/v1/auth/change-password", post(change_password))
176        // User management endpoints
177        .route("/api/v1/users", get(list_users))
178        .route("/api/v1/users", post(create_user))
179        .route("/api/v1/users/{username}", delete(delete_user))
180        // File browser endpoints
181        .route("/api/v1/files", get(list_files))
182        .route("/api/v1/files/{*path}", get(get_file_content))
183        // Blob endpoints (if R2 is configured)
184        .route("/api/v1/blobs", post(upload_blob))
185        .route("/api/v1/blobs/{hash}", get(download_blob))
186        .route("/api/v1/blobs/{hash}", delete(delete_blob_handler))
187        .route("/api/v1/blobs/{hash}/exists", get(check_blob_exists))
188        .route("/api/v1/blobs/batch", post(batch_upload))
189        // CORS for web clients
190        .layer(
191            CorsLayer::new()
192                .allow_origin(Any)
193                .allow_methods(Any)
194                .allow_headers(Any),
195        )
196        .with_state(state);
197
198    let addr = format!("0.0.0.0:{}", port);
199    println!(
200        "{} Server running at {}",
201        "✓".green(),
202        format!("http://localhost:{}", port).bright_blue()
203    );
204
205    let listener = tokio::net::TcpListener::bind(&addr).await?;
206    axum::serve(listener, app).await?;
207
208    Ok(())
209}
210
211async fn ws_handler(
212    State(state): State<AppState>,
213    ws: WebSocketUpgrade,
214) -> impl axum::response::IntoResponse {
215    ws.on_upgrade(move |socket| handle_ws(state, socket))
216}
217
218async fn handle_ws(state: AppState, socket: WebSocket) {
219    let (mut sender, mut receiver) = socket.split();
220
221    // Send handshake immediately with server metadata
222    let handshake = SyncMessage::handshake(state.actor_id.clone(), state.repo_id.clone());
223    if let Ok(text) = serde_json::to_string(&handshake) {
224        let _ = sender.send(Message::Text(text.into())).await;
225    }
226
227    // Subscribe to local operations and forward to this client
228    let mut rx = state.sync.subscribe();
229    let send_task = tokio::spawn(async move {
230        while let Ok(op_arc) = rx.recv().await {
231            // Forward as JSON text
232            if let Ok(text) = serde_json::to_string(&SyncMessage::operation((*op_arc).clone())) {
233                if sender.send(Message::Text(text.into())).await.is_err() {
234                    break;
235                }
236            }
237        }
238    });
239
240    // Receive from client and publish
241    let state_recv = state.clone();
242    let recv_task = tokio::spawn(async move {
243        let oplog = state_recv.oplog.clone();
244        while let Some(msg) = receiver.next().await {
245            match msg {
246                Ok(Message::Text(text)) => {
247                    let text: String = text.to_string();
248                    if let Ok(msg) = serde_json::from_str::<SyncMessage>(&text) {
249                        match msg {
250                            SyncMessage::Handshake { actor_id, repo_id } => {
251                                println!(
252                                    "{} Peer handshake: actor={} repo={}",
253                                    "↔".bright_blue(),
254                                    actor_id.bright_yellow(),
255                                    repo_id.bright_white()
256                                );
257                            }
258                            SyncMessage::Operation { operation: op } => {
259                                if insert_seen(&state_recv.seen, op.id) {
260                                    if let Some(lamport) = op.lamport() {
261                                        GLOBAL_CLOCK.observe(lamport);
262                                    }
263                                    let _ = oplog.append(op.clone());
264                                    let _ = state_recv.sync.publish(Arc::new(op));
265                                }
266                            }
267                        }
268                    } else if let Ok(op) = serde_json::from_str::<Operation>(&text) {
269                        if insert_seen(&state_recv.seen, op.id) {
270                            if let Some(lamport) = op.lamport() {
271                                GLOBAL_CLOCK.observe(lamport);
272                            }
273                            let _ = oplog.append(op.clone());
274                            let _ = state_recv.sync.publish(Arc::new(op));
275                        }
276                    }
277                }
278                Ok(Message::Binary(bin)) => {
279                    if let Ok(op) = serde_cbor::from_slice::<Operation>(&bin) {
280                        if insert_seen(&state_recv.seen, op.id) {
281                            if let Some(lamport) = op.lamport() {
282                                GLOBAL_CLOCK.observe(lamport);
283                            }
284                            let _ = oplog.append(op.clone());
285                            let _ = state_recv.sync.publish(Arc::new(op));
286                        }
287                    }
288                }
289                Ok(Message::Close(_)) | Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {}
290                Err(_) => break,
291            }
292        }
293    });
294
295    let _ = tokio::join!(send_task, recv_task);
296}
297
298#[derive(Deserialize)]
299struct OpsQuery {
300    file: Option<String>,
301    limit: Option<usize>,
302}
303
304async fn get_ops(
305    State(state): State<AppState>,
306    Query(query): Query<OpsQuery>,
307) -> Result<Json<Vec<Operation>>, axum::http::StatusCode> {
308    let limit = query.limit.unwrap_or(50);
309    let result = if let Some(file) = query.file.as_deref() {
310        let p = std::path::PathBuf::from(file);
311        state.db.get_operations(Some(&p), limit)
312    } else {
313        state.db.get_operations(None, limit)
314    };
315
316    match result {
317        Ok(ops) => Ok(Json(ops)),
318        Err(_) => Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR),
319    }
320}
321
/// Maximum number of operation IDs kept in the seen-cache before entries are evicted.
const SEEN_LIMIT: usize = 10_000;
323
324fn insert_seen(cache: &DashSet<Uuid>, id: Uuid) -> bool {
325    let inserted = cache.insert(id);
326    if inserted {
327        enforce_seen_limit(cache);
328    }
329    inserted
330}
331
332fn enforce_seen_limit(cache: &DashSet<Uuid>) {
333    while cache.len() > SEEN_LIMIT {
334        if let Some(entry) = cache.iter().next() {
335            let key = *entry.key();
336            drop(entry);
337            cache.remove(&key);
338        } else {
339            break;
340        }
341    }
342}
343
344// ========== Blob Storage Endpoints ==========
345
346/// Health check endpoint with R2 status
347async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
348    Json(serde_json::json!({
349        "status": "healthy",
350        "service": "forge-api",
351        "version": env!("CARGO_PKG_VERSION"),
352        "r2_enabled": state.r2.is_some(),
353    }))
354}
355
356/// Upload blob endpoint
357async fn upload_blob(
358    State(state): State<AppState>,
359    Json(req): Json<UploadBlobRequest>,
360) -> Result<Json<UploadBlobResponse>, ApiError> {
361    let r2 = state
362        .r2
363        .as_ref()
364        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
365
366    // Decode base64 content
367    use base64::Engine;
368    let content = base64::engine::general_purpose::STANDARD
369        .decode(&req.content)
370        .map_err(|e| ApiError::BadRequest(format!("Invalid base64: {}", e)))?;
371
372    let blob = Blob::from_content(&req.path, content);
373    let hash = blob.hash().to_string();
374    let size = blob.metadata.size;
375
376    // Upload to R2
377    let key = r2.upload_blob(&blob).await?;
378
379    Ok(Json(UploadBlobResponse { hash, key, size }))
380}
381
382/// Download blob endpoint
383async fn download_blob(
384    State(state): State<AppState>,
385    AxumPath(hash): AxumPath<String>,
386) -> Result<Response, ApiError> {
387    let r2 = state
388        .r2
389        .as_ref()
390        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
391
392    let blob = r2
393        .download_blob(&hash)
394        .await
395        .map_err(|_| ApiError::NotFound(format!("Blob not found: {}", hash)))?;
396
397    // Return blob content with metadata headers
398    Ok((
399        StatusCode::OK,
400        [
401            ("Content-Type", blob.metadata.mime_type.clone()),
402            ("X-Blob-Hash", hash),
403            ("X-Blob-Size", blob.metadata.size.to_string()),
404        ],
405        blob.content,
406    )
407        .into_response())
408}
409
410/// Delete blob endpoint
411async fn delete_blob_handler(
412    State(state): State<AppState>,
413    AxumPath(hash): AxumPath<String>,
414) -> Result<StatusCode, ApiError> {
415    let r2 = state
416        .r2
417        .as_ref()
418        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
419
420    r2.delete_blob(&hash).await?;
421    Ok(StatusCode::NO_CONTENT)
422}
423
424/// Check if blob exists
425async fn check_blob_exists(
426    State(state): State<AppState>,
427    AxumPath(hash): AxumPath<String>,
428) -> Result<Json<serde_json::Value>, ApiError> {
429    let r2 = state
430        .r2
431        .as_ref()
432        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
433
434    let exists = r2.blob_exists(&hash).await?;
435
436    Ok(Json(serde_json::json!({
437        "exists": exists,
438        "hash": hash,
439    })))
440}
441
442/// Batch upload request
443#[derive(Debug, Deserialize)]
444pub struct BatchUploadRequest {
445    pub blobs: Vec<UploadBlobRequest>,
446}
447
448/// Batch upload response
449#[derive(Debug, Serialize)]
450pub struct BatchUploadResponse {
451    pub uploaded: Vec<UploadBlobResponse>,
452    pub failed: Vec<String>,
453}
454
455/// Batch upload endpoint
456async fn batch_upload(
457    State(state): State<AppState>,
458    Json(req): Json<BatchUploadRequest>,
459) -> Result<Json<BatchUploadResponse>, ApiError> {
460    let r2 = state
461        .r2
462        .as_ref()
463        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
464
465    let mut uploaded = Vec::new();
466    let mut failed = Vec::new();
467
468    use base64::Engine;
469    for blob_req in req.blobs {
470        match base64::engine::general_purpose::STANDARD.decode(&blob_req.content) {
471            Ok(content) => {
472                let blob = Blob::from_content(&blob_req.path, content);
473                let hash = blob.hash().to_string();
474                let size = blob.metadata.size;
475
476                match r2.upload_blob(&blob).await {
477                    Ok(key) => {
478                        uploaded.push(UploadBlobResponse { hash, key, size });
479                    }
480                    Err(e) => {
481                        failed.push(format!("{}: {}", blob_req.path, e));
482                    }
483                }
484            }
485            Err(e) => {
486                failed.push(format!("{}: Invalid base64: {}", blob_req.path, e));
487            }
488        }
489    }
490
491    Ok(Json(BatchUploadResponse { uploaded, failed }))
492}
493
494// ========== Static File Serving ==========
495
496const INDEX_HTML: &str = include_str!("web_ui/index.html");
497const STYLES_CSS: &str = include_str!("web_ui/styles.css");
498const APP_JS: &str = include_str!("web_ui/app.js");
499
500async fn serve_index() -> impl IntoResponse {
501    (
502        StatusCode::OK,
503        [("Content-Type", "text/html; charset=utf-8")],
504        INDEX_HTML,
505    )
506}
507
508async fn serve_styles() -> impl IntoResponse {
509    (
510        StatusCode::OK,
511        [("Content-Type", "text/css; charset=utf-8")],
512        STYLES_CSS,
513    )
514}
515
516async fn serve_app_js() -> impl IntoResponse {
517    (
518        StatusCode::OK,
519        [("Content-Type", "application/javascript; charset=utf-8")],
520        APP_JS,
521    )
522}
523
524// ========== Authentication Endpoints ==========
525
526async fn login(
527    State(state): State<AppState>,
528    Json(req): Json<LoginRequest>,
529) -> Result<Json<LoginResponse>, ApiError> {
530    let session = state.auth.login(&req.username, &req.password)?;
531    
532    Ok(Json(LoginResponse {
533        token: session.token,
534        username: session.username,
535        role: session.role,
536        expires_at: session.expires_at,
537    }))
538}
539
540async fn validate_session(
541    State(state): State<AppState>,
542    headers: axum::http::HeaderMap,
543) -> Result<StatusCode, ApiError> {
544    let token = extract_token(&headers)?;
545    state.auth.validate_token(&token)?;
546    Ok(StatusCode::OK)
547}
548
549async fn get_current_user(
550    State(_state): State<AppState>,
551    _headers: axum::http::HeaderMap,
552) -> Result<Json<serde_json::Value>, ApiError> {
553    // Return dummy user for no-auth mode
554    Ok(Json(serde_json::json!({
555        "username": "guest",
556        "role": "admin",
557        "user_id": "guest-id",
558    })))
559}
560
561async fn change_password(
562    State(state): State<AppState>,
563    headers: axum::http::HeaderMap,
564    Json(req): Json<ChangePasswordRequest>,
565) -> Result<StatusCode, ApiError> {
566    let token = extract_token(&headers)?;
567    let session = state.auth.validate_token(&token)?;
568    
569    state.auth.update_password(&session.username, &req.old_password, &req.new_password)?;
570    Ok(StatusCode::OK)
571}
572
573fn extract_token(headers: &axum::http::HeaderMap) -> Result<String, ApiError> {
574    let auth_header = headers
575        .get("Authorization")
576        .ok_or_else(|| ApiError::BadRequest("Missing Authorization header".to_string()))?
577        .to_str()
578        .map_err(|_| ApiError::BadRequest("Invalid Authorization header".to_string()))?;
579    
580    auth_header
581        .strip_prefix("Bearer ")
582        .map(|s| s.to_string())
583        .ok_or_else(|| ApiError::BadRequest("Invalid Authorization format".to_string()))
584}
585
586// ========== User Management Endpoints ==========
587
588async fn list_users(
589    State(state): State<AppState>,
590    _headers: axum::http::HeaderMap,
591) -> Result<Json<Vec<serde_json::Value>>, ApiError> {
592    // let token = extract_token(&headers)?;
593    // let session = state.auth.validate_token(&token)?;
594    
595    // Only allow admins and developers to list users
596    // if session.role != crate::server::authentication::Role::Admin {
597    //     return Err(ApiError::BadRequest("Insufficient permissions".to_string()));
598    // }
599    
600    let users = state.auth.list_users();
601    let user_list: Vec<_> = users.into_iter().map(|u| {
602        serde_json::json!({
603            "username": u.username,
604            "email": u.email,
605            "role": u.role,
606            "created_at": u.created_at,
607        })
608    }).collect();
609    
610    Ok(Json(user_list))
611}
612
613async fn create_user(
614    State(state): State<AppState>,
615    headers: axum::http::HeaderMap,
616    Json(req): Json<CreateUserRequest>,
617) -> Result<StatusCode, ApiError> {
618    let token = extract_token(&headers)?;
619    let session = state.auth.validate_token(&token)?;
620    
621    // Only admins can create users
622    if session.role != crate::server::authentication::Role::Admin {
623        return Err(ApiError::BadRequest("Insufficient permissions".to_string()));
624    }
625    
626    state.auth.register(req.username, &req.password, req.role)?;
627    Ok(StatusCode::CREATED)
628}
629
630async fn delete_user(
631    State(state): State<AppState>,
632    headers: axum::http::HeaderMap,
633    AxumPath(username): AxumPath<String>,
634) -> Result<StatusCode, ApiError> {
635    let token = extract_token(&headers)?;
636    let session = state.auth.validate_token(&token)?;
637    
638    // Only admins can delete users
639    if session.role != crate::server::authentication::Role::Admin {
640        return Err(ApiError::BadRequest("Insufficient permissions".to_string()));
641    }
642    
643    // Prevent self-deletion
644    if username == session.username {
645        return Err(ApiError::BadRequest("Cannot delete your own account".to_string()));
646    }
647    
648    state.auth.delete_user(&username)?;
649    Ok(StatusCode::NO_CONTENT)
650}
651
652// ========== File Browser Endpoints ==========
653
654#[derive(Serialize)]
655struct FileInfo {
656    name: String,
657    path: String,
658    is_dir: bool,
659    size: Option<u64>,
660}
661
662async fn list_files(
663    State(_state): State<AppState>,
664    _headers: axum::http::HeaderMap,
665) -> Result<Json<Vec<FileInfo>>, ApiError> {
666    // No auth check needed
667    
668    // List files in current directory  
669    let current_dir = std::env::current_dir()
670        .map_err(|e| ApiError::Internal(e.into()))?;
671    
672    let mut files = Vec::new();
673    
674    if let Ok(entries) = std::fs::read_dir(&current_dir) {
675        for entry in entries.flatten() {
676            let path = entry.path();
677            let name = path.file_name()
678                .and_then(|n| n.to_str())
679                .unwrap_or("Unknown")
680                .to_string();
681            
682            // Skip hidden files and forge directory
683            if name.starts_with('.') {
684                continue;
685            }
686            
687            let is_dir = path.is_dir();
688            let size = if is_dir {
689                None
690            } else {
691                std::fs::metadata(&path).ok().map(|m| m.len())
692            };
693            
694            files.push(FileInfo {
695                name,
696                path: path.strip_prefix(&current_dir)
697                    .ok()
698                    .and_then(|p| p.to_str())
699                    .unwrap_or("")
700                    .to_string(),
701                is_dir,
702                size,
703            });
704        }
705    }
706    
707    // Sort: directories first, then by name
708    files.sort_by(|a, b| {
709        match (a.is_dir, b.is_dir) {
710            (true, false) => std::cmp::Ordering::Less,
711            (false, true) => std::cmp::Ordering::Greater,
712            _ => a.name.cmp(&b.name),
713        }
714    });
715    
716    Ok(Json(files))
717}
718
719#[derive(Serialize)]
720struct FileContentResponse {
721    content: String,
722    path: String,
723}
724
725async fn get_file_content(
726    State(_state): State<AppState>,
727    _headers: axum::http::HeaderMap,
728    AxumPath(path): AxumPath<String>,
729) -> Result<Json<FileContentResponse>, ApiError> {
730    // No auth check needed
731    
732    println!("DEBUG: Requested file path: {}", path);
733
734    let current_dir = std::env::current_dir()
735        .map_err(|e| ApiError::Internal(e.into()))?;
736    
737    let file_path = current_dir.join(&path);
738    println!("DEBUG: Resolved file path: {:?}", file_path);
739    
740    // Security check: ensure path doesn't escape current directory
741    if !file_path.starts_with(&current_dir) {
742        return Err(ApiError::BadRequest("Invalid file path".to_string()));
743    }
744    
745    // Check if file exists and is not a directory
746    if !file_path.exists() {
747        return Err(ApiError::NotFound(format!("File not found: {}", path)));
748    }
749    
750    if file_path.is_dir() {
751        return Err(ApiError::BadRequest("Path is a directory".to_string()));
752    }
753    
754    // Read file content
755    let content = std::fs::read_to_string(&file_path)
756        .map_err(|e| ApiError::Internal(e.into()))?;
757    
758    Ok(Json(FileContentResponse {
759        content,
760        path,
761    }))
762}