gosh_lan_transfer/
client.rs

1// SPDX-License-Identifier: MIT
2// gosh-lan-transfer - HTTP client for sending file transfers
3//
4// The client explicitly resolves hostnames and attempts all IPs.
5// This ensures reliable connections over LAN, Tailscale, and VPNs.
6
7use crate::config::EngineConfig;
8use crate::error::{EngineError, EngineResult};
9use crate::events::EventHandler;
10use crate::history::HistoryPersistence;
11use crate::protocol::{
12    EngineEvent, TransferApprovalStatus, TransferDecision, TransferFile, TransferProgress,
13    TransferRequest, TransferResponse,
14};
15use crate::types::{
16    NetworkInterface, ResolveResult, TransferDirection, TransferRecord, TransferStatus,
17};
18use futures::StreamExt;
19use reqwest::{Body, Client};
20use std::{
21    net::ToSocketAddrs,
22    path::{Path, PathBuf},
23    sync::{
24        atomic::{AtomicU64, Ordering},
25        Arc,
26    },
27    time::Duration,
28};
29use tokio::{
30    fs::File,
31    time::{sleep, Instant},
32};
33use tokio_util::io::ReaderStream;
34use uuid::Uuid;
35
/// Client for sending files to a peer
///
/// Bundles the HTTP client used for all requests, the sink that receives
/// engine events, an optional history store, and the retry settings.
pub struct TransferClient {
    /// HTTP client used for every request this type makes
    http_client: Client,
    /// Receives engine events (retry, progress, failure, completion)
    event_handler: Arc<dyn EventHandler>,
    /// Optional store for transfer records; nothing is recorded when `None`
    history: Option<Arc<dyn HistoryPersistence>>,
    /// Maximum retry attempts
    max_retries: u32,
    /// Base delay between retries in milliseconds
    retry_delay_ms: u64,
}
46
/// Parameters for sending a single file
///
/// Borrowed fields only need to live for the duration of one `send_file` call.
struct SendFileParams<'a> {
    /// Peer host name or IP address, used to build the request URL
    address: &'a str,
    /// Peer port, used to build the request URL
    port: u16,
    /// Transfer identifier, sent as the `transfer_id` query parameter
    transfer_id: &'a str,
    /// Token sent as the `token` query parameter
    token: &'a str,
    /// File identifier, sent as the `file_id` query parameter
    file_id: &'a str,
    /// Local path of the file that is opened and streamed
    file_path: &'a Path,
    /// Size reported as `total_bytes` in progress events
    total_transfer_size: u64,
    /// Running byte counter, incremented for every chunk read from the file
    bytes_sent_so_far: Arc<AtomicU64>,
    /// Reference instant used to compute the reported transfer speed
    transfer_start_time: Instant,
}
59
60impl TransferClient {
61    /// Create a new transfer client with the given event handler
62    pub fn new(event_handler: Arc<dyn EventHandler>) -> Self {
63        let config = EngineConfig::default();
64        Self::new_with_config(event_handler, &config)
65    }
66
67    /// Create a new transfer client with config
68    pub fn new_with_config(event_handler: Arc<dyn EventHandler>, config: &EngineConfig) -> Self {
69        let http_client = Client::builder()
70            // No global timeout - large file transfers can take a long time
71            // Use read_timeout to detect stalled connections instead
72            .read_timeout(Duration::from_secs(60))
73            .connect_timeout(Duration::from_secs(30))
74            .build()
75            .expect("Failed to create HTTP client");
76
77        Self {
78            http_client,
79            event_handler,
80            history: None,
81            max_retries: config.max_retries,
82            retry_delay_ms: config.retry_delay_ms,
83        }
84    }
85
86    /// Create a new transfer client with history persistence
87    pub fn new_with_history(
88        event_handler: Arc<dyn EventHandler>,
89        history: Arc<dyn HistoryPersistence>,
90    ) -> Self {
91        let config = EngineConfig::default();
92        Self::new_with_history_and_config(event_handler, history, &config)
93    }
94
95    /// Create a new transfer client with history and config
96    pub fn new_with_history_and_config(
97        event_handler: Arc<dyn EventHandler>,
98        history: Arc<dyn HistoryPersistence>,
99        config: &EngineConfig,
100    ) -> Self {
101        let http_client = Client::builder()
102            .read_timeout(Duration::from_secs(60))
103            .connect_timeout(Duration::from_secs(30))
104            .build()
105            .expect("Failed to create HTTP client");
106
107        Self {
108            http_client,
109            event_handler,
110            history: Some(history),
111            max_retries: config.max_retries,
112            retry_delay_ms: config.retry_delay_ms,
113        }
114    }
115
116    /// Update retry settings from config
117    pub fn update_config(&mut self, config: &EngineConfig) {
118        self.max_retries = config.max_retries;
119        self.retry_delay_ms = config.retry_delay_ms;
120    }
121
122    /// Check if an error is transient and should be retried
123    fn is_transient_error(error: &EngineError) -> bool {
124        matches!(
125            error,
126            EngineError::Network(_) | EngineError::ConnectionRefused(_)
127        )
128    }
129
130    /// Resolve a hostname or IP to all available addresses
131    pub fn resolve_address(address: &str) -> ResolveResult {
132        // First, check if it's already an IP address
133        if let Ok(ip) = address.parse::<std::net::IpAddr>() {
134            return ResolveResult {
135                hostname: address.to_string(),
136                ips: vec![ip.to_string()],
137                success: true,
138                error: None,
139            };
140        }
141
142        // Attempt DNS resolution
143        let addr_with_port = format!("{}:0", address);
144        match addr_with_port.to_socket_addrs() {
145            Ok(addrs) => {
146                let ips: Vec<String> = addrs.map(|a| a.ip().to_string()).collect();
147
148                if ips.is_empty() {
149                    ResolveResult {
150                        hostname: address.to_string(),
151                        ips: Vec::new(),
152                        success: false,
153                        error: Some("No IP addresses found".to_string()),
154                    }
155                } else {
156                    tracing::info!("Resolved {} to {:?}", address, ips);
157                    ResolveResult {
158                        hostname: address.to_string(),
159                        ips,
160                        success: true,
161                        error: None,
162                    }
163                }
164            }
165            Err(e) => ResolveResult {
166                hostname: address.to_string(),
167                ips: Vec::new(),
168                success: false,
169                error: Some(format!("DNS resolution failed: {}", e)),
170            },
171        }
172    }
173
174    /// Resolve a hostname or IP, returning an error if resolution fails
175    pub fn resolve_address_or_err(address: &str) -> EngineResult<Vec<String>> {
176        let result = Self::resolve_address(address);
177        if result.success {
178            Ok(result.ips)
179        } else {
180            Err(EngineError::DnsResolution(result.error.unwrap_or_else(
181                || format!("Failed to resolve {}", address),
182            )))
183        }
184    }
185
186    /// Check if a peer is reachable by hitting the /health endpoint
187    pub async fn check_peer(&self, address: &str, port: u16) -> EngineResult<bool> {
188        let url = format!("http://{}:{}/health", address, port);
189
190        match self.http_client.get(&url).send().await {
191            Ok(response) => {
192                if response.status().is_success() {
193                    Ok(true)
194                } else {
195                    Err(EngineError::Network(format!(
196                        "Peer returned status {}",
197                        response.status()
198                    )))
199                }
200            }
201            Err(e) => {
202                if e.is_connect() {
203                    Err(EngineError::ConnectionRefused(format!(
204                        "Cannot connect to {}:{} - {}",
205                        address, port, e
206                    )))
207                } else if e.is_timeout() {
208                    Err(EngineError::Network(format!(
209                        "Connection timed out to {}:{}",
210                        address, port
211                    )))
212                } else {
213                    Err(EngineError::Network(format!("Request failed: {}", e)))
214                }
215            }
216        }
217    }
218
219    /// Get peer info
220    pub async fn get_peer_info(&self, address: &str, port: u16) -> EngineResult<serde_json::Value> {
221        let url = format!("http://{}:{}/info", address, port);
222
223        let response = self
224            .http_client
225            .get(&url)
226            .send()
227            .await
228            .map_err(|e| EngineError::Network(format!("Failed to get peer info: {}", e)))?;
229
230        response
231            .json()
232            .await
233            .map_err(|e| EngineError::Serialization(format!("Failed to parse peer info: {}", e)))
234    }
235
236    /// Initiate a transfer request to a peer
237    pub async fn request_transfer(
238        &self,
239        address: &str,
240        port: u16,
241        transfer_id: &str,
242        files: Vec<TransferFile>,
243        sender_name: Option<String>,
244    ) -> EngineResult<TransferResponse> {
245        let total_size: u64 = files.iter().map(|f| f.size).sum();
246
247        let request = TransferRequest {
248            transfer_id: transfer_id.to_string(),
249            sender_name,
250            files,
251            total_size,
252        };
253
254        let url = format!("http://{}:{}/transfer", address, port);
255
256        let mut last_error = None;
257        for attempt in 0..=self.max_retries {
258            let result = self.http_client.post(&url).json(&request).send().await;
259
260            match result {
261                Ok(response) => {
262                    let transfer_response: TransferResponse =
263                        response.json().await.map_err(|e| {
264                            EngineError::Serialization(format!("Failed to parse response: {}", e))
265                        })?;
266                    return Ok(transfer_response);
267                }
268                Err(e) => {
269                    let error = if e.is_connect() {
270                        EngineError::ConnectionRefused(format!(
271                            "Cannot connect to {}:{} - {}",
272                            address, port, e
273                        ))
274                    } else {
275                        EngineError::Network(format!("Transfer request failed: {}", e))
276                    };
277
278                    // Only retry transient errors
279                    if !Self::is_transient_error(&error) || attempt == self.max_retries {
280                        return Err(error);
281                    }
282
283                    // Emit retry event
284                    self.event_handler.on_event(EngineEvent::TransferRetry {
285                        transfer_id: transfer_id.to_string(),
286                        attempt: attempt + 1,
287                        max_attempts: self.max_retries,
288                        error: error.to_string(),
289                    });
290
291                    // Exponential backoff
292                    let delay = self.retry_delay_ms * 2u64.pow(attempt);
293                    sleep(Duration::from_millis(delay)).await;
294
295                    last_error = Some(error);
296                }
297            }
298        }
299
300        Err(last_error.unwrap_or_else(|| EngineError::Network("Unknown error".to_string())))
301    }
302
303    async fn wait_for_approval(
304        &self,
305        address: &str,
306        port: u16,
307        transfer_id: &str,
308    ) -> EngineResult<TransferApprovalStatus> {
309        let url = format!(
310            "http://{}:{}/transfer/status?transfer_id={}",
311            address, port, transfer_id
312        );
313        let timeout = Duration::from_secs(120);
314        let poll_interval = Duration::from_millis(500);
315        let started = Instant::now();
316
317        loop {
318            let response = self.http_client.get(&url).send().await.map_err(|e| {
319                EngineError::Network(format!("Failed to check transfer status: {}", e))
320            })?;
321
322            if !response.status().is_success() {
323                return Err(EngineError::Network(format!(
324                    "Status check failed: {}",
325                    response.status()
326                )));
327            }
328
329            let status: TransferApprovalStatus = response.json().await.map_err(|e| {
330                EngineError::Serialization(format!("Failed to parse status: {}", e))
331            })?;
332
333            match status.status {
334                TransferDecision::Pending => {
335                    if started.elapsed() > timeout {
336                        return Err(EngineError::TransferTimeout);
337                    }
338                    sleep(poll_interval).await;
339                }
340                TransferDecision::Accepted => return Ok(status),
341                TransferDecision::Rejected => return Err(EngineError::TransferRejected),
342                TransferDecision::NotFound => {
343                    return Err(EngineError::TransferNotFound(transfer_id.to_string()))
344                }
345            }
346        }
347    }
348
349    /// Send a file to a peer (after transfer is accepted)
350    async fn send_file(&self, params: SendFileParams<'_>) -> EngineResult<()> {
351        let url = format!(
352            "http://{}:{}/chunk?transfer_id={}&file_id={}&token={}",
353            params.address, params.port, params.transfer_id, params.file_id, params.token
354        );
355
356        // Open and read the file
357        let file = File::open(params.file_path)
358            .await
359            .map_err(|e| EngineError::FileIo(format!("Failed to open file: {}", e)))?;
360
361        let metadata = file
362            .metadata()
363            .await
364            .map_err(|e| EngineError::FileIo(format!("Failed to get file metadata: {}", e)))?;
365
366        let file_size = metadata.len();
367
368        // Create progress-tracking stream
369        let event_handler = self.event_handler.clone();
370        let transfer_id_owned = params.transfer_id.to_string();
371        let file_name = params
372            .file_path
373            .file_name()
374            .unwrap()
375            .to_string_lossy()
376            .to_string();
377        let last_update = Arc::new(AtomicU64::new(0));
378        let total_transfer_size = params.total_transfer_size;
379
380        let stream = ReaderStream::new(file).inspect({
381            let event_handler = event_handler.clone();
382            let transfer_id = transfer_id_owned.clone();
383            let file_name = file_name.clone();
384            let bytes_sent = params.bytes_sent_so_far.clone();
385            let last_update = last_update.clone();
386            let start_time = params.transfer_start_time;
387
388            move |chunk_result| {
389                if let Ok(chunk) = chunk_result {
390                    let new_total = bytes_sent.fetch_add(chunk.len() as u64, Ordering::SeqCst)
391                        + chunk.len() as u64;
392                    let last = last_update.load(Ordering::SeqCst);
393
394                    // Throttle updates to every 32KB to avoid flooding
395                    if new_total - last >= 32768 || new_total == total_transfer_size {
396                        last_update.store(new_total, Ordering::SeqCst);
397
398                        // Calculate speed based on elapsed time
399                        let elapsed_secs = start_time.elapsed().as_secs_f64();
400                        let speed_bps = if elapsed_secs > 0.0 {
401                            (new_total as f64 / elapsed_secs) as u64
402                        } else {
403                            0
404                        };
405
406                        event_handler.on_event(EngineEvent::TransferProgress(TransferProgress {
407                            transfer_id: transfer_id.clone(),
408                            current_file: Some(file_name.clone()),
409                            bytes_transferred: new_total,
410                            total_bytes: total_transfer_size,
411                            speed_bps,
412                        }));
413                    }
414                }
415            }
416        });
417
418        // Send the file
419        let response = self
420            .http_client
421            .post(&url)
422            .header("Content-Type", "application/octet-stream")
423            .header("Content-Length", file_size)
424            .body(Body::wrap_stream(stream))
425            .send()
426            .await
427            .map_err(|e| EngineError::Network(format!("Failed to send file: {}", e)))?;
428
429        if !response.status().is_success() {
430            let error_text = response.text().await.unwrap_or_default();
431            return Err(EngineError::Network(format!(
432                "Server returned error: {}",
433                error_text
434            )));
435        }
436
437        // Send final progress update for this file
438        let final_bytes = params.bytes_sent_so_far.load(Ordering::SeqCst);
439        let elapsed_secs = params.transfer_start_time.elapsed().as_secs_f64();
440        let speed_bps = if elapsed_secs > 0.0 {
441            (final_bytes as f64 / elapsed_secs) as u64
442        } else {
443            0
444        };
445        self.event_handler
446            .on_event(EngineEvent::TransferProgress(TransferProgress {
447                transfer_id: transfer_id_owned,
448                current_file: Some(file_name),
449                bytes_transferred: final_bytes,
450                total_bytes: total_transfer_size,
451                speed_bps,
452            }));
453
454        Ok(())
455    }
456
457    /// Send multiple files to a peer
458    pub async fn send_files(
459        &self,
460        address: &str,
461        port: u16,
462        file_paths: Vec<PathBuf>,
463        sender_name: Option<String>,
464    ) -> EngineResult<()> {
465        let transfer_id = Uuid::new_v4().to_string();
466        let started_at = chrono::Utc::now();
467
468        // Build file list with metadata
469        let mut files = Vec::new();
470        for path in &file_paths {
471            let metadata = tokio::fs::metadata(path)
472                .await
473                .map_err(|e| EngineError::FileIo(format!("Failed to get file info: {}", e)))?;
474
475            let name = path
476                .file_name()
477                .ok_or_else(|| EngineError::FileIo("Invalid file path".to_string()))?
478                .to_string_lossy()
479                .to_string();
480
481            let mime_type = mime_guess::from_path(path).first().map(|m| m.to_string());
482
483            files.push(TransferFile {
484                id: Uuid::new_v4().to_string(),
485                name,
486                size: metadata.len(),
487                mime_type,
488                relative_path: None,
489            });
490        }
491
492        // Calculate total transfer size
493        let total_transfer_size: u64 = files.iter().map(|f| f.size).sum();
494
495        // Helper closure to record history
496        let record_history = |history: &Arc<dyn HistoryPersistence>,
497                              files: &[TransferFile],
498                              status: TransferStatus,
499                              bytes: u64,
500                              error: Option<String>| {
501            let record = TransferRecord {
502                id: transfer_id.clone(),
503                direction: TransferDirection::Sent,
504                status,
505                peer_address: address.to_string(),
506                files: files.to_vec(),
507                total_size: total_transfer_size,
508                bytes_transferred: bytes,
509                started_at,
510                completed_at: Some(chrono::Utc::now()),
511                error,
512            };
513            if let Err(e) = history.add(record) {
514                tracing::warn!("Failed to record transfer history: {}", e);
515            }
516        };
517
518        // Request transfer
519        let response = match self
520            .request_transfer(address, port, &transfer_id, files.clone(), sender_name)
521            .await
522        {
523            Ok(r) => r,
524            Err(e) => {
525                // Record failed transfer to history
526                if let Some(ref history) = self.history {
527                    record_history(
528                        history,
529                        &files,
530                        TransferStatus::Failed,
531                        0,
532                        Some(e.to_string()),
533                    );
534                }
535                self.event_handler.on_event(EngineEvent::TransferFailed {
536                    transfer_id: transfer_id.clone(),
537                    error: e.to_string(),
538                });
539                return Err(e);
540            }
541        };
542
543        let token = if response.accepted {
544            match response.token {
545                Some(t) => t,
546                None => {
547                    let err = EngineError::Network("No token received".to_string());
548                    if let Some(ref history) = self.history {
549                        record_history(
550                            history,
551                            &files,
552                            TransferStatus::Failed,
553                            0,
554                            Some(err.to_string()),
555                        );
556                    }
557                    self.event_handler.on_event(EngineEvent::TransferFailed {
558                        transfer_id: transfer_id.clone(),
559                        error: err.to_string(),
560                    });
561                    return Err(err);
562                }
563            }
564        } else {
565            match self.wait_for_approval(address, port, &transfer_id).await {
566                Ok(status) => match status.token {
567                    Some(t) => t,
568                    None => {
569                        let err = EngineError::Network("No token received".to_string());
570                        if let Some(ref history) = self.history {
571                            record_history(
572                                history,
573                                &files,
574                                TransferStatus::Failed,
575                                0,
576                                Some(err.to_string()),
577                            );
578                        }
579                        self.event_handler.on_event(EngineEvent::TransferFailed {
580                            transfer_id: transfer_id.clone(),
581                            error: err.to_string(),
582                        });
583                        return Err(err);
584                    }
585                },
586                Err(e) => {
587                    // Record rejected/timed out transfer
588                    let status = if matches!(e, EngineError::TransferRejected) {
589                        TransferStatus::Rejected
590                    } else {
591                        TransferStatus::Failed
592                    };
593                    if let Some(ref history) = self.history {
594                        record_history(history, &files, status, 0, Some(e.to_string()));
595                    }
596                    self.event_handler.on_event(EngineEvent::TransferFailed {
597                        transfer_id: transfer_id.clone(),
598                        error: e.to_string(),
599                    });
600                    return Err(e);
601                }
602            }
603        };
604
605        let bytes_sent_so_far = Arc::new(AtomicU64::new(0));
606        let transfer_start_time = Instant::now();
607
608        // Send each file
609        for (file, path) in files.iter().zip(file_paths.iter()) {
610            if let Err(e) = self
611                .send_file(SendFileParams {
612                    address,
613                    port,
614                    transfer_id: &transfer_id,
615                    token: &token,
616                    file_id: &file.id,
617                    file_path: path,
618                    total_transfer_size,
619                    bytes_sent_so_far: bytes_sent_so_far.clone(),
620                    transfer_start_time,
621                })
622                .await
623            {
624                // Record failed transfer
625                let bytes = bytes_sent_so_far.load(Ordering::SeqCst);
626                if let Some(ref history) = self.history {
627                    record_history(
628                        history,
629                        &files,
630                        TransferStatus::Failed,
631                        bytes,
632                        Some(e.to_string()),
633                    );
634                }
635                self.event_handler.on_event(EngineEvent::TransferFailed {
636                    transfer_id: transfer_id.clone(),
637                    error: e.to_string(),
638                });
639                return Err(e);
640            }
641
642            tracing::info!("Sent file: {}", file.name);
643        }
644
645        // Record successful transfer
646        if let Some(ref history) = self.history {
647            record_history(
648                history,
649                &files,
650                TransferStatus::Completed,
651                total_transfer_size,
652                None,
653            );
654        }
655
656        // Emit completion event
657        self.event_handler
658            .on_event(EngineEvent::TransferComplete { transfer_id });
659
660        Ok(())
661    }
662
663    /// Send a directory and all its contents to a peer
664    ///
665    /// Recursively enumerates all files in the directory and sends them
666    /// with their relative paths preserved.
667    pub async fn send_directory(
668        &self,
669        address: &str,
670        port: u16,
671        dir_path: impl AsRef<Path>,
672        sender_name: Option<String>,
673    ) -> EngineResult<()> {
674        let dir_path = dir_path.as_ref();
675
676        let metadata = tokio::fs::metadata(dir_path)
677            .await
678            .map_err(|e| EngineError::FileIo(format!("Failed to access path: {}", e)))?;
679
680        if !metadata.is_dir() {
681            return Err(EngineError::FileIo(format!(
682                "Path is not a directory: {}",
683                dir_path.display()
684            )));
685        }
686
687        // Collect all files recursively
688        let mut files_to_send: Vec<(PathBuf, String)> = Vec::new();
689        Self::collect_directory_files_async(dir_path, dir_path, &mut files_to_send).await?;
690
691        if files_to_send.is_empty() {
692            return Err(EngineError::FileIo("Directory is empty".to_string()));
693        }
694
695        let transfer_id = Uuid::new_v4().to_string();
696        let started_at = chrono::Utc::now();
697
698        // Build file list with metadata and relative paths
699        let mut files = Vec::new();
700        let mut file_paths = Vec::new();
701
702        for (path, relative_path) in &files_to_send {
703            let metadata = tokio::fs::metadata(path)
704                .await
705                .map_err(|e| EngineError::FileIo(format!("Failed to get file info: {}", e)))?;
706
707            let name = path
708                .file_name()
709                .ok_or_else(|| EngineError::FileIo("Invalid file path".to_string()))?
710                .to_string_lossy()
711                .to_string();
712
713            let mime_type = mime_guess::from_path(path).first().map(|m| m.to_string());
714
715            files.push(TransferFile {
716                id: Uuid::new_v4().to_string(),
717                name,
718                size: metadata.len(),
719                mime_type,
720                relative_path: Some(relative_path.clone()),
721            });
722            file_paths.push(path.clone());
723        }
724
725        // Calculate total transfer size
726        let total_transfer_size: u64 = files.iter().map(|f| f.size).sum();
727
728        // Helper closure to record history
729        let record_history = |history: &Arc<dyn HistoryPersistence>,
730                              files: &[TransferFile],
731                              status: TransferStatus,
732                              bytes: u64,
733                              error: Option<String>| {
734            let record = TransferRecord {
735                id: transfer_id.clone(),
736                direction: TransferDirection::Sent,
737                status,
738                peer_address: address.to_string(),
739                files: files.to_vec(),
740                total_size: total_transfer_size,
741                bytes_transferred: bytes,
742                started_at,
743                completed_at: Some(chrono::Utc::now()),
744                error,
745            };
746            if let Err(e) = history.add(record) {
747                tracing::warn!("Failed to record transfer history: {}", e);
748            }
749        };
750
751        // Request transfer
752        let response = match self
753            .request_transfer(address, port, &transfer_id, files.clone(), sender_name)
754            .await
755        {
756            Ok(r) => r,
757            Err(e) => {
758                if let Some(ref history) = self.history {
759                    record_history(
760                        history,
761                        &files,
762                        TransferStatus::Failed,
763                        0,
764                        Some(e.to_string()),
765                    );
766                }
767                self.event_handler.on_event(EngineEvent::TransferFailed {
768                    transfer_id: transfer_id.clone(),
769                    error: e.to_string(),
770                });
771                return Err(e);
772            }
773        };
774
775        let token = if response.accepted {
776            match response.token {
777                Some(t) => t,
778                None => {
779                    let err = EngineError::Network("No token received".to_string());
780                    if let Some(ref history) = self.history {
781                        record_history(
782                            history,
783                            &files,
784                            TransferStatus::Failed,
785                            0,
786                            Some(err.to_string()),
787                        );
788                    }
789                    self.event_handler.on_event(EngineEvent::TransferFailed {
790                        transfer_id: transfer_id.clone(),
791                        error: err.to_string(),
792                    });
793                    return Err(err);
794                }
795            }
796        } else {
797            match self.wait_for_approval(address, port, &transfer_id).await {
798                Ok(status) => match status.token {
799                    Some(t) => t,
800                    None => {
801                        let err = EngineError::Network("No token received".to_string());
802                        if let Some(ref history) = self.history {
803                            record_history(
804                                history,
805                                &files,
806                                TransferStatus::Failed,
807                                0,
808                                Some(err.to_string()),
809                            );
810                        }
811                        self.event_handler.on_event(EngineEvent::TransferFailed {
812                            transfer_id: transfer_id.clone(),
813                            error: err.to_string(),
814                        });
815                        return Err(err);
816                    }
817                },
818                Err(e) => {
819                    let status = if matches!(e, EngineError::TransferRejected) {
820                        TransferStatus::Rejected
821                    } else {
822                        TransferStatus::Failed
823                    };
824                    if let Some(ref history) = self.history {
825                        record_history(history, &files, status, 0, Some(e.to_string()));
826                    }
827                    self.event_handler.on_event(EngineEvent::TransferFailed {
828                        transfer_id: transfer_id.clone(),
829                        error: e.to_string(),
830                    });
831                    return Err(e);
832                }
833            }
834        };
835
836        let bytes_sent_so_far = Arc::new(AtomicU64::new(0));
837        let transfer_start_time = Instant::now();
838
839        // Send each file
840        for (file, path) in files.iter().zip(file_paths.iter()) {
841            if let Err(e) = self
842                .send_file(SendFileParams {
843                    address,
844                    port,
845                    transfer_id: &transfer_id,
846                    token: &token,
847                    file_id: &file.id,
848                    file_path: path,
849                    total_transfer_size,
850                    bytes_sent_so_far: bytes_sent_so_far.clone(),
851                    transfer_start_time,
852                })
853                .await
854            {
855                let bytes = bytes_sent_so_far.load(Ordering::SeqCst);
856                if let Some(ref history) = self.history {
857                    record_history(
858                        history,
859                        &files,
860                        TransferStatus::Failed,
861                        bytes,
862                        Some(e.to_string()),
863                    );
864                }
865                self.event_handler.on_event(EngineEvent::TransferFailed {
866                    transfer_id: transfer_id.clone(),
867                    error: e.to_string(),
868                });
869                return Err(e);
870            }
871
872            tracing::info!(
873                "Sent file: {} ({})",
874                file.name,
875                file.relative_path.as_deref().unwrap_or("")
876            );
877        }
878
879        // Record successful transfer
880        if let Some(ref history) = self.history {
881            record_history(
882                history,
883                &files,
884                TransferStatus::Completed,
885                total_transfer_size,
886                None,
887            );
888        }
889
890        // Emit completion event
891        self.event_handler
892            .on_event(EngineEvent::TransferComplete { transfer_id });
893
894        Ok(())
895    }
896
897    /// Recursively collect all files in a directory with their relative paths (async version)
898    async fn collect_directory_files_async(
899        base_path: &Path,
900        current_path: &Path,
901        files: &mut Vec<(PathBuf, String)>,
902    ) -> EngineResult<()> {
903        let mut entries = tokio::fs::read_dir(current_path)
904            .await
905            .map_err(|e| EngineError::FileIo(format!("Failed to read directory: {}", e)))?;
906
907        while let Some(entry) = entries.next_entry().await.map_err(|e| {
908            EngineError::FileIo(format!("Failed to read directory entry: {}", e))
909        })? {
910            let path = entry.path();
911            let file_type = entry.file_type().await.map_err(|e| {
912                EngineError::FileIo(format!("Failed to get file type: {}", e))
913            })?;
914
915            if file_type.is_file() {
916                let relative = path
917                    .strip_prefix(base_path)
918                    .map_err(|_| EngineError::FileIo("Failed to calculate relative path".to_string()))?
919                    .to_string_lossy()
920                    .to_string();
921                files.push((path, relative));
922            } else if file_type.is_dir() {
923                Box::pin(Self::collect_directory_files_async(base_path, &path, files)).await?;
924            }
925        }
926
927        Ok(())
928    }
929}
930
931/// Get all network interfaces with their IP addresses
932pub fn get_network_interfaces() -> Vec<NetworkInterface> {
933    let mut interfaces = Vec::new();
934
935    if let Ok(addrs) = local_ip_address::list_afinet_netifas() {
936        for (name, ip) in addrs {
937            let is_loopback = ip.is_loopback();
938            interfaces.push(NetworkInterface {
939                name,
940                ip: ip.to_string(),
941                is_loopback,
942            });
943        }
944    }
945
946    interfaces
947}