1use std::path::PathBuf;
2use std::sync::Arc;
3
4use anyhow::Result;
5use axum::{
6 extract::ws::{Message, WebSocket, WebSocketUpgrade},
7 extract::State,
8 extract::{Path as AxumPath, Query},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 routing::{delete, get, post},
12 Json, Router,
13};
14use colored::*;
15use futures::{SinkExt, StreamExt};
16use tower_http::cors::{Any, CorsLayer};
17
18use crate::crdt::Operation;
19use crate::storage::{Blob, Database, OperationLog, R2Config, R2Storage};
20use crate::sync::{SyncManager, SyncMessage, GLOBAL_CLOCK};
21use crate::server::authentication::{AuthManager, LoginRequest, LoginResponse, CreateUserRequest, ChangePasswordRequest};
22use dashmap::DashSet;
23use serde::{Deserialize, Serialize};
24use sha2::{Digest, Sha256};
25use uuid::Uuid;
26
27#[derive(Clone)]
28pub struct AppState {
29 pub oplog: Arc<OperationLog>,
30 pub db: Arc<Database>,
31 pub sync: SyncManager,
32 pub actor_id: String,
33 pub repo_id: String,
34 pub seen: Arc<DashSet<Uuid>>,
35 pub r2: Option<Arc<R2Storage>>, pub auth: Arc<AuthManager>, }
38
39#[derive(Debug)]
41pub enum ApiError {
42 Internal(anyhow::Error),
43 NotFound(String),
44 BadRequest(String),
45}
46
47impl IntoResponse for ApiError {
48 fn into_response(self) -> Response {
49 let (status, message) = match self {
50 ApiError::Internal(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
51 ApiError::NotFound(msg) => (StatusCode::NOT_FOUND, msg),
52 ApiError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
53 };
54
55 let body = Json(serde_json::json!({
56 "error": message,
57 }));
58
59 (status, body).into_response()
60 }
61}
62
63impl<E> From<E> for ApiError
64where
65 E: Into<anyhow::Error>,
66{
67 fn from(err: E) -> Self {
68 ApiError::Internal(err.into())
69 }
70}
71
72#[derive(Debug, Deserialize)]
74pub struct UploadBlobRequest {
75 pub path: String,
76 pub content: String, }
78
79#[derive(Debug, Serialize)]
81pub struct UploadBlobResponse {
82 pub hash: String,
83 pub key: String,
84 pub size: u64,
85}
86
87pub async fn serve(port: u16, path: PathBuf) -> Result<()> {
88 let forge_path = path.join(".dx/forge");
90 let db = Arc::new(Database::new(&forge_path)?);
91 db.initialize()?;
92 let oplog = Arc::new(OperationLog::new(db.clone()));
93
94 let config_path = forge_path.join("config.json");
96 let default_repo_id = {
97 let mut hasher = Sha256::new();
98 let path_string = forge_path.to_string_lossy().into_owned();
99 hasher.update(path_string.as_bytes());
100 format!("repo-{:x}", hasher.finalize())
101 };
102
103 let (actor_id, repo_id) = if let Ok(bytes) = tokio::fs::read(&config_path).await {
104 if let Ok(cfg) = serde_json::from_slice::<serde_json::Value>(&bytes) {
105 let actor = cfg
106 .get("actor_id")
107 .and_then(|s| s.as_str())
108 .map(|s| s.to_string())
109 .unwrap_or_else(|| whoami::username());
110 let repo = cfg
111 .get("repo_id")
112 .and_then(|s| s.as_str())
113 .map(|s| s.to_string())
114 .unwrap_or_else(|| default_repo_id.clone());
115 (actor, repo)
116 } else {
117 (whoami::username(), default_repo_id.clone())
118 }
119 } else {
120 (whoami::username(), default_repo_id)
121 };
122
123 let r2 = match R2Config::from_env() {
125 Ok(config) => {
126 println!(
127 "{} R2 Bucket: {}",
128 "✓".green(),
129 config.bucket_name.bright_white()
130 );
131 match R2Storage::new(config) {
132 Ok(storage) => {
133 println!("{} R2 Storage enabled", "✓".green());
134 Some(Arc::new(storage))
135 }
136 Err(e) => {
137 println!("{} R2 Storage failed: {}", "⚠".yellow(), e);
138 None
139 }
140 }
141 }
142 Err(_) => {
143 println!(
144 "{} R2 not configured (set R2_* in .env for blob storage)",
145 "ℹ".blue()
146 );
147 None
148 }
149 };
150
151 let state = AppState {
152 oplog,
153 db,
154 sync: SyncManager::new(),
155 actor_id,
156 repo_id,
157 seen: Arc::new(DashSet::new()),
158 r2,
159 auth: Arc::new(AuthManager::new()),
160 };
161
162 let app = Router::new()
163 .route("/", get(serve_index))
165 .route("/static/styles.css", get(serve_styles))
166 .route("/static/app.js", get(serve_app_js))
167 .route("/health", get(health_check))
169 .route("/ops", get(get_ops))
170 .route("/ws", get(ws_handler))
171 .route("/api/v1/auth/login", post(login))
173 .route("/api/v1/auth/validate", get(validate_session))
174 .route("/api/v1/auth/me", get(get_current_user))
175 .route("/api/v1/auth/change-password", post(change_password))
176 .route("/api/v1/users", get(list_users))
178 .route("/api/v1/users", post(create_user))
179 .route("/api/v1/users/{username}", delete(delete_user))
180 .route("/api/v1/files", get(list_files))
182 .route("/api/v1/files/{*path}", get(get_file_content))
183 .route("/api/v1/blobs", post(upload_blob))
185 .route("/api/v1/blobs/{hash}", get(download_blob))
186 .route("/api/v1/blobs/{hash}", delete(delete_blob_handler))
187 .route("/api/v1/blobs/{hash}/exists", get(check_blob_exists))
188 .route("/api/v1/blobs/batch", post(batch_upload))
189 .layer(
191 CorsLayer::new()
192 .allow_origin(Any)
193 .allow_methods(Any)
194 .allow_headers(Any),
195 )
196 .with_state(state);
197
198 let addr = format!("0.0.0.0:{}", port);
199 println!(
200 "{} Server running at {}",
201 "✓".green(),
202 format!("http://localhost:{}", port).bright_blue()
203 );
204
205 let listener = tokio::net::TcpListener::bind(&addr).await?;
206 axum::serve(listener, app).await?;
207
208 Ok(())
209}
210
211async fn ws_handler(
212 State(state): State<AppState>,
213 ws: WebSocketUpgrade,
214) -> impl axum::response::IntoResponse {
215 ws.on_upgrade(move |socket| handle_ws(state, socket))
216}
217
218async fn handle_ws(state: AppState, socket: WebSocket) {
219 let (mut sender, mut receiver) = socket.split();
220
221 let handshake = SyncMessage::handshake(state.actor_id.clone(), state.repo_id.clone());
223 if let Ok(text) = serde_json::to_string(&handshake) {
224 let _ = sender.send(Message::Text(text.into())).await;
225 }
226
227 let mut rx = state.sync.subscribe();
229 let send_task = tokio::spawn(async move {
230 while let Ok(op_arc) = rx.recv().await {
231 if let Ok(text) = serde_json::to_string(&SyncMessage::operation((*op_arc).clone())) {
233 if sender.send(Message::Text(text.into())).await.is_err() {
234 break;
235 }
236 }
237 }
238 });
239
240 let state_recv = state.clone();
242 let recv_task = tokio::spawn(async move {
243 let oplog = state_recv.oplog.clone();
244 while let Some(msg) = receiver.next().await {
245 match msg {
246 Ok(Message::Text(text)) => {
247 let text: String = text.to_string();
248 if let Ok(msg) = serde_json::from_str::<SyncMessage>(&text) {
249 match msg {
250 SyncMessage::Handshake { actor_id, repo_id } => {
251 println!(
252 "{} Peer handshake: actor={} repo={}",
253 "↔".bright_blue(),
254 actor_id.bright_yellow(),
255 repo_id.bright_white()
256 );
257 }
258 SyncMessage::Operation { operation: op } => {
259 if insert_seen(&state_recv.seen, op.id) {
260 if let Some(lamport) = op.lamport() {
261 GLOBAL_CLOCK.observe(lamport);
262 }
263 let _ = oplog.append(op.clone());
264 let _ = state_recv.sync.publish(Arc::new(op));
265 }
266 }
267 }
268 } else if let Ok(op) = serde_json::from_str::<Operation>(&text) {
269 if insert_seen(&state_recv.seen, op.id) {
270 if let Some(lamport) = op.lamport() {
271 GLOBAL_CLOCK.observe(lamport);
272 }
273 let _ = oplog.append(op.clone());
274 let _ = state_recv.sync.publish(Arc::new(op));
275 }
276 }
277 }
278 Ok(Message::Binary(bin)) => {
279 if let Ok(op) = serde_cbor::from_slice::<Operation>(&bin) {
280 if insert_seen(&state_recv.seen, op.id) {
281 if let Some(lamport) = op.lamport() {
282 GLOBAL_CLOCK.observe(lamport);
283 }
284 let _ = oplog.append(op.clone());
285 let _ = state_recv.sync.publish(Arc::new(op));
286 }
287 }
288 }
289 Ok(Message::Close(_)) | Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {}
290 Err(_) => break,
291 }
292 }
293 });
294
295 let _ = tokio::join!(send_task, recv_task);
296}
297
298#[derive(Deserialize)]
299struct OpsQuery {
300 file: Option<String>,
301 limit: Option<usize>,
302}
303
304async fn get_ops(
305 State(state): State<AppState>,
306 Query(query): Query<OpsQuery>,
307) -> Result<Json<Vec<Operation>>, axum::http::StatusCode> {
308 let limit = query.limit.unwrap_or(50);
309 let result = if let Some(file) = query.file.as_deref() {
310 let p = std::path::PathBuf::from(file);
311 state.db.get_operations(Some(&p), limit)
312 } else {
313 state.db.get_operations(None, limit)
314 };
315
316 match result {
317 Ok(ops) => Ok(Json(ops)),
318 Err(_) => Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR),
319 }
320}
321
322const SEEN_LIMIT: usize = 10_000;
323
324fn insert_seen(cache: &DashSet<Uuid>, id: Uuid) -> bool {
325 let inserted = cache.insert(id);
326 if inserted {
327 enforce_seen_limit(cache);
328 }
329 inserted
330}
331
332fn enforce_seen_limit(cache: &DashSet<Uuid>) {
333 while cache.len() > SEEN_LIMIT {
334 if let Some(entry) = cache.iter().next() {
335 let key = *entry.key();
336 drop(entry);
337 cache.remove(&key);
338 } else {
339 break;
340 }
341 }
342}
343
344async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
348 Json(serde_json::json!({
349 "status": "healthy",
350 "service": "forge-api",
351 "version": env!("CARGO_PKG_VERSION"),
352 "r2_enabled": state.r2.is_some(),
353 }))
354}
355
356async fn upload_blob(
358 State(state): State<AppState>,
359 Json(req): Json<UploadBlobRequest>,
360) -> Result<Json<UploadBlobResponse>, ApiError> {
361 let r2 = state
362 .r2
363 .as_ref()
364 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
365
366 use base64::Engine;
368 let content = base64::engine::general_purpose::STANDARD
369 .decode(&req.content)
370 .map_err(|e| ApiError::BadRequest(format!("Invalid base64: {}", e)))?;
371
372 let blob = Blob::from_content(&req.path, content);
373 let hash = blob.hash().to_string();
374 let size = blob.metadata.size;
375
376 let key = r2.upload_blob(&blob).await?;
378
379 Ok(Json(UploadBlobResponse { hash, key, size }))
380}
381
382async fn download_blob(
384 State(state): State<AppState>,
385 AxumPath(hash): AxumPath<String>,
386) -> Result<Response, ApiError> {
387 let r2 = state
388 .r2
389 .as_ref()
390 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
391
392 let blob = r2
393 .download_blob(&hash)
394 .await
395 .map_err(|_| ApiError::NotFound(format!("Blob not found: {}", hash)))?;
396
397 Ok((
399 StatusCode::OK,
400 [
401 ("Content-Type", blob.metadata.mime_type.clone()),
402 ("X-Blob-Hash", hash),
403 ("X-Blob-Size", blob.metadata.size.to_string()),
404 ],
405 blob.content,
406 )
407 .into_response())
408}
409
410async fn delete_blob_handler(
412 State(state): State<AppState>,
413 AxumPath(hash): AxumPath<String>,
414) -> Result<StatusCode, ApiError> {
415 let r2 = state
416 .r2
417 .as_ref()
418 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
419
420 r2.delete_blob(&hash).await?;
421 Ok(StatusCode::NO_CONTENT)
422}
423
424async fn check_blob_exists(
426 State(state): State<AppState>,
427 AxumPath(hash): AxumPath<String>,
428) -> Result<Json<serde_json::Value>, ApiError> {
429 let r2 = state
430 .r2
431 .as_ref()
432 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
433
434 let exists = r2.blob_exists(&hash).await?;
435
436 Ok(Json(serde_json::json!({
437 "exists": exists,
438 "hash": hash,
439 })))
440}
441
442#[derive(Debug, Deserialize)]
444pub struct BatchUploadRequest {
445 pub blobs: Vec<UploadBlobRequest>,
446}
447
448#[derive(Debug, Serialize)]
450pub struct BatchUploadResponse {
451 pub uploaded: Vec<UploadBlobResponse>,
452 pub failed: Vec<String>,
453}
454
455async fn batch_upload(
457 State(state): State<AppState>,
458 Json(req): Json<BatchUploadRequest>,
459) -> Result<Json<BatchUploadResponse>, ApiError> {
460 let r2 = state
461 .r2
462 .as_ref()
463 .ok_or_else(|| ApiError::BadRequest("R2 storage not configured".to_string()))?;
464
465 let mut uploaded = Vec::new();
466 let mut failed = Vec::new();
467
468 use base64::Engine;
469 for blob_req in req.blobs {
470 match base64::engine::general_purpose::STANDARD.decode(&blob_req.content) {
471 Ok(content) => {
472 let blob = Blob::from_content(&blob_req.path, content);
473 let hash = blob.hash().to_string();
474 let size = blob.metadata.size;
475
476 match r2.upload_blob(&blob).await {
477 Ok(key) => {
478 uploaded.push(UploadBlobResponse { hash, key, size });
479 }
480 Err(e) => {
481 failed.push(format!("{}: {}", blob_req.path, e));
482 }
483 }
484 }
485 Err(e) => {
486 failed.push(format!("{}: Invalid base64: {}", blob_req.path, e));
487 }
488 }
489 }
490
491 Ok(Json(BatchUploadResponse { uploaded, failed }))
492}
493
494const INDEX_HTML: &str = include_str!("web_ui/index.html");
497const STYLES_CSS: &str = include_str!("web_ui/styles.css");
498const APP_JS: &str = include_str!("web_ui/app.js");
499
500async fn serve_index() -> impl IntoResponse {
501 (
502 StatusCode::OK,
503 [("Content-Type", "text/html; charset=utf-8")],
504 INDEX_HTML,
505 )
506}
507
508async fn serve_styles() -> impl IntoResponse {
509 (
510 StatusCode::OK,
511 [("Content-Type", "text/css; charset=utf-8")],
512 STYLES_CSS,
513 )
514}
515
516async fn serve_app_js() -> impl IntoResponse {
517 (
518 StatusCode::OK,
519 [("Content-Type", "application/javascript; charset=utf-8")],
520 APP_JS,
521 )
522}
523
524async fn login(
527 State(state): State<AppState>,
528 Json(req): Json<LoginRequest>,
529) -> Result<Json<LoginResponse>, ApiError> {
530 let session = state.auth.login(&req.username, &req.password)?;
531
532 Ok(Json(LoginResponse {
533 token: session.token,
534 username: session.username,
535 role: session.role,
536 expires_at: session.expires_at,
537 }))
538}
539
540async fn validate_session(
541 State(state): State<AppState>,
542 headers: axum::http::HeaderMap,
543) -> Result<StatusCode, ApiError> {
544 let token = extract_token(&headers)?;
545 state.auth.validate_token(&token)?;
546 Ok(StatusCode::OK)
547}
548
549async fn get_current_user(
550 State(_state): State<AppState>,
551 _headers: axum::http::HeaderMap,
552) -> Result<Json<serde_json::Value>, ApiError> {
553 Ok(Json(serde_json::json!({
555 "username": "guest",
556 "role": "admin",
557 "user_id": "guest-id",
558 })))
559}
560
561async fn change_password(
562 State(state): State<AppState>,
563 headers: axum::http::HeaderMap,
564 Json(req): Json<ChangePasswordRequest>,
565) -> Result<StatusCode, ApiError> {
566 let token = extract_token(&headers)?;
567 let session = state.auth.validate_token(&token)?;
568
569 state.auth.update_password(&session.username, &req.old_password, &req.new_password)?;
570 Ok(StatusCode::OK)
571}
572
573fn extract_token(headers: &axum::http::HeaderMap) -> Result<String, ApiError> {
574 let auth_header = headers
575 .get("Authorization")
576 .ok_or_else(|| ApiError::BadRequest("Missing Authorization header".to_string()))?
577 .to_str()
578 .map_err(|_| ApiError::BadRequest("Invalid Authorization header".to_string()))?;
579
580 auth_header
581 .strip_prefix("Bearer ")
582 .map(|s| s.to_string())
583 .ok_or_else(|| ApiError::BadRequest("Invalid Authorization format".to_string()))
584}
585
586async fn list_users(
589 State(state): State<AppState>,
590 _headers: axum::http::HeaderMap,
591) -> Result<Json<Vec<serde_json::Value>>, ApiError> {
592 let users = state.auth.list_users();
601 let user_list: Vec<_> = users.into_iter().map(|u| {
602 serde_json::json!({
603 "username": u.username,
604 "email": u.email,
605 "role": u.role,
606 "created_at": u.created_at,
607 })
608 }).collect();
609
610 Ok(Json(user_list))
611}
612
613async fn create_user(
614 State(state): State<AppState>,
615 headers: axum::http::HeaderMap,
616 Json(req): Json<CreateUserRequest>,
617) -> Result<StatusCode, ApiError> {
618 let token = extract_token(&headers)?;
619 let session = state.auth.validate_token(&token)?;
620
621 if session.role != crate::server::authentication::Role::Admin {
623 return Err(ApiError::BadRequest("Insufficient permissions".to_string()));
624 }
625
626 state.auth.register(req.username, &req.password, req.role)?;
627 Ok(StatusCode::CREATED)
628}
629
630async fn delete_user(
631 State(state): State<AppState>,
632 headers: axum::http::HeaderMap,
633 AxumPath(username): AxumPath<String>,
634) -> Result<StatusCode, ApiError> {
635 let token = extract_token(&headers)?;
636 let session = state.auth.validate_token(&token)?;
637
638 if session.role != crate::server::authentication::Role::Admin {
640 return Err(ApiError::BadRequest("Insufficient permissions".to_string()));
641 }
642
643 if username == session.username {
645 return Err(ApiError::BadRequest("Cannot delete your own account".to_string()));
646 }
647
648 state.auth.delete_user(&username)?;
649 Ok(StatusCode::NO_CONTENT)
650}
651
652#[derive(Serialize)]
655struct FileInfo {
656 name: String,
657 path: String,
658 is_dir: bool,
659 size: Option<u64>,
660}
661
662async fn list_files(
663 State(_state): State<AppState>,
664 _headers: axum::http::HeaderMap,
665) -> Result<Json<Vec<FileInfo>>, ApiError> {
666 let current_dir = std::env::current_dir()
670 .map_err(|e| ApiError::Internal(e.into()))?;
671
672 let mut files = Vec::new();
673
674 if let Ok(entries) = std::fs::read_dir(¤t_dir) {
675 for entry in entries.flatten() {
676 let path = entry.path();
677 let name = path.file_name()
678 .and_then(|n| n.to_str())
679 .unwrap_or("Unknown")
680 .to_string();
681
682 if name.starts_with('.') {
684 continue;
685 }
686
687 let is_dir = path.is_dir();
688 let size = if is_dir {
689 None
690 } else {
691 std::fs::metadata(&path).ok().map(|m| m.len())
692 };
693
694 files.push(FileInfo {
695 name,
696 path: path.strip_prefix(¤t_dir)
697 .ok()
698 .and_then(|p| p.to_str())
699 .unwrap_or("")
700 .to_string(),
701 is_dir,
702 size,
703 });
704 }
705 }
706
707 files.sort_by(|a, b| {
709 match (a.is_dir, b.is_dir) {
710 (true, false) => std::cmp::Ordering::Less,
711 (false, true) => std::cmp::Ordering::Greater,
712 _ => a.name.cmp(&b.name),
713 }
714 });
715
716 Ok(Json(files))
717}
718
719#[derive(Serialize)]
720struct FileContentResponse {
721 content: String,
722 path: String,
723}
724
725async fn get_file_content(
726 State(_state): State<AppState>,
727 _headers: axum::http::HeaderMap,
728 AxumPath(path): AxumPath<String>,
729) -> Result<Json<FileContentResponse>, ApiError> {
730 println!("DEBUG: Requested file path: {}", path);
733
734 let current_dir = std::env::current_dir()
735 .map_err(|e| ApiError::Internal(e.into()))?;
736
737 let file_path = current_dir.join(&path);
738 println!("DEBUG: Resolved file path: {:?}", file_path);
739
740 if !file_path.starts_with(¤t_dir) {
742 return Err(ApiError::BadRequest("Invalid file path".to_string()));
743 }
744
745 if !file_path.exists() {
747 return Err(ApiError::NotFound(format!("File not found: {}", path)));
748 }
749
750 if file_path.is_dir() {
751 return Err(ApiError::BadRequest("Path is a directory".to_string()));
752 }
753
754 let content = std::fs::read_to_string(&file_path)
756 .map_err(|e| ApiError::Internal(e.into()))?;
757
758 Ok(Json(FileContentResponse {
759 content,
760 path,
761 }))
762}