1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6 extract::ws::{Message, WebSocket, WebSocketUpgrade},
7 extract::State,
8 extract::{Path as AxumPath, Query},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 routing::{delete, get, post},
12 Json, Router,
13};
14use colored::*;
15use futures::{SinkExt, StreamExt};
16use tower_http::cors::{Any, CorsLayer};
17
18use crate::crdt::Operation;
19use crate::storage::{Blob, Database, OperationLog, R2Config, R2Storage};
20use crate::sync::{SyncManager, SyncMessage, GLOBAL_CLOCK};
21use dashmap::DashSet;
22use serde::{Deserialize, Serialize};
23use sha2::{Digest, Sha256};
24use uuid::Uuid;
25
26#[derive(Clone)]
27pub struct AppState {
28 pub oplog: Arc<OperationLog>,
29 pub db: Arc<Database>,
30 pub sync: SyncManager,
31 pub actor_id: String,
32 pub repo_id: String,
33 pub seen: Arc<DashSet<Uuid>>,
34 pub r2: Option<Arc<R2Storage>>, }
36
/// Error type returned by the HTTP handlers; rendered as a JSON body
/// with a matching status code by its `IntoResponse` impl.
#[derive(Debug)]
pub enum ApiError {
    /// Unexpected internal failure; maps to HTTP 500.
    Internal(anyhow::Error),
    /// Requested resource does not exist; maps to HTTP 404.
    NotFound(String),
    /// Client-supplied input was invalid; maps to HTTP 400.
    BadRequest(String),
}
44
45impl IntoResponse for ApiError {
46 fn into_response(self) -> Response {
47 let (status, message) = match self {
48 ApiError::Internal(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
49 ApiError::NotFound(msg) => (StatusCode::NOT_FOUND, msg),
50 ApiError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
51 };
52
53 let body = Json(serde_json::json!({
54 "error": message,
55 }));
56
57 (status, body).into_response()
58 }
59}
60
/// Blanket conversion so handlers can use `?` on anything convertible to
/// `anyhow::Error`; such failures surface as `ApiError::Internal` (HTTP 500).
impl<E> From<E> for ApiError
where
    E: Into<anyhow::Error>,
{
    fn from(err: E) -> Self {
        ApiError::Internal(err.into())
    }
}
69
70#[derive(Debug, Deserialize)]
72pub struct UploadBlobRequest {
73 pub path: String,
74 pub content: String, }
76
/// JSON reply describing a blob stored in R2.
#[derive(Debug, Serialize)]
pub struct UploadBlobResponse {
    /// Content hash of the blob.
    pub hash: String,
    /// Storage key the blob was uploaded under.
    pub key: String,
    /// Blob size in bytes.
    pub size: u64,
}
84
85pub async fn serve(port: u16, path: PathBuf) -> Result<()> {
86 let forge_path = path.join(".dx/forge");
88 let db = Arc::new(Database::new(&forge_path)?);
89 db.initialize()?;
90 let oplog = Arc::new(OperationLog::new(db.clone()));
91
92 let config_path = forge_path.join("config.json");
94 let default_repo_id = {
95 let mut hasher = Sha256::new();
96 let path_string = forge_path.to_string_lossy().into_owned();
97 hasher.update(path_string.as_bytes());
98 format!("repo-{:x}", hasher.finalize())
99 };
100
101 let (actor_id, repo_id) = if let Ok(bytes) = tokio::fs::read(&config_path).await {
102 if let Ok(cfg) = serde_json::from_slice::<serde_json::Value>(&bytes) {
103 let actor = cfg
104 .get("actor_id")
105 .and_then(|s| s.as_str())
106 .map(|s| s.to_string())
107 .unwrap_or_else(|| whoami::username());
108 let repo = cfg
109 .get("repo_id")
110 .and_then(|s| s.as_str())
111 .map(|s| s.to_string())
112 .unwrap_or_else(|| default_repo_id.clone());
113 (actor, repo)
114 } else {
115 (whoami::username(), default_repo_id.clone())
116 }
117 } else {
118 (whoami::username(), default_repo_id)
119 };
120
121 let r2 = match R2Config::from_env() {
123 Ok(config) => {
124 println!(
125 "{} R2 Bucket: {}",
126 "✓".green(),
127 config.bucket_name.bright_white()
128 );
129 match R2Storage::new(config) {
130 Ok(storage) => {
131 println!("{} R2 Storage enabled", "✓".green());
132 Some(Arc::new(storage))
133 }
134 Err(e) => {
135 println!("{} R2 Storage failed: {}", "⚠".yellow(), e);
136 None
137 }
138 }
139 }
140 Err(_) => {
141 println!(
142 "{} R2 not configured (set R2_* in .env for blob storage)",
143 "ℹ".blue()
144 );
145 None
146 }
147 };
148
149 let state = AppState {
150 oplog,
151 db,
152 sync: SyncManager::new(),
153 actor_id,
154 repo_id,
155 seen: Arc::new(DashSet::new()),
156 r2,
157 };
158
159 let app = Router::new()
160 .route("/", get(|| async { "Forge API Server" }))
161 .route("/health", get(health_check))
162 .route("/ops", get(get_ops))
163 .route("/ws", get(ws_handler))
164 .route("/api/v1/blobs", post(upload_blob))
166 .route("/api/v1/blobs/:hash", get(download_blob))
167 .route("/api/v1/blobs/:hash", delete(delete_blob_handler))
168 .route("/api/v1/blobs/:hash/exists", get(check_blob_exists))
169 .route("/api/v1/blobs/batch", post(batch_upload))
170 .layer(
172 CorsLayer::new()
173 .allow_origin(Any)
174 .allow_methods(Any)
175 .allow_headers(Any),
176 )
177 .with_state(state);
178
179 let addr = format!("0.0.0.0:{}", port);
180 println!(
181 "{} Server running at {}",
182 "✓".green(),
183 format!("http://{}", addr).bright_blue()
184 );
185
186 let listener = tokio::net::TcpListener::bind(&addr).await?;
187 axum::serve(listener, app).await?;
188
189 Ok(())
190}
191
/// HTTP handler for `GET /ws`: upgrades the request to a WebSocket and
/// hands the connection (plus a clone of the app state) to `handle_ws`.
async fn ws_handler(
    State(state): State<AppState>,
    ws: WebSocketUpgrade,
) -> impl axum::response::IntoResponse {
    ws.on_upgrade(move |socket| handle_ws(state, socket))
}
198
199async fn handle_ws(state: AppState, socket: WebSocket) {
200 let (mut sender, mut receiver) = socket.split();
201
202 let handshake = SyncMessage::handshake(state.actor_id.clone(), state.repo_id.clone());
204 if let Ok(text) = serde_json::to_string(&handshake) {
205 let _ = sender.send(Message::Text(text.into())).await;
206 }
207
208 let mut rx = state.sync.subscribe();
210 let send_task = tokio::spawn(async move {
211 while let Ok(op_arc) = rx.recv().await {
212 if let Ok(text) = serde_json::to_string(&SyncMessage::operation((*op_arc).clone())) {
214 if sender.send(Message::Text(text.into())).await.is_err() {
215 break;
216 }
217 }
218 }
219 });
220
221 let state_recv = state.clone();
223 let recv_task = tokio::spawn(async move {
224 let oplog = state_recv.oplog.clone();
225 while let Some(msg) = receiver.next().await {
226 match msg {
227 Ok(Message::Text(text)) => {
228 let text: String = text.to_string();
229 if let Ok(msg) = serde_json::from_str::<SyncMessage>(&text) {
230 match msg {
231 SyncMessage::Handshake { actor_id, repo_id } => {
232 println!(
233 "{} Peer handshake: actor={} repo={}",
234 "↔".bright_blue(),
235 actor_id.bright_yellow(),
236 repo_id.bright_white()
237 );
238 }
239 SyncMessage::Operation { operation: op } => {
240 if insert_seen(&state_recv.seen, op.id) {
241 if let Some(lamport) = op.lamport() {
242 GLOBAL_CLOCK.observe(lamport);
243 }
244 let _ = oplog.append(op.clone());
245 let _ = state_recv.sync.publish(Arc::new(op));
246 }
247 }
248 }
249 } else if let Ok(op) = serde_json::from_str::<Operation>(&text) {
250 if insert_seen(&state_recv.seen, op.id) {
251 if let Some(lamport) = op.lamport() {
252 GLOBAL_CLOCK.observe(lamport);
253 }
254 let _ = oplog.append(op.clone());
255 let _ = state_recv.sync.publish(Arc::new(op));
256 }
257 }
258 }
259 Ok(Message::Binary(bin)) => {
260 if let Ok(op) = serde_cbor::from_slice::<Operation>(&bin) {
261 if insert_seen(&state_recv.seen, op.id) {
262 if let Some(lamport) = op.lamport() {
263 GLOBAL_CLOCK.observe(lamport);
264 }
265 let _ = oplog.append(op.clone());
266 let _ = state_recv.sync.publish(Arc::new(op));
267 }
268 }
269 }
270 Ok(Message::Close(_)) | Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {}
271 Err(_) => break,
272 }
273 }
274 });
275
276 let _ = tokio::join!(send_task, recv_task);
277}
278
/// Query parameters accepted by `GET /ops`.
#[derive(Deserialize)]
struct OpsQuery {
    /// Optional file path to filter operations by.
    file: Option<String>,
    /// Maximum number of operations to return (handler defaults to 50).
    limit: Option<usize>,
}
284
285async fn get_ops(
286 State(state): State<AppState>,
287 Query(query): Query<OpsQuery>,
288) -> Result<Json<Vec<Operation>>, axum::http::StatusCode> {
289 let limit = query.limit.unwrap_or(50);
290 let result = if let Some(file) = query.file.as_deref() {
291 let p = std::path::PathBuf::from(file);
292 state.db.get_operations(Some(&p), limit)
293 } else {
294 state.db.get_operations(None, limit)
295 };
296
297 match result {
298 Ok(ops) => Ok(Json(ops)),
299 Err(_) => Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR),
300 }
301}
302
303const SEEN_LIMIT: usize = 10_000;
304
305fn insert_seen(cache: &DashSet<Uuid>, id: Uuid) -> bool {
306 let inserted = cache.insert(id);
307 if inserted {
308 enforce_seen_limit(cache);
309 }
310 inserted
311}
312
/// Shrink the dedup cache back under `SEEN_LIMIT` by removing entries.
///
/// NOTE(review): eviction picks whatever the set iterator yields first, not
/// oldest-first (no LRU) — acceptable for a dedup cache, but worth confirming
/// that re-processing an evicted-then-reseen op id is harmless.
fn enforce_seen_limit(cache: &DashSet<Uuid>) {
    while cache.len() > SEEN_LIMIT {
        if let Some(entry) = cache.iter().next() {
            let key = *entry.key();
            // Release the iterator guard before removing: the guard holds a
            // shard lock, and removing while it is held risks deadlock.
            drop(entry);
            cache.remove(&key);
        } else {
            // Defensive: len() said non-empty but iteration found nothing
            // (concurrent removals) — stop rather than spin.
            break;
        }
    }
}
324
325async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
329 Json(serde_json::json!({
330 "status": "healthy",
331 "service": "forge-api",
332 "version": env!("CARGO_PKG_VERSION"),
333 "r2_enabled": state.r2.is_some(),
334 }))
335}
336
337async fn upload_blob(
339 State(state): State<AppState>,
340 Json(req): Json<UploadBlobRequest>,
341) -> Result<Json<UploadBlobResponse>, ApiError> {
342 let r2 = state
343 .r2
344 .as_ref()
345 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
346
347 use base64::Engine;
349 let content = base64::engine::general_purpose::STANDARD
350 .decode(&req.content)
351 .map_err(|e| ApiError::BadRequest(format!("Invalid base64: {}", e)))?;
352
353 let blob = Blob::from_content(&req.path, content);
354 let hash = blob.hash().to_string();
355 let size = blob.metadata.size;
356
357 let key = r2.upload_blob(&blob).await?;
359
360 Ok(Json(UploadBlobResponse { hash, key, size }))
361}
362
363async fn download_blob(
365 State(state): State<AppState>,
366 AxumPath(hash): AxumPath<String>,
367) -> Result<Response, ApiError> {
368 let r2 = state
369 .r2
370 .as_ref()
371 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
372
373 let blob = r2
374 .download_blob(&hash)
375 .await
376 .map_err(|_| ApiError::NotFound(format!("Blob not found: {}", hash)))?;
377
378 Ok((
380 StatusCode::OK,
381 [
382 ("Content-Type", blob.metadata.mime_type.clone()),
383 ("X-Blob-Hash", hash),
384 ("X-Blob-Size", blob.metadata.size.to_string()),
385 ],
386 blob.content,
387 )
388 .into_response())
389}
390
391async fn delete_blob_handler(
393 State(state): State<AppState>,
394 AxumPath(hash): AxumPath<String>,
395) -> Result<StatusCode, ApiError> {
396 let r2 = state
397 .r2
398 .as_ref()
399 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
400
401 r2.delete_blob(&hash).await?;
402 Ok(StatusCode::NO_CONTENT)
403}
404
405async fn check_blob_exists(
407 State(state): State<AppState>,
408 AxumPath(hash): AxumPath<String>,
409) -> Result<Json<serde_json::Value>, ApiError> {
410 let r2 = state
411 .r2
412 .as_ref()
413 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
414
415 let exists = r2.blob_exists(&hash).await?;
416
417 Ok(Json(serde_json::json!({
418 "exists": exists,
419 "hash": hash,
420 })))
421}
422
/// JSON body for `POST /api/v1/blobs/batch`.
#[derive(Debug, Deserialize)]
pub struct BatchUploadRequest {
    /// Blobs to upload; each entry has the same shape as a single upload.
    pub blobs: Vec<UploadBlobRequest>,
}
428
/// JSON reply for a batch upload: successes and per-item failure messages.
#[derive(Debug, Serialize)]
pub struct BatchUploadResponse {
    /// Blobs that were stored successfully.
    pub uploaded: Vec<UploadBlobResponse>,
    /// "path: reason" strings for entries that failed to decode or upload.
    pub failed: Vec<String>,
}
435
436async fn batch_upload(
438 State(state): State<AppState>,
439 Json(req): Json<BatchUploadRequest>,
440) -> Result<Json<BatchUploadResponse>, ApiError> {
441 let r2 = state
442 .r2
443 .as_ref()
444 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
445
446 let mut uploaded = Vec::new();
447 let mut failed = Vec::new();
448
449 use base64::Engine;
450 for blob_req in req.blobs {
451 match base64::engine::general_purpose::STANDARD.decode(&blob_req.content) {
452 Ok(content) => {
453 let blob = Blob::from_content(&blob_req.path, content);
454 let hash = blob.hash().to_string();
455 let size = blob.metadata.size;
456
457 match r2.upload_blob(&blob).await {
458 Ok(key) => {
459 uploaded.push(UploadBlobResponse { hash, key, size });
460 }
461 Err(e) => {
462 failed.push(format!("{}: {}", blob_req.path, e));
463 }
464 }
465 }
466 Err(e) => {
467 failed.push(format!("{}: Invalid base64: {}", blob_req.path, e));
468 }
469 }
470 }
471
472 Ok(Json(BatchUploadResponse { uploaded, failed }))
473}