dx_forge/server/
api.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6    extract::ws::{Message, WebSocket, WebSocketUpgrade},
7    extract::State,
8    extract::{Path as AxumPath, Query},
9    http::StatusCode,
10    response::{IntoResponse, Response},
11    routing::{delete, get, post},
12    Json, Router,
13};
14use colored::*;
15use futures::{SinkExt, StreamExt};
16use tower_http::cors::{Any, CorsLayer};
17
18use crate::crdt::Operation;
19use crate::storage::{Blob, Database, OperationLog, R2Config, R2Storage};
20use crate::sync::{SyncManager, SyncMessage, GLOBAL_CLOCK};
21use dashmap::DashSet;
22use serde::{Deserialize, Serialize};
23use sha2::{Digest, Sha256};
24use uuid::Uuid;
25
26#[derive(Clone)]
27pub struct AppState {
28    pub oplog: Arc<OperationLog>,
29    pub db: Arc<Database>,
30    pub sync: SyncManager,
31    pub actor_id: String,
32    pub repo_id: String,
33    pub seen: Arc<DashSet<Uuid>>,
34    pub r2: Option<Arc<R2Storage>>, // R2 storage for blobs
35}
36
37/// API error type
38#[derive(Debug)]
39pub enum ApiError {
40    Internal(anyhow::Error),
41    NotFound(String),
42    BadRequest(String),
43}
44
45impl IntoResponse for ApiError {
46    fn into_response(self) -> Response {
47        let (status, message) = match self {
48            ApiError::Internal(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
49            ApiError::NotFound(msg) => (StatusCode::NOT_FOUND, msg),
50            ApiError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
51        };
52
53        let body = Json(serde_json::json!({
54            "error": message,
55        }));
56
57        (status, body).into_response()
58    }
59}
60
61impl<E> From<E> for ApiError
62where
63    E: Into<anyhow::Error>,
64{
65    fn from(err: E) -> Self {
66        ApiError::Internal(err.into())
67    }
68}
69
70/// Blob upload request
71#[derive(Debug, Deserialize)]
72pub struct UploadBlobRequest {
73    pub path: String,
74    pub content: String, // Base64 encoded
75}
76
77/// Blob upload response
78#[derive(Debug, Serialize)]
79pub struct UploadBlobResponse {
80    pub hash: String,
81    pub key: String,
82    pub size: u64,
83}
84
85pub async fn serve(port: u16, path: PathBuf) -> Result<()> {
86    // Initialize DB/oplog
87    let forge_path = path.join(".dx/forge");
88    let db = Arc::new(Database::new(&forge_path)?);
89    db.initialize()?;
90    let oplog = Arc::new(OperationLog::new(db.clone()));
91
92    // Load actor/repo identifiers
93    let config_path = forge_path.join("config.json");
94    let default_repo_id = {
95        let mut hasher = Sha256::new();
96        let path_string = forge_path.to_string_lossy().into_owned();
97        hasher.update(path_string.as_bytes());
98        format!("repo-{:x}", hasher.finalize())
99    };
100
101    let (actor_id, repo_id) = if let Ok(bytes) = tokio::fs::read(&config_path).await {
102        if let Ok(cfg) = serde_json::from_slice::<serde_json::Value>(&bytes) {
103            let actor = cfg
104                .get("actor_id")
105                .and_then(|s| s.as_str())
106                .map(|s| s.to_string())
107                .unwrap_or_else(|| whoami::username());
108            let repo = cfg
109                .get("repo_id")
110                .and_then(|s| s.as_str())
111                .map(|s| s.to_string())
112                .unwrap_or_else(|| default_repo_id.clone());
113            (actor, repo)
114        } else {
115            (whoami::username(), default_repo_id.clone())
116        }
117    } else {
118        (whoami::username(), default_repo_id)
119    };
120
121    // Try to load R2 config
122    let r2 = match R2Config::from_env() {
123        Ok(config) => {
124            println!(
125                "{} R2 Bucket: {}",
126                "✓".green(),
127                config.bucket_name.bright_white()
128            );
129            match R2Storage::new(config) {
130                Ok(storage) => {
131                    println!("{} R2 Storage enabled", "✓".green());
132                    Some(Arc::new(storage))
133                }
134                Err(e) => {
135                    println!("{} R2 Storage failed: {}", "⚠".yellow(), e);
136                    None
137                }
138            }
139        }
140        Err(_) => {
141            println!(
142                "{} R2 not configured (set R2_* in .env for blob storage)",
143                "ℹ".blue()
144            );
145            None
146        }
147    };
148
149    let state = AppState {
150        oplog,
151        db,
152        sync: SyncManager::new(),
153        actor_id,
154        repo_id,
155        seen: Arc::new(DashSet::new()),
156        r2,
157    };
158
159    let app = Router::new()
160        .route("/", get(|| async { "Forge API Server" }))
161        .route("/health", get(health_check))
162        .route("/ops", get(get_ops))
163        .route("/ws", get(ws_handler))
164        // Blob endpoints (if R2 is configured)
165        .route("/api/v1/blobs", post(upload_blob))
166        .route("/api/v1/blobs/:hash", get(download_blob))
167        .route("/api/v1/blobs/:hash", delete(delete_blob_handler))
168        .route("/api/v1/blobs/:hash/exists", get(check_blob_exists))
169        .route("/api/v1/blobs/batch", post(batch_upload))
170        // CORS for web clients
171        .layer(
172            CorsLayer::new()
173                .allow_origin(Any)
174                .allow_methods(Any)
175                .allow_headers(Any),
176        )
177        .with_state(state);
178
179    let addr = format!("0.0.0.0:{}", port);
180    println!(
181        "{} Server running at {}",
182        "✓".green(),
183        format!("http://{}", addr).bright_blue()
184    );
185
186    let listener = tokio::net::TcpListener::bind(&addr).await?;
187    axum::serve(listener, app).await?;
188
189    Ok(())
190}
191
192async fn ws_handler(
193    State(state): State<AppState>,
194    ws: WebSocketUpgrade,
195) -> impl axum::response::IntoResponse {
196    ws.on_upgrade(move |socket| handle_ws(state, socket))
197}
198
199async fn handle_ws(state: AppState, socket: WebSocket) {
200    let (mut sender, mut receiver) = socket.split();
201
202    // Send handshake immediately with server metadata
203    let handshake = SyncMessage::handshake(state.actor_id.clone(), state.repo_id.clone());
204    if let Ok(text) = serde_json::to_string(&handshake) {
205        let _ = sender.send(Message::Text(text.into())).await;
206    }
207
208    // Subscribe to local operations and forward to this client
209    let mut rx = state.sync.subscribe();
210    let send_task = tokio::spawn(async move {
211        while let Ok(op_arc) = rx.recv().await {
212            // Forward as JSON text
213            if let Ok(text) = serde_json::to_string(&SyncMessage::operation((*op_arc).clone())) {
214                if sender.send(Message::Text(text.into())).await.is_err() {
215                    break;
216                }
217            }
218        }
219    });
220
221    // Receive from client and publish
222    let state_recv = state.clone();
223    let recv_task = tokio::spawn(async move {
224        let oplog = state_recv.oplog.clone();
225        while let Some(msg) = receiver.next().await {
226            match msg {
227                Ok(Message::Text(text)) => {
228                    let text: String = text.to_string();
229                    if let Ok(msg) = serde_json::from_str::<SyncMessage>(&text) {
230                        match msg {
231                            SyncMessage::Handshake { actor_id, repo_id } => {
232                                println!(
233                                    "{} Peer handshake: actor={} repo={}",
234                                    "↔".bright_blue(),
235                                    actor_id.bright_yellow(),
236                                    repo_id.bright_white()
237                                );
238                            }
239                            SyncMessage::Operation { operation: op } => {
240                                if insert_seen(&state_recv.seen, op.id) {
241                                    if let Some(lamport) = op.lamport() {
242                                        GLOBAL_CLOCK.observe(lamport);
243                                    }
244                                    let _ = oplog.append(op.clone());
245                                    let _ = state_recv.sync.publish(Arc::new(op));
246                                }
247                            }
248                        }
249                    } else if let Ok(op) = serde_json::from_str::<Operation>(&text) {
250                        if insert_seen(&state_recv.seen, op.id) {
251                            if let Some(lamport) = op.lamport() {
252                                GLOBAL_CLOCK.observe(lamport);
253                            }
254                            let _ = oplog.append(op.clone());
255                            let _ = state_recv.sync.publish(Arc::new(op));
256                        }
257                    }
258                }
259                Ok(Message::Binary(bin)) => {
260                    if let Ok(op) = serde_cbor::from_slice::<Operation>(&bin) {
261                        if insert_seen(&state_recv.seen, op.id) {
262                            if let Some(lamport) = op.lamport() {
263                                GLOBAL_CLOCK.observe(lamport);
264                            }
265                            let _ = oplog.append(op.clone());
266                            let _ = state_recv.sync.publish(Arc::new(op));
267                        }
268                    }
269                }
270                Ok(Message::Close(_)) | Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {}
271                Err(_) => break,
272            }
273        }
274    });
275
276    let _ = tokio::join!(send_task, recv_task);
277}
278
279#[derive(Deserialize)]
280struct OpsQuery {
281    file: Option<String>,
282    limit: Option<usize>,
283}
284
285async fn get_ops(
286    State(state): State<AppState>,
287    Query(query): Query<OpsQuery>,
288) -> Result<Json<Vec<Operation>>, axum::http::StatusCode> {
289    let limit = query.limit.unwrap_or(50);
290    let result = if let Some(file) = query.file.as_deref() {
291        let p = std::path::PathBuf::from(file);
292        state.db.get_operations(Some(&p), limit)
293    } else {
294        state.db.get_operations(None, limit)
295    };
296
297    match result {
298        Ok(ops) => Ok(Json(ops)),
299        Err(_) => Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR),
300    }
301}
302
303const SEEN_LIMIT: usize = 10_000;
304
305fn insert_seen(cache: &DashSet<Uuid>, id: Uuid) -> bool {
306    let inserted = cache.insert(id);
307    if inserted {
308        enforce_seen_limit(cache);
309    }
310    inserted
311}
312
313fn enforce_seen_limit(cache: &DashSet<Uuid>) {
314    while cache.len() > SEEN_LIMIT {
315        if let Some(entry) = cache.iter().next() {
316            let key = *entry.key();
317            drop(entry);
318            cache.remove(&key);
319        } else {
320            break;
321        }
322    }
323}
324
325// ========== Blob Storage Endpoints ==========
326
327/// Health check endpoint with R2 status
328async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
329    Json(serde_json::json!({
330        "status": "healthy",
331        "service": "forge-api",
332        "version": env!("CARGO_PKG_VERSION"),
333        "r2_enabled": state.r2.is_some(),
334    }))
335}
336
337/// Upload blob endpoint
338async fn upload_blob(
339    State(state): State<AppState>,
340    Json(req): Json<UploadBlobRequest>,
341) -> Result<Json<UploadBlobResponse>, ApiError> {
342    let r2 = state
343        .r2
344        .as_ref()
345        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
346
347    // Decode base64 content
348    use base64::Engine;
349    let content = base64::engine::general_purpose::STANDARD
350        .decode(&req.content)
351        .map_err(|e| ApiError::BadRequest(format!("Invalid base64: {}", e)))?;
352
353    let blob = Blob::from_content(&req.path, content);
354    let hash = blob.hash().to_string();
355    let size = blob.metadata.size;
356
357    // Upload to R2
358    let key = r2.upload_blob(&blob).await?;
359
360    Ok(Json(UploadBlobResponse { hash, key, size }))
361}
362
363/// Download blob endpoint
364async fn download_blob(
365    State(state): State<AppState>,
366    AxumPath(hash): AxumPath<String>,
367) -> Result<Response, ApiError> {
368    let r2 = state
369        .r2
370        .as_ref()
371        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
372
373    let blob = r2
374        .download_blob(&hash)
375        .await
376        .map_err(|_| ApiError::NotFound(format!("Blob not found: {}", hash)))?;
377
378    // Return blob content with metadata headers
379    Ok((
380        StatusCode::OK,
381        [
382            ("Content-Type", blob.metadata.mime_type.clone()),
383            ("X-Blob-Hash", hash),
384            ("X-Blob-Size", blob.metadata.size.to_string()),
385        ],
386        blob.content,
387    )
388        .into_response())
389}
390
391/// Delete blob endpoint
392async fn delete_blob_handler(
393    State(state): State<AppState>,
394    AxumPath(hash): AxumPath<String>,
395) -> Result<StatusCode, ApiError> {
396    let r2 = state
397        .r2
398        .as_ref()
399        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
400
401    r2.delete_blob(&hash).await?;
402    Ok(StatusCode::NO_CONTENT)
403}
404
405/// Check if blob exists
406async fn check_blob_exists(
407    State(state): State<AppState>,
408    AxumPath(hash): AxumPath<String>,
409) -> Result<Json<serde_json::Value>, ApiError> {
410    let r2 = state
411        .r2
412        .as_ref()
413        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
414
415    let exists = r2.blob_exists(&hash).await?;
416
417    Ok(Json(serde_json::json!({
418        "exists": exists,
419        "hash": hash,
420    })))
421}
422
423/// Batch upload request
424#[derive(Debug, Deserialize)]
425pub struct BatchUploadRequest {
426    pub blobs: Vec<UploadBlobRequest>,
427}
428
429/// Batch upload response
430#[derive(Debug, Serialize)]
431pub struct BatchUploadResponse {
432    pub uploaded: Vec<UploadBlobResponse>,
433    pub failed: Vec<String>,
434}
435
436/// Batch upload endpoint
437async fn batch_upload(
438    State(state): State<AppState>,
439    Json(req): Json<BatchUploadRequest>,
440) -> Result<Json<BatchUploadResponse>, ApiError> {
441    let r2 = state
442        .r2
443        .as_ref()
444        .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
445
446    let mut uploaded = Vec::new();
447    let mut failed = Vec::new();
448
449    use base64::Engine;
450    for blob_req in req.blobs {
451        match base64::engine::general_purpose::STANDARD.decode(&blob_req.content) {
452            Ok(content) => {
453                let blob = Blob::from_content(&blob_req.path, content);
454                let hash = blob.hash().to_string();
455                let size = blob.metadata.size;
456
457                match r2.upload_blob(&blob).await {
458                    Ok(key) => {
459                        uploaded.push(UploadBlobResponse { hash, key, size });
460                    }
461                    Err(e) => {
462                        failed.push(format!("{}: {}", blob_req.path, e));
463                    }
464                }
465            }
466            Err(e) => {
467                failed.push(format!("{}: Invalid base64: {}", blob_req.path, e));
468            }
469        }
470    }
471
472    Ok(Json(BatchUploadResponse { uploaded, failed }))
473}