gosh_lan_transfer/
server.rs

1// SPDX-License-Identifier: MIT
2// gosh-lan-transfer - HTTP server for receiving file transfers
3//
4// The server binds to 0.0.0.0 and :: to accept connections from any interface.
5// This ensures it works reliably on LAN, Tailscale, and VPNs.
6
7use axum::{
8    body::Body,
9    extract::{ConnectInfo, Query, State},
10    http::StatusCode,
11    response::{IntoResponse, Sse},
12    routing::{get, post},
13    Json, Router,
14};
15use futures_util::StreamExt;
16use serde::Deserialize;
17use std::{
18    collections::{HashMap, HashSet},
19    io::ErrorKind,
20    net::SocketAddr,
21    path::{Path, PathBuf},
22    sync::Arc,
23};
24use tokio::{
25    fs::{File, OpenOptions},
26    io::AsyncWriteExt,
27    sync::{broadcast, oneshot, RwLock},
28};
29use tokio_stream::wrappers::BroadcastStream;
30use uuid::Uuid;
31
32use crate::config::EngineConfig;
33use crate::error::{EngineError, EngineResult};
34use crate::events::EventHandler;
35use crate::history::HistoryPersistence;
36use crate::protocol::{
37    EngineEvent, PendingTransfer, TransferApprovalStatus, TransferDecision, TransferProgress,
38    TransferRequest, TransferResponse,
39};
40use crate::types::{TransferDirection, TransferRecord, TransferStatus};
41
42/// Server state shared across handlers
43pub struct ServerState {
44    /// Engine configuration
45    pub config: RwLock<EngineConfig>,
46    /// Pending transfers awaiting user approval
47    pub pending_transfers: RwLock<HashMap<String, PendingTransfer>>,
48    /// Approved transfer tokens (transfer_id -> token)
49    pub approved_tokens: RwLock<HashMap<String, String>>,
50    /// Rejected transfers (transfer_id -> reason)
51    pub rejected_transfers: RwLock<HashMap<String, String>>,
52    /// Cancelled transfers (transfer_id)
53    pub cancelled_transfers: RwLock<HashSet<String>>,
54    /// Received files per transfer (transfer_id -> set of file_ids)
55    pub received_files: RwLock<HashMap<String, HashSet<String>>>,
56    /// Bytes received per transfer (transfer_id -> total bytes received so far)
57    pub transfer_bytes: RwLock<HashMap<String, u64>>,
58    /// Transfer start times (transfer_id -> start instant)
59    pub transfer_start_times: RwLock<HashMap<String, std::time::Instant>>,
60    /// Channel for internal SSE events
61    internal_event_tx: broadcast::Sender<InternalEvent>,
62    /// Event handler for engine events
63    event_handler: Arc<dyn EventHandler>,
64    /// Optional history persistence
65    history: Option<Arc<dyn HistoryPersistence>>,
66}
67
68/// Internal events for SSE streaming (serializable)
69#[derive(Debug, Clone, serde::Serialize)]
70#[serde(tag = "type", rename_all = "camelCase")]
71enum InternalEvent {
72    TransferRequest {
73        transfer: PendingTransfer,
74    },
75    Progress {
76        progress: TransferProgress,
77    },
78    TransferComplete {
79        #[serde(rename = "transferId")]
80        transfer_id: String,
81    },
82    TransferFailed {
83        #[serde(rename = "transferId")]
84        transfer_id: String,
85        error: String,
86    },
87    PortChanged {
88        #[serde(rename = "oldPort")]
89        old_port: u16,
90        #[serde(rename = "newPort")]
91        new_port: u16,
92    },
93}
94
95impl ServerState {
96    /// Create a new server state with the given configuration and event handler
97    pub fn new(config: EngineConfig, event_handler: Arc<dyn EventHandler>) -> Self {
98        let (internal_event_tx, _) = broadcast::channel(100);
99
100        Self {
101            config: RwLock::new(config),
102            pending_transfers: RwLock::new(HashMap::new()),
103            approved_tokens: RwLock::new(HashMap::new()),
104            rejected_transfers: RwLock::new(HashMap::new()),
105            cancelled_transfers: RwLock::new(HashSet::new()),
106            received_files: RwLock::new(HashMap::new()),
107            transfer_bytes: RwLock::new(HashMap::new()),
108            transfer_start_times: RwLock::new(HashMap::new()),
109            internal_event_tx,
110            event_handler,
111            history: None,
112        }
113    }
114
115    /// Create a new server state with history persistence
116    pub fn new_with_history(
117        config: EngineConfig,
118        event_handler: Arc<dyn EventHandler>,
119        history: Arc<dyn HistoryPersistence>,
120    ) -> Self {
121        let (internal_event_tx, _) = broadcast::channel(100);
122
123        Self {
124            config: RwLock::new(config),
125            pending_transfers: RwLock::new(HashMap::new()),
126            approved_tokens: RwLock::new(HashMap::new()),
127            rejected_transfers: RwLock::new(HashMap::new()),
128            cancelled_transfers: RwLock::new(HashSet::new()),
129            received_files: RwLock::new(HashMap::new()),
130            transfer_bytes: RwLock::new(HashMap::new()),
131            transfer_start_times: RwLock::new(HashMap::new()),
132            internal_event_tx,
133            event_handler,
134            history: Some(history),
135        }
136    }
137
138    /// Emit an event to both the event handler and internal SSE channel
139    fn emit_event(&self, event: EngineEvent) {
140        // Send to the event handler
141        self.event_handler.on_event(event.clone());
142
143        // Also send to internal SSE channel
144        let internal = match event {
145            EngineEvent::TransferRequest(transfer) => InternalEvent::TransferRequest { transfer },
146            EngineEvent::TransferProgress(progress) => InternalEvent::Progress { progress },
147            EngineEvent::TransferComplete { transfer_id } => {
148                InternalEvent::TransferComplete { transfer_id }
149            }
150            EngineEvent::TransferFailed { transfer_id, error } => {
151                InternalEvent::TransferFailed { transfer_id, error }
152            }
153            EngineEvent::PortChanged { old_port, new_port } => {
154                InternalEvent::PortChanged { old_port, new_port }
155            }
156            _ => return, // Don't send server start/stop events to SSE
157        };
158        let _ = self.internal_event_tx.send(internal);
159    }
160
161    /// Record a completed or failed receive transfer to history
162    fn record_receive_history(
163        &self,
164        transfer: &PendingTransfer,
165        status: TransferStatus,
166        bytes_transferred: u64,
167        error: Option<String>,
168    ) {
169        if let Some(ref history) = self.history {
170            let record = TransferRecord {
171                id: transfer.id.clone(),
172                direction: TransferDirection::Received,
173                status,
174                peer_address: transfer.source_ip.clone(),
175                files: transfer.files.clone(),
176                total_size: transfer.total_size,
177                bytes_transferred,
178                started_at: transfer.received_at,
179                completed_at: Some(chrono::Utc::now()),
180                error,
181            };
182            if let Err(e) = history.add(record) {
183                tracing::warn!("Failed to record transfer history: {}", e);
184            }
185        }
186    }
187
188    /// Accept a pending transfer
189    pub async fn accept_transfer(&self, transfer_id: &str) -> EngineResult<String> {
190        // Check if transfer exists
191        let pending = self.pending_transfers.read().await;
192        if !pending.contains_key(transfer_id) {
193            return Err(EngineError::TransferNotFound(transfer_id.to_string()));
194        }
195        drop(pending);
196
197        // Generate token and approve
198        let token = Uuid::new_v4().to_string();
199        self.approved_tokens
200            .write()
201            .await
202            .insert(transfer_id.to_string(), token.clone());
203        self.rejected_transfers.write().await.remove(transfer_id);
204
205        Ok(token)
206    }
207
208    /// Reject a pending transfer
209    pub async fn reject_transfer(&self, transfer_id: &str) -> EngineResult<()> {
210        // Check if transfer exists
211        let pending = self.pending_transfers.read().await;
212        if !pending.contains_key(transfer_id) {
213            return Err(EngineError::TransferNotFound(transfer_id.to_string()));
214        }
215        drop(pending);
216
217        // Mark as rejected
218        self.rejected_transfers
219            .write()
220            .await
221            .insert(transfer_id.to_string(), "Rejected by user".to_string());
222        self.approved_tokens.write().await.remove(transfer_id);
223
224        Ok(())
225    }
226
227    /// Cancel an in-progress transfer
228    ///
229    /// This will cause subsequent chunk uploads to be rejected.
230    pub async fn cancel_transfer(&self, transfer_id: &str) -> EngineResult<()> {
231        // Check if transfer exists (either pending or approved) and get transfer info for history
232        let pending = self.pending_transfers.read().await;
233        let approved = self.approved_tokens.read().await;
234        if !pending.contains_key(transfer_id) && !approved.contains_key(transfer_id) {
235            return Err(EngineError::TransferNotFound(transfer_id.to_string()));
236        }
237        let transfer_info = pending.get(transfer_id).cloned();
238        drop(pending);
239        drop(approved);
240
241        // Get bytes transferred so far
242        let bytes_transferred = *self
243            .transfer_bytes
244            .read()
245            .await
246            .get(transfer_id)
247            .unwrap_or(&0);
248
249        // Mark as cancelled
250        self.cancelled_transfers
251            .write()
252            .await
253            .insert(transfer_id.to_string());
254
255        // Clean up the transfer state
256        self.pending_transfers.write().await.remove(transfer_id);
257        self.approved_tokens.write().await.remove(transfer_id);
258        self.received_files.write().await.remove(transfer_id);
259        self.transfer_bytes.write().await.remove(transfer_id);
260        self.transfer_start_times.write().await.remove(transfer_id);
261
262        // Emit cancellation event
263        self.emit_event(EngineEvent::TransferFailed {
264            transfer_id: transfer_id.to_string(),
265            error: "Transfer cancelled".to_string(),
266        });
267
268        // Record to history if we have transfer info
269        if let Some(transfer) = transfer_info {
270            self.record_receive_history(
271                &transfer,
272                TransferStatus::Failed,
273                bytes_transferred,
274                Some("Transfer cancelled".to_string()),
275            );
276        }
277
278        Ok(())
279    }
280
281    /// Check if a transfer has been cancelled
282    pub async fn is_transfer_cancelled(&self, transfer_id: &str) -> bool {
283        self.cancelled_transfers.read().await.contains(transfer_id)
284    }
285
286    /// Get all pending transfers
287    pub async fn get_pending_transfers(&self) -> Vec<PendingTransfer> {
288        self.pending_transfers
289            .read()
290            .await
291            .values()
292            .cloned()
293            .collect()
294    }
295
296    /// Update the configuration
297    pub async fn update_config(&self, config: EngineConfig) {
298        *self.config.write().await = config;
299    }
300}
301
302/// Handle for controlling a running server
303pub struct ServerHandle {
304    shutdown_tx: oneshot::Sender<()>,
305}
306
307impl ServerHandle {
308    /// Shutdown the server gracefully
309    pub fn shutdown(self) {
310        let _ = self.shutdown_tx.send(());
311    }
312}
313
314/// Query parameters for file chunk uploads
315#[derive(Debug, Deserialize)]
316pub struct ChunkParams {
317    transfer_id: String,
318    file_id: String,
319    token: String,
320}
321
322#[derive(Debug, Deserialize)]
323pub struct TransferStatusParams {
324    transfer_id: String,
325}
326
327/// Create the Axum router for the file transfer server
328pub fn create_router(state: Arc<ServerState>) -> Router {
329    Router::new()
330        // Health check - useful for testing connectivity
331        .route("/health", get(health_handler))
332        // Server info - returns device name and version
333        .route("/info", get(info_handler))
334        // Transfer request - initiate a new transfer
335        .route("/transfer", post(transfer_request_handler))
336        // Transfer approval status
337        .route("/transfer/status", get(transfer_status_handler))
338        // Chunk upload - stream file data
339        .route("/chunk", post(chunk_upload_handler))
340        // SSE endpoint for transfer progress
341        .route("/events", get(events_handler))
342        .with_state(state)
343}
344
/// Reduce an untrusted file name to a single safe path component.
///
/// Only the final component of `name` is kept, surrounding whitespace is
/// trimmed, and empty, `.` and `..` results are rejected. When `name` yields
/// nothing usable the same rules are applied to `fallback`, because callers
/// pass sender-supplied ids there and returning it verbatim would let a value
/// such as `../../x` escape the download directory. If the fallback is
/// unusable too, the literal `"file"` is returned.
fn sanitize_file_name(name: &str, fallback: &str) -> String {
    /// Final path component of `raw`, if it is a usable file name.
    fn final_component(raw: &str) -> Option<&str> {
        Path::new(raw.trim())
            .file_name()
            .and_then(|n| n.to_str())
            .map(str::trim)
            .filter(|n| !n.is_empty() && *n != "." && *n != "..")
    }

    final_component(name)
        .or_else(|| final_component(fallback))
        .unwrap_or("file")
        .to_string()
}
357
/// Split a file name at its last `.` into `(stem, extension)`.
///
/// A name without a dot, or whose only dot is the first character (for
/// example `.bashrc`), is returned whole with an empty extension.
fn split_file_name(name: &str) -> (&str, &str) {
    match name.rfind('.') {
        // `.` is a single byte, so slicing around `dot` stays on char boundaries.
        Some(dot) if dot > 0 => (&name[..dot], &name[dot + 1..]),
        _ => (name, ""),
    }
}
366
367/// Sanitize a relative path to prevent directory traversal attacks
368fn sanitize_relative_path(path: &str) -> PathBuf {
369    let mut result = PathBuf::new();
370
371    for component in Path::new(path).components() {
372        // Only allow normal path components (no . or ..)
373        // Skip root, parent (..), current (.), and prefix components
374        if let std::path::Component::Normal(name) = component {
375            if let Some(name_str) = name.to_str() {
376                let safe_name = sanitize_file_name(name_str, "file");
377                result.push(safe_name);
378            }
379        }
380    }
381
382    result
383}
384
385async fn open_unique_file(
386    download_dir: &Path,
387    base_name: &str,
388) -> Result<(PathBuf, File), std::io::Error> {
389    let (stem, ext) = split_file_name(base_name);
390
391    for index in 0..1000 {
392        let candidate = if index == 0 {
393            base_name.to_string()
394        } else if ext.is_empty() {
395            format!("{} ({})", stem, index)
396        } else {
397            format!("{} ({}).{}", stem, index, ext)
398        };
399
400        let path = download_dir.join(&candidate);
401        match OpenOptions::new()
402            .write(true)
403            .create_new(true)
404            .open(&path)
405            .await
406        {
407            Ok(file) => return Ok((path, file)),
408            Err(e) if e.kind() == ErrorKind::AlreadyExists => continue,
409            Err(e) => return Err(e),
410        }
411    }
412
413    Err(std::io::Error::new(
414        ErrorKind::AlreadyExists,
415        "Too many filename conflicts",
416    ))
417}
418
419/// Health check endpoint
420async fn health_handler() -> impl IntoResponse {
421    Json(serde_json::json!({
422        "status": "ok",
423        "app": "gosh-lan-transfer",
424        "version": env!("CARGO_PKG_VERSION")
425    }))
426}
427
428/// Server info endpoint
429async fn info_handler(State(state): State<Arc<ServerState>>) -> impl IntoResponse {
430    let config = state.config.read().await;
431
432    Json(serde_json::json!({
433        "name": config.device_name,
434        "version": env!("CARGO_PKG_VERSION"),
435        "app": "gosh-lan-transfer"
436    }))
437}
438
439/// Handle incoming transfer request
440async fn transfer_request_handler(
441    State(state): State<Arc<ServerState>>,
442    ConnectInfo(addr): ConnectInfo<SocketAddr>,
443    Json(request): Json<TransferRequest>,
444) -> impl IntoResponse {
445    let computed_total: u64 = request.files.iter().map(|f| f.size).sum();
446
447    if computed_total != request.total_size {
448        tracing::warn!(
449            "Transfer total mismatch for {}: client {}, computed {}",
450            request.transfer_id,
451            request.total_size,
452            computed_total
453        );
454    }
455
456    tracing::info!(
457        "Received transfer request: {} files, {} bytes",
458        request.files.len(),
459        computed_total
460    );
461
462    let source_ip = addr.ip().to_string();
463
464    // Create a pending transfer record
465    let pending = PendingTransfer {
466        id: request.transfer_id.clone(),
467        source_ip: source_ip.clone(),
468        sender_name: request.sender_name.clone(),
469        files: request.files.clone(),
470        total_size: computed_total,
471        received_at: chrono::Utc::now(),
472    };
473
474    // Check if sender is in trusted hosts
475    let config = state.config.read().await;
476    let is_trusted = config.trusted_hosts.iter().any(|host| host == &source_ip);
477
478    state
479        .pending_transfers
480        .write()
481        .await
482        .insert(request.transfer_id.clone(), pending.clone());
483    state
484        .rejected_transfers
485        .write()
486        .await
487        .remove(&request.transfer_id);
488
489    if is_trusted {
490        // Auto-accept from trusted hosts
491        let token = Uuid::new_v4().to_string();
492        state
493            .approved_tokens
494            .write()
495            .await
496            .insert(request.transfer_id.clone(), token.clone());
497
498        state
499            .rejected_transfers
500            .write()
501            .await
502            .remove(&request.transfer_id);
503
504        return Json(TransferResponse {
505            accepted: true,
506            message: Some("Auto-accepted from trusted host".to_string()),
507            token: Some(token),
508        });
509    }
510
511    // Notify about the incoming request via event handler
512    state.emit_event(EngineEvent::TransferRequest(pending));
513
514    // Return pending status - UI will call accept/reject
515    Json(TransferResponse {
516        accepted: false,
517        message: Some("Awaiting user approval".to_string()),
518        token: None,
519    })
520}
521
522/// Check transfer approval status
523async fn transfer_status_handler(
524    State(state): State<Arc<ServerState>>,
525    Query(params): Query<TransferStatusParams>,
526) -> impl IntoResponse {
527    let approved = state.approved_tokens.read().await;
528    if let Some(token) = approved.get(&params.transfer_id) {
529        return Json(TransferApprovalStatus {
530            status: TransferDecision::Accepted,
531            token: Some(token.clone()),
532            message: Some("Accepted".to_string()),
533        });
534    }
535    drop(approved);
536
537    let rejected = state.rejected_transfers.read().await;
538    if let Some(reason) = rejected.get(&params.transfer_id) {
539        return Json(TransferApprovalStatus {
540            status: TransferDecision::Rejected,
541            token: None,
542            message: Some(reason.clone()),
543        });
544    }
545    drop(rejected);
546
547    let pending = state.pending_transfers.read().await;
548    if pending.contains_key(&params.transfer_id) {
549        return Json(TransferApprovalStatus {
550            status: TransferDecision::Pending,
551            token: None,
552            message: Some("Awaiting user approval".to_string()),
553        });
554    }
555
556    Json(TransferApprovalStatus {
557        status: TransferDecision::NotFound,
558        token: None,
559        message: Some("Transfer not found".to_string()),
560    })
561}
562
563/// Handle file chunk upload
564async fn chunk_upload_handler(
565    State(state): State<Arc<ServerState>>,
566    Query(params): Query<ChunkParams>,
567    body: Body,
568) -> impl IntoResponse {
569    // Check if transfer was cancelled
570    if state.is_transfer_cancelled(&params.transfer_id).await {
571        return (
572            StatusCode::GONE,
573            Json(serde_json::json!({"error": "Transfer was cancelled"})),
574        );
575    }
576
577    // Verify the token
578    let approved = state.approved_tokens.read().await;
579    let expected_token = approved.get(&params.transfer_id);
580
581    if expected_token != Some(&params.token) {
582        return (
583            StatusCode::UNAUTHORIZED,
584            Json(serde_json::json!({"error": "Invalid or expired token"})),
585        );
586    }
587    drop(approved);
588
589    // Get download directory
590    let download_dir = state.config.read().await.download_dir.clone();
591    if let Err(e) = tokio::fs::create_dir_all(&download_dir).await {
592        tracing::error!("Failed to create download directory: {}", e);
593        return (
594            StatusCode::INTERNAL_SERVER_ERROR,
595            Json(
596                serde_json::json!({"error": format!("Failed to create download directory: {}", e)}),
597            ),
598        );
599    }
600
601    // Find the file info from pending transfers
602    let pending = state.pending_transfers.read().await;
603    let transfer = match pending.get(&params.transfer_id) {
604        Some(t) => t.clone(),
605        None => {
606            return (
607                StatusCode::NOT_FOUND,
608                Json(serde_json::json!({"error": "Transfer not found"})),
609            );
610        }
611    };
612    drop(pending);
613
614    let file_info = match transfer.files.iter().find(|f| f.id == params.file_id) {
615        Some(f) => f.clone(),
616        None => {
617            return (
618                StatusCode::NOT_FOUND,
619                Json(serde_json::json!({"error": "File not found in transfer"})),
620            );
621        }
622    };
623
624    // Determine the target path based on relative_path
625    let (file_path, mut file) = if let Some(ref relative_path) = file_info.relative_path {
626        // Sanitize the relative path to prevent directory traversal
627        let sanitized_relative = sanitize_relative_path(relative_path);
628        let target_path = download_dir.join(&sanitized_relative);
629
630        // Create parent directories if needed
631        if let Some(parent) = target_path.parent() {
632            if let Err(e) = tokio::fs::create_dir_all(parent).await {
633                tracing::error!("Failed to create directory structure: {}", e);
634                return (
635                    StatusCode::INTERNAL_SERVER_ERROR,
636                    Json(
637                        serde_json::json!({"error": format!("Failed to create directories: {}", e)}),
638                    ),
639                );
640            }
641        }
642
643        // Open or create with unique name within the subdirectory
644        let base_name = target_path
645            .file_name()
646            .and_then(|n| n.to_str())
647            .unwrap_or(&file_info.id);
648        let parent_dir = target_path.parent().unwrap_or(&download_dir);
649
650        match open_unique_file(parent_dir, base_name).await {
651            Ok(result) => result,
652            Err(e) => {
653                tracing::error!("Failed to create file: {}", e);
654                return (
655                    StatusCode::INTERNAL_SERVER_ERROR,
656                    Json(serde_json::json!({"error": format!("Failed to create file: {}", e)})),
657                );
658            }
659        }
660    } else {
661        // No relative path, save directly in download_dir
662        let safe_name = sanitize_file_name(&file_info.name, &file_info.id);
663        match open_unique_file(&download_dir, &safe_name).await {
664            Ok(result) => result,
665            Err(e) => {
666                tracing::error!("Failed to create file: {}", e);
667                return (
668                    StatusCode::INTERNAL_SERVER_ERROR,
669                    Json(serde_json::json!({"error": format!("Failed to create file: {}", e)})),
670                );
671            }
672        }
673    };
674
675    let stored_name = file_path
676        .file_name()
677        .and_then(|n| n.to_str())
678        .unwrap_or(&file_info.id)
679        .to_string();
680
681    // Initialize or get transfer start time and cumulative bytes
682    {
683        let mut start_times = state.transfer_start_times.write().await;
684        start_times
685            .entry(params.transfer_id.clone())
686            .or_insert_with(std::time::Instant::now);
687    }
688
689    // Stream the body to the file
690    let mut bytes_received: u64 = 0;
691    let mut stream = body.into_data_stream();
692    let mut last_progress_bytes: u64 = 0;
693
694    while let Some(chunk) = stream.next().await {
695        match chunk {
696            Ok(data) => {
697                let next_size = bytes_received + data.len() as u64;
698                if next_size > file_info.size {
699                    tracing::error!("Received more data than expected for {}", file_info.name);
700                    drop(file);
701                    let _ = tokio::fs::remove_file(&file_path).await;
702                    return (
703                        StatusCode::PAYLOAD_TOO_LARGE,
704                        Json(serde_json::json!({"error": "Received more data than expected"})),
705                    );
706                }
707
708                if let Err(e) = file.write_all(&data).await {
709                    tracing::error!("Failed to write chunk: {}", e);
710                    return (
711                        StatusCode::INTERNAL_SERVER_ERROR,
712                        Json(serde_json::json!({"error": format!("Failed to write: {}", e)})),
713                    );
714                }
715
716                bytes_received = next_size;
717
718                // Update cumulative transfer bytes
719                {
720                    let mut transfer_bytes = state.transfer_bytes.write().await;
721                    let total = transfer_bytes
722                        .entry(params.transfer_id.clone())
723                        .or_insert(0);
724                    *total += data.len() as u64;
725                }
726
727                // Throttle progress updates to every 32KB
728                if bytes_received - last_progress_bytes >= 32768 || bytes_received == file_info.size
729                {
730                    last_progress_bytes = bytes_received;
731
732                    // Calculate speed based on elapsed time
733                    let speed_bps = {
734                        let start_times = state.transfer_start_times.read().await;
735                        let transfer_bytes = state.transfer_bytes.read().await;
736                        if let (Some(start_time), Some(&total_bytes)) = (
737                            start_times.get(&params.transfer_id),
738                            transfer_bytes.get(&params.transfer_id),
739                        ) {
740                            let elapsed_secs = start_time.elapsed().as_secs_f64();
741                            if elapsed_secs > 0.0 {
742                                (total_bytes as f64 / elapsed_secs) as u64
743                            } else {
744                                0
745                            }
746                        } else {
747                            0
748                        }
749                    };
750
751                    // Send progress update
752                    state.emit_event(EngineEvent::TransferProgress(TransferProgress {
753                        transfer_id: params.transfer_id.clone(),
754                        current_file: Some(stored_name.clone()),
755                        bytes_transferred: bytes_received,
756                        total_bytes: file_info.size,
757                        speed_bps,
758                    }));
759                }
760            }
761            Err(e) => {
762                tracing::error!("Error reading chunk: {}", e);
763                return (
764                    StatusCode::INTERNAL_SERVER_ERROR,
765                    Json(serde_json::json!({"error": format!("Stream error: {}", e)})),
766                );
767            }
768        }
769    }
770
771    // Ensure all data is flushed
772    if let Err(e) = file.flush().await {
773        tracing::error!("Failed to flush file: {}", e);
774    }
775
776    if bytes_received != file_info.size {
777        tracing::warn!(
778            "Size mismatch for {}: expected {}, received {}",
779            file_info.name,
780            file_info.size,
781            bytes_received
782        );
783        let _ = tokio::fs::remove_file(&file_path).await;
784        return (
785            StatusCode::BAD_REQUEST,
786            Json(serde_json::json!({"error": "Incomplete file received"})),
787        );
788    }
789
790    tracing::info!(
791        "File received: {} ({} bytes)",
792        file_path.display(),
793        bytes_received
794    );
795
796    // Track received file and check if transfer is complete
797    let transfer_id = params.transfer_id.clone();
798    let file_id = params.file_id.clone();
799
800    // Add file to received set
801    {
802        let mut received = state.received_files.write().await;
803        received
804            .entry(transfer_id.clone())
805            .or_insert_with(HashSet::new)
806            .insert(file_id);
807    }
808
809    // Check if all files have been received
810    let pending = state.pending_transfers.read().await;
811    if let Some(transfer) = pending.get(&transfer_id) {
812        let expected_count = transfer.files.len();
813        let received = state.received_files.read().await;
814        let received_count = received.get(&transfer_id).map(|s| s.len()).unwrap_or(0);
815
816        if received_count >= expected_count {
817            tracing::info!(
818                "Transfer {} complete: all {} files received",
819                transfer_id,
820                expected_count
821            );
822
823            // Record to history before cleanup (clone transfer info)
824            let transfer_clone = transfer.clone();
825            let total_bytes = *state
826                .transfer_bytes
827                .read()
828                .await
829                .get(&transfer_id)
830                .unwrap_or(&transfer.total_size);
831
832            // Emit completion event
833            state.emit_event(EngineEvent::TransferComplete {
834                transfer_id: transfer_id.clone(),
835            });
836
837            // Record to history
838            state.record_receive_history(
839                &transfer_clone,
840                TransferStatus::Completed,
841                total_bytes,
842                None,
843            );
844
845            // Clean up transfer state (drop the read lock first)
846            drop(pending);
847            drop(received);
848
849            // Remove from tracking maps
850            state.pending_transfers.write().await.remove(&transfer_id);
851            state.approved_tokens.write().await.remove(&transfer_id);
852            state.received_files.write().await.remove(&transfer_id);
853            state.transfer_bytes.write().await.remove(&transfer_id);
854            state
855                .transfer_start_times
856                .write()
857                .await
858                .remove(&transfer_id);
859        }
860    }
861
862    (
863        StatusCode::OK,
864        Json(serde_json::json!({
865            "status": "ok",
866            "file": stored_name,
867            "bytes_received": bytes_received
868        })),
869    )
870}
871
872/// SSE endpoint for real-time transfer events
873async fn events_handler(
874    State(state): State<Arc<ServerState>>,
875) -> Sse<
876    impl futures_util::Stream<Item = Result<axum::response::sse::Event, std::convert::Infallible>>,
877> {
878    let rx = state.internal_event_tx.subscribe();
879
880    let stream = BroadcastStream::new(rx).map(|result: Result<InternalEvent, _>| {
881        let event: InternalEvent = match result {
882            Ok(event) => event,
883            Err(_) => {
884                return Ok::<_, std::convert::Infallible>(
885                    axum::response::sse::Event::default().data("heartbeat"),
886                )
887            }
888        };
889
890        let data = serde_json::to_string(&event).unwrap_or_default();
891        Ok(axum::response::sse::Event::default().data(data))
892    });
893
894    Sse::new(stream)
895}
896
897/// Start the HTTP server and return a handle for controlling it
898pub async fn start_server(state: Arc<ServerState>, port: u16) -> EngineResult<ServerHandle> {
899    let app = create_router(state.clone());
900
901    tracing::info!("Starting server on port {}", port);
902
903    // Try binding to IPv6 wildcard first (dual-stack on most systems)
904    // Fall back to IPv4 only if IPv6 binding fails
905    let addr_v6 = SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, port));
906    let addr_v4 = SocketAddr::from(([0, 0, 0, 0], port));
907
908    let listener = match tokio::net::TcpListener::bind(addr_v6).await {
909        Ok(l) => {
910            tracing::info!("Bound to IPv6 wildcard [::]:{}  (dual-stack)", port);
911            l
912        }
913        Err(e) => {
914            tracing::debug!("IPv6 bind failed ({}), falling back to IPv4", e);
915            tokio::net::TcpListener::bind(addr_v4).await.map_err(|e| {
916                EngineError::Network(format!("Failed to bind to port {}: {}", port, e))
917            })?
918        }
919    };
920
921    let (shutdown_tx, shutdown_rx) = oneshot::channel();
922
923    // Spawn the server in the background
924    tokio::spawn(async move {
925        axum::serve(
926            listener,
927            app.into_make_service_with_connect_info::<SocketAddr>(),
928        )
929        .with_graceful_shutdown(async {
930            let _ = shutdown_rx.await;
931        })
932        .await
933        .ok();
934    });
935
936    // Emit server started event
937    state
938        .event_handler
939        .on_event(EngineEvent::ServerStarted { port });
940
941    Ok(ServerHandle { shutdown_tx })
942}