alopex_chirps_file_transfer/
service.rs

1use crate::TransferSessionId;
2use crate::config::FileTransferConfig;
3use crate::error::FileTransferError;
4use crate::manifest::FileMetadata;
5use crate::metrics::PrometheusMetrics;
6use crate::ops::ChunkStreamOpener;
7use crate::ops::broadcast::broadcast_file_with_context;
8use crate::ops::conversions::from_wire_file_metadata;
9use crate::ops::send::send_file_with_context;
10use crate::ops::sync::sync_file_with_context;
11use crate::ops::{ControlDispatcher, ReceiveHandler};
12use crate::options::{ListOptions, RemoveOptions, SortBy, SyncOptions, TransferOptions};
13use crate::path::PathValidator;
14use crate::persistence::SessionPersistence;
15use crate::progress::{BroadcastHandle, SyncHandle, TransferHandle};
16use crate::session::{TransferControlState, TransferSession, TransferSessionInfo, TransferState};
17use alopex_chirps_core::backend::MessageBackend;
18use alopex_chirps_wire::file_transfer::{
19    CancelRequest, ExistsRequest, FileInfo, FileTransferMessage, ListRequest, MetadataRequest,
20    RemoveRequest,
21};
22use alopex_chirps_wire::node_id::NodeId;
23use async_trait::async_trait;
24use std::collections::{HashMap, HashSet};
25use std::path::Path;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29/// High-level file transfer service API.
30#[async_trait]
31pub trait FileTransferService: Send + Sync {
32    /// Sends a file to a target node.
33    ///
34    /// # Errors
35    /// Returns `FileTransferError` for validation, I/O, or transport failures.
36    ///
37    /// # Panics
38    /// This method does not panic.
39    ///
40    /// # Examples
41    /// ```no_run
42    /// use alopex_chirps_file_transfer::{FileTransferService, TransferOptions};
43    /// use alopex_chirps_wire::node_id::NodeId;
44    /// use std::path::Path;
45    ///
46    /// async fn send(
47    ///     service: &dyn FileTransferService,
48    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
49    ///     let target = NodeId::new();
50    ///     service
51    ///         .send_file(
52    ///             target,
53    ///             Path::new("source.bin"),
54    ///             Path::new("dest.bin"),
55    ///             TransferOptions::default(),
56    ///         )
57    ///         .await?;
58    ///     Ok(())
59    /// }
60    /// ```
61    async fn send_file(
62        &self,
63        target: NodeId,
64        source_path: &Path,
65        dest_path: &Path,
66        options: TransferOptions,
67    ) -> Result<TransferHandle, FileTransferError>;
68
69    /// Sends a file to all connected peers.
70    ///
71    /// # Errors
72    /// Returns `FileTransferError` for validation, I/O, or transport failures.
73    ///
74    /// # Panics
75    /// This method does not panic.
76    ///
77    /// # Examples
78    /// ```no_run
79    /// use alopex_chirps_file_transfer::{FileTransferService, TransferOptions};
80    /// use std::path::Path;
81    ///
82    /// async fn broadcast(
83    ///     service: &dyn FileTransferService,
84    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
85    ///     service
86    ///         .broadcast_file(
87    ///             Path::new("source.bin"),
88    ///             Path::new("dest.bin"),
89    ///             TransferOptions::default(),
90    ///         )
91    ///         .await?;
92    ///     Ok(())
93    /// }
94    /// ```
95    async fn broadcast_file(
96        &self,
97        source_path: &Path,
98        dest_path: &Path,
99        options: TransferOptions,
100    ) -> Result<BroadcastHandle, FileTransferError>;
101
102    /// Synchronizes a local path with a remote path.
103    ///
104    /// # Errors
105    /// Returns `FileTransferError` for validation, I/O, or transport failures.
106    ///
107    /// # Panics
108    /// This method does not panic.
109    ///
110    /// # Examples
111    /// ```no_run
112    /// use alopex_chirps_file_transfer::{FileTransferService, SyncOptions};
113    /// use alopex_chirps_wire::node_id::NodeId;
114    /// use std::path::Path;
115    ///
116    /// async fn sync(
117    ///     service: &dyn FileTransferService,
118    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
119    ///     let target = NodeId::new();
120    ///     service
121    ///         .sync_file(
122    ///             Path::new("local.db"),
123    ///             Path::new("remote.db"),
124    ///             Some(vec![target]),
125    ///             SyncOptions::default(),
126    ///         )
127    ///         .await?;
128    ///     Ok(())
129    /// }
130    /// ```
131    async fn sync_file(
132        &self,
133        local_path: &Path,
134        remote_path: &Path,
135        targets: Option<Vec<NodeId>>,
136        options: SyncOptions,
137    ) -> Result<SyncHandle, FileTransferError>;
138
139    /// Checks if a path exists on a target node.
140    ///
141    /// # Errors
142    /// Returns `FileTransferError` for validation or transport failures.
143    ///
144    /// # Panics
145    /// This method does not panic.
146    ///
147    /// # Examples
148    /// ```no_run
149    /// use alopex_chirps_file_transfer::FileTransferService;
150    /// use alopex_chirps_wire::node_id::NodeId;
151    /// use std::path::Path;
152    ///
153    /// async fn check(
154    ///     service: &dyn FileTransferService,
155    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
156    ///     let target = NodeId::new();
157    ///     let exists = service.exists(target, Path::new("data.bin")).await?;
158    ///     let _ = exists;
159    ///     Ok(())
160    /// }
161    /// ```
162    async fn exists(&self, target: NodeId, path: &Path) -> Result<bool, FileTransferError>;
163    /// Removes a file or directory on a target node.
164    ///
165    /// # Errors
166    /// Returns `FileTransferError` for validation or transport failures.
167    ///
168    /// # Panics
169    /// This method does not panic.
170    ///
171    /// # Examples
172    /// ```no_run
173    /// use alopex_chirps_file_transfer::{FileTransferService, RemoveOptions};
174    /// use alopex_chirps_wire::node_id::NodeId;
175    /// use std::path::Path;
176    ///
177    /// async fn remove_path(
178    ///     service: &dyn FileTransferService,
179    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
180    ///     let target = NodeId::new();
181    ///     service
182    ///         .remove(target, Path::new("data.bin"), RemoveOptions::default())
183    ///         .await?;
184    ///     Ok(())
185    /// }
186    /// ```
187    async fn remove(
188        &self,
189        target: NodeId,
190        path: &Path,
191        options: RemoveOptions,
192    ) -> Result<(), FileTransferError>;
193    /// Fetches metadata for a file or directory on a target node.
194    ///
195    /// # Errors
196    /// Returns `FileTransferError` for validation or transport failures.
197    ///
198    /// # Panics
199    /// This method does not panic.
200    ///
201    /// # Examples
202    /// ```no_run
203    /// use alopex_chirps_file_transfer::FileTransferService;
204    /// use alopex_chirps_wire::node_id::NodeId;
205    /// use std::path::Path;
206    ///
207    /// async fn read_metadata(
208    ///     service: &dyn FileTransferService,
209    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
210    ///     let target = NodeId::new();
211    ///     let metadata = service.metadata(target, Path::new("data.bin")).await?;
212    ///     let _ = metadata;
213    ///     Ok(())
214    /// }
215    /// ```
216    async fn metadata(
217        &self,
218        target: NodeId,
219        path: &Path,
220    ) -> Result<FileMetadata, FileTransferError>;
221    /// Lists files within a directory on a target node.
222    ///
223    /// # Errors
224    /// Returns `FileTransferError` for validation or transport failures.
225    ///
226    /// # Panics
227    /// This method does not panic.
228    ///
229    /// # Examples
230    /// ```no_run
231    /// use alopex_chirps_file_transfer::{FileTransferService, ListOptions};
232    /// use alopex_chirps_wire::node_id::NodeId;
233    /// use std::path::Path;
234    ///
235    /// async fn list(
236    ///     service: &dyn FileTransferService,
237    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
238    ///     let target = NodeId::new();
239    ///     let files = service
240    ///         .list_files(target, Path::new("data"), ListOptions::default())
241    ///         .await?;
242    ///     let _ = files;
243    ///     Ok(())
244    /// }
245    /// ```
246    async fn list_files(
247        &self,
248        target: NodeId,
249        dir_path: &Path,
250        options: ListOptions,
251    ) -> Result<Vec<FileInfo>, FileTransferError>;
252
253    /// Cancels an active transfer by session id.
254    ///
255    /// # Errors
256    /// Returns `FileTransferError` if the session cannot be cancelled.
257    ///
258    /// # Panics
259    /// This method does not panic.
260    ///
261    /// # Examples
262    /// ```no_run
263    /// use alopex_chirps_file_transfer::{FileTransferService, TransferSessionId};
264    ///
265    /// async fn cancel(
266    ///     service: &dyn FileTransferService,
267    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
268    ///     let session_id = TransferSessionId::new();
269    ///     service.cancel_transfer(session_id).await?;
270    ///     Ok(())
271    /// }
272    /// ```
273    async fn cancel_transfer(&self, session_id: TransferSessionId)
274    -> Result<(), FileTransferError>;
275    /// Pauses an active transfer by session id.
276    ///
277    /// # Errors
278    /// Returns `FileTransferError` if the session cannot be paused.
279    ///
280    /// # Panics
281    /// This method does not panic.
282    ///
283    /// # Examples
284    /// ```no_run
285    /// use alopex_chirps_file_transfer::{FileTransferService, TransferSessionId};
286    ///
287    /// async fn pause(
288    ///     service: &dyn FileTransferService,
289    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
290    ///     let session_id = TransferSessionId::new();
291    ///     service.pause_transfer(session_id).await?;
292    ///     Ok(())
293    /// }
294    /// ```
295    async fn pause_transfer(&self, session_id: TransferSessionId) -> Result<(), FileTransferError>;
296    /// Resumes a paused transfer by session id.
297    ///
298    /// # Errors
299    /// Returns `FileTransferError` if the session cannot be resumed.
300    ///
301    /// # Panics
302    /// This method does not panic.
303    ///
304    /// # Examples
305    /// ```no_run
306    /// use alopex_chirps_file_transfer::{FileTransferService, TransferSessionId};
307    ///
308    /// async fn resume(
309    ///     service: &dyn FileTransferService,
310    /// ) -> Result<(), alopex_chirps_file_transfer::FileTransferError> {
311    ///     let session_id = TransferSessionId::new();
312    ///     let handle = service.resume_transfer(session_id).await?;
313    ///     let _ = handle;
314    ///     Ok(())
315    /// }
316    /// ```
317    async fn resume_transfer(
318        &self,
319        session_id: TransferSessionId,
320    ) -> Result<TransferHandle, FileTransferError>;
321    /// Returns a snapshot of active transfer sessions.
322    ///
323    /// # Examples
324    /// ```no_run
325    /// use alopex_chirps_file_transfer::FileTransferService;
326    ///
327    /// fn snapshot(service: &dyn FileTransferService) {
328    ///     let sessions = service.active_transfers();
329    ///     let _ = sessions;
330    /// }
331    /// ```
332    ///
333    /// # Panics
334    /// This method does not panic.
335    fn active_transfers(&self) -> Vec<TransferSessionInfo>;
336}
337
338/// Default file transfer service implementation.
339pub struct FileTransferServiceImpl {
340    source_node: NodeId,
341    backend: Arc<dyn MessageBackend>,
342    control: Arc<ControlDispatcher>,
343    stream_opener: Arc<dyn ChunkStreamOpener>,
344    receive_handler: Arc<ReceiveHandler>,
345    config: FileTransferConfig,
346    path_validator: PathValidator,
347    sessions: Arc<RwLock<HashMap<TransferSessionId, TransferSession>>>,
348    persistence: Arc<SessionPersistence>,
349    metrics: Option<Arc<PrometheusMetrics>>,
350}
351
352impl FileTransferServiceImpl {
353    /// Creates a new service and starts internal control handling.
354    ///
355    /// # Errors
356    /// Returns `FileTransferError::Transport` if subscribing to the backend fails.
357    ///
358    /// # Panics
359    /// This method does not panic.
360    pub async fn new(
361        source_node: NodeId,
362        backend: Arc<dyn MessageBackend>,
363        stream_opener: Arc<dyn ChunkStreamOpener>,
364        config: FileTransferConfig,
365    ) -> Result<Self, FileTransferError> {
366        let receiver = backend
367            .subscribe()
368            .await
369            .map_err(|err| FileTransferError::Transport(err.to_string()))?;
370        let control = ControlDispatcher::new(Arc::clone(&backend), receiver);
371        let sessions = Arc::new(RwLock::new(HashMap::new()));
372        let path_validator = PathValidator::new(config.base_path.clone(), false);
373        let persistence = Arc::new(SessionPersistence::new(&config));
374        let metrics = PrometheusMetrics::register().ok().map(Arc::new);
375        let sync_sessions = Arc::new(RwLock::new(HashSet::new()));
376        let receive_handler = Arc::new(ReceiveHandler::new(
377            config.clone(),
378            path_validator.clone(),
379            Arc::clone(&sessions),
380            Some(Arc::clone(&persistence)),
381            metrics.clone(),
382            Arc::clone(&sync_sessions),
383        ));
384
385        let cancel_control = Arc::clone(&control);
386        let cancel_sessions = Arc::clone(&sessions);
387        let cancel_persistence = Arc::clone(&persistence);
388        let cancel_metrics = metrics.clone();
389        let cancel_wait = config.idle_timeout;
390        tokio::spawn(async move {
391            loop {
392                let message = cancel_control
393                    .recv_any_filtered(cancel_wait, |_, msg| {
394                        matches!(msg, FileTransferMessage::Cancel(_))
395                    })
396                    .await;
397                let (session_id, _, message) = match message {
398                    Ok(message) => message,
399                    Err(FileTransferError::Timeout) => continue,
400                    Err(FileTransferError::Transport(_)) => break,
401                    Err(_) => continue,
402                };
403                let FileTransferMessage::Cancel(request) = message else {
404                    continue;
405                };
406                let mut sessions = cancel_sessions.write().await;
407                if let Some(session) = sessions.get_mut(&session_id) {
408                    let was_active = matches!(
409                        session.state,
410                        TransferState::Initializing
411                            | TransferState::InProgress
412                            | TransferState::Paused
413                            | TransferState::Verifying
414                    );
415                    session.state = TransferState::Cancelled;
416                    session.error = Some(request.reason);
417                    session.updated_at = std::time::SystemTime::now();
418                    session.control.set_state(TransferControlState::Cancelled);
419                    if session.options.resumable {
420                        let _ = cancel_persistence.save(session).await;
421                    }
422                    if let Some(metrics) = &cancel_metrics {
423                        metrics.record_transfer(session.kind, "cancelled");
424                        if was_active {
425                            metrics.active_transfers.dec();
426                        }
427                    }
428                }
429            }
430        });
431
432        Ok(FileTransferServiceImpl {
433            source_node,
434            backend,
435            control,
436            stream_opener,
437            receive_handler,
438            config,
439            path_validator,
440            sessions,
441            persistence,
442            metrics,
443        })
444    }
445
446    /// Returns the control dispatcher for sending/receiving control messages.
447    ///
448    /// # Panics
449    /// This method does not panic.
450    pub fn control(&self) -> Arc<ControlDispatcher> {
451        Arc::clone(&self.control)
452    }
453
454    /// Returns the receive handler for incoming transfer data.
455    ///
456    /// # Panics
457    /// This method does not panic.
458    pub fn receive_handler(&self) -> Arc<ReceiveHandler> {
459        Arc::clone(&self.receive_handler)
460    }
461
462    /// Returns the path validator used for file operations.
463    ///
464    /// # Panics
465    /// This method does not panic.
466    pub fn path_validator(&self) -> &PathValidator {
467        &self.path_validator
468    }
469
470    async fn record_session(&self, session: TransferSession) {
471        let mut sessions = self.sessions.write().await;
472        sessions.insert(session.id, session);
473    }
474
475    async fn load_session(
476        &self,
477        session_id: TransferSessionId,
478    ) -> Result<TransferSession, FileTransferError> {
479        if let Some(session) = self.sessions.read().await.get(&session_id).cloned() {
480            return Ok(session);
481        }
482        let session = self.persistence.load(session_id).await?;
483        self.record_session(session.clone()).await;
484        Ok(session)
485    }
486
487    fn select_target(&self, targets: Option<Vec<NodeId>>) -> Result<NodeId, FileTransferError> {
488        let mut candidates = match targets {
489            Some(list) => list,
490            None => self
491                .backend
492                .connected_peers()
493                .into_iter()
494                .map(|(node_id, _)| node_id)
495                .collect(),
496        };
497        candidates.retain(|node_id| *node_id != self.source_node);
498        match candidates.len() {
499            1 => Ok(candidates[0]),
500            0 => Err(FileTransferError::Internal(
501                "no target nodes available for sync".into(),
502            )),
503            _ => Err(FileTransferError::Internal(
504                "sync requires exactly one target node".into(),
505            )),
506        }
507    }
508
509    fn list_filter_match(entry: &FileInfo, options: &ListOptions) -> bool {
510        if options.files_only
511            && !matches!(
512                entry.file_type,
513                alopex_chirps_wire::file_transfer::FileType::File
514            )
515        {
516            return false;
517        }
518        if options.directories_only
519            && !matches!(
520                entry.file_type,
521                alopex_chirps_wire::file_transfer::FileType::Directory
522            )
523        {
524            return false;
525        }
526        if let Some(pattern) = &options.pattern {
527            let name = Path::new(&entry.path)
528                .file_name()
529                .and_then(|name| name.to_str())
530                .unwrap_or("");
531            if !name.contains(pattern) {
532                return false;
533            }
534        }
535        true
536    }
537
538    fn sort_files(files: &mut [FileInfo], sort_by: SortBy) {
539        match sort_by {
540            SortBy::Name => files.sort_by(|a, b| a.path.cmp(&b.path)),
541            SortBy::Size => files.sort_by_key(|entry| entry.size),
542            SortBy::ModifiedTime => files.sort_by_key(|entry| entry.modified_at),
543        }
544    }
545
546    async fn collect_active_transfers(&self) -> Vec<TransferSessionInfo> {
547        let sessions = self.sessions.read().await;
548        sessions
549            .values()
550            .filter(|session| {
551                matches!(
552                    session.state,
553                    TransferState::Initializing | TransferState::InProgress | TransferState::Paused
554                )
555            })
556            .map(TransferSessionInfo::from)
557            .collect()
558    }
559}
560
561#[async_trait]
562impl FileTransferService for FileTransferServiceImpl {
563    async fn send_file(
564        &self,
565        target: NodeId,
566        source_path: &Path,
567        dest_path: &Path,
568        options: TransferOptions,
569    ) -> Result<TransferHandle, FileTransferError> {
570        let result = send_file_with_context(
571            Arc::clone(&self.control),
572            Arc::clone(&self.stream_opener),
573            self.config.clone(),
574            self.source_node,
575            target,
576            source_path,
577            dest_path,
578            options.clone(),
579            crate::session::TransferKind::Send,
580            Some(Arc::clone(&self.sessions)),
581            Some(Arc::clone(&self.persistence)),
582            self.metrics.clone(),
583            None,
584            true,
585        )
586        .await?;
587        Ok(result.handle)
588    }
589
590    async fn broadcast_file(
591        &self,
592        source_path: &Path,
593        dest_path: &Path,
594        options: TransferOptions,
595    ) -> Result<BroadcastHandle, FileTransferError> {
596        let targets: Vec<NodeId> = self
597            .backend
598            .connected_peers()
599            .into_iter()
600            .map(|(node_id, _)| node_id)
601            .filter(|node_id| *node_id != self.source_node)
602            .collect();
603        if targets.is_empty() {
604            return Err(FileTransferError::Internal(
605                "no connected peers available for broadcast".into(),
606            ));
607        }
608        let result = broadcast_file_with_context(
609            Arc::clone(&self.control),
610            Arc::clone(&self.stream_opener),
611            self.config.clone(),
612            self.source_node,
613            targets,
614            source_path,
615            dest_path,
616            options.clone(),
617            Some(Arc::clone(&self.sessions)),
618            Some(Arc::clone(&self.persistence)),
619            self.metrics.clone(),
620        )
621        .await?;
622        Ok(result.handle)
623    }
624
625    async fn sync_file(
626        &self,
627        local_path: &Path,
628        remote_path: &Path,
629        targets: Option<Vec<NodeId>>,
630        options: SyncOptions,
631    ) -> Result<SyncHandle, FileTransferError> {
632        let target = self.select_target(targets)?;
633        sync_file_with_context(
634            Arc::clone(&self.control),
635            Arc::clone(&self.stream_opener),
636            Arc::clone(&self.receive_handler),
637            self.config.clone(),
638            self.source_node,
639            target,
640            local_path,
641            remote_path,
642            options,
643            Some(Arc::clone(&self.sessions)),
644            Some(Arc::clone(&self.persistence)),
645            self.metrics.clone(),
646        )
647        .await
648    }
649
650    async fn exists(&self, target: NodeId, path: &Path) -> Result<bool, FileTransferError> {
651        let session_id = TransferSessionId::new();
652        self.control
653            .send_message(
654                target,
655                session_id,
656                FileTransferMessage::ExistsRequest(ExistsRequest {
657                    path: path.display().to_string(),
658                }),
659            )
660            .await?;
661        let (_, message) = self
662            .control
663            .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
664                matches!(msg, FileTransferMessage::ExistsResponse(_))
665            })
666            .await?;
667        match message {
668            FileTransferMessage::ExistsResponse(response) => Ok(response.exists),
669            _ => Err(FileTransferError::Internal(
670                "unexpected exists response message".into(),
671            )),
672        }
673    }
674
675    async fn remove(
676        &self,
677        target: NodeId,
678        path: &Path,
679        options: RemoveOptions,
680    ) -> Result<(), FileTransferError> {
681        let session_id = TransferSessionId::new();
682        self.control
683            .send_message(
684                target,
685                session_id,
686                FileTransferMessage::RemoveRequest(RemoveRequest {
687                    path: path.display().to_string(),
688                    recursive: options.recursive,
689                }),
690            )
691            .await?;
692        let (_, message) = self
693            .control
694            .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
695                matches!(msg, FileTransferMessage::RemoveResponse(_))
696            })
697            .await?;
698        match message {
699            FileTransferMessage::RemoveResponse(response) => {
700                if response.success {
701                    Ok(())
702                } else {
703                    Err(FileTransferError::Internal(
704                        response.error.unwrap_or_else(|| "remove failed".into()),
705                    ))
706                }
707            }
708            _ => Err(FileTransferError::Internal(
709                "unexpected remove response message".into(),
710            )),
711        }
712    }
713
714    async fn metadata(
715        &self,
716        target: NodeId,
717        path: &Path,
718    ) -> Result<FileMetadata, FileTransferError> {
719        let session_id = TransferSessionId::new();
720        self.control
721            .send_message(
722                target,
723                session_id,
724                FileTransferMessage::MetadataRequest(MetadataRequest {
725                    path: path.display().to_string(),
726                }),
727            )
728            .await?;
729        let (_, message) = self
730            .control
731            .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
732                matches!(msg, FileTransferMessage::MetadataResponse(_))
733            })
734            .await?;
735        match message {
736            FileTransferMessage::MetadataResponse(response) => {
737                if let Some(error) = response.error {
738                    return Err(FileTransferError::Internal(error));
739                }
740                if !response.found {
741                    return Err(FileTransferError::FileNotFound(path.display().to_string()));
742                }
743                let metadata = response.metadata.ok_or_else(|| {
744                    FileTransferError::Internal("metadata response missing metadata".into())
745                })?;
746                let mut metadata = from_wire_file_metadata(metadata);
747                if metadata.size.is_none() {
748                    metadata.size = response.size;
749                }
750                Ok(metadata)
751            }
752            _ => Err(FileTransferError::Internal(
753                "unexpected metadata response message".into(),
754            )),
755        }
756    }
757
758    async fn list_files(
759        &self,
760        target: NodeId,
761        dir_path: &Path,
762        options: ListOptions,
763    ) -> Result<Vec<FileInfo>, FileTransferError> {
764        let session_id = TransferSessionId::new();
765        self.control
766            .send_message(
767                target,
768                session_id,
769                FileTransferMessage::ListRequest(ListRequest {
770                    path: dir_path.display().to_string(),
771                    recursive: options.recursive,
772                    include_hidden: options.include_hidden,
773                }),
774            )
775            .await?;
776        let (_, message) = self
777            .control
778            .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
779                matches!(msg, FileTransferMessage::ListResponse(_))
780            })
781            .await?;
782        let mut files = match message {
783            FileTransferMessage::ListResponse(response) => {
784                if let Some(error) = response.error {
785                    return Err(FileTransferError::Internal(error));
786                }
787                response.files
788            }
789            _ => {
790                return Err(FileTransferError::Internal(
791                    "unexpected list response message".into(),
792                ));
793            }
794        };
795
796        files.retain(|entry| Self::list_filter_match(entry, &options));
797        Self::sort_files(&mut files, options.sort_by);
798        if options.limit > 0 && files.len() > options.limit {
799            files.truncate(options.limit);
800        }
801        Ok(files)
802    }
803
804    async fn cancel_transfer(
805        &self,
806        session_id: TransferSessionId,
807    ) -> Result<(), FileTransferError> {
808        let session = self.load_session(session_id).await?;
809        let targets = if session.target_nodes.is_empty() {
810            vec![session.source_node]
811        } else {
812            session.target_nodes.clone()
813        };
814        for target in targets {
815            self.control
816                .send_message(
817                    target,
818                    session_id,
819                    FileTransferMessage::Cancel(CancelRequest {
820                        reason: "cancelled by user".into(),
821                    }),
822                )
823                .await?;
824        }
825
826        let mut sessions = self.sessions.write().await;
827        if let Some(session) = sessions.get_mut(&session_id) {
828            let was_active = matches!(
829                session.state,
830                TransferState::Initializing
831                    | TransferState::InProgress
832                    | TransferState::Paused
833                    | TransferState::Verifying
834            );
835            if session.state != TransferState::Cancelled {
836                session.transition_to(TransferState::Cancelled)?;
837            }
838            session.control.set_state(TransferControlState::Cancelled);
839            if session.options.resumable {
840                self.persistence.save(session).await?;
841            }
842            if was_active && let Some(metrics) = &self.metrics {
843                metrics.record_transfer(session.kind, "cancelled");
844                metrics.active_transfers.dec();
845            }
846        }
847        Ok(())
848    }
849
850    async fn pause_transfer(&self, session_id: TransferSessionId) -> Result<(), FileTransferError> {
851        let mut sessions = self.sessions.write().await;
852        let session = sessions
853            .get_mut(&session_id)
854            .ok_or(FileTransferError::SessionNotFound(session_id))?;
855        if session.state != TransferState::Paused {
856            session.transition_to(TransferState::Paused)?;
857            session.control.set_state(TransferControlState::Paused);
858            if session.options.resumable {
859                self.persistence.save(session).await?;
860            }
861        }
862        Ok(())
863    }
864
865    async fn resume_transfer(
866        &self,
867        session_id: TransferSessionId,
868    ) -> Result<TransferHandle, FileTransferError> {
869        if self.sessions.read().await.contains_key(&session_id) {
870            let mut sessions = self.sessions.write().await;
871            let session = sessions
872                .get_mut(&session_id)
873                .ok_or(FileTransferError::SessionNotFound(session_id))?;
874            if session.state == TransferState::Paused {
875                session.transition_to(TransferState::InProgress)?;
876                session.control.set_state(TransferControlState::Running);
877            }
878
879            let handle = TransferHandle::new(session.manifest.file_size);
880            let completed_bytes = session
881                .chunk_tracker
882                .completed
883                .iter()
884                .filter_map(|index| session.manifest.chunks.get(*index as usize))
885                .map(|meta| meta.size as u64)
886                .sum();
887            let completed_chunks = session.chunk_tracker.completed.len() as u32;
888            handle
889                .update_progress(completed_bytes, completed_chunks)
890                .await;
891
892            if session.options.resumable {
893                self.persistence.save(session).await?;
894            }
895
896            return Ok(handle);
897        }
898
899        let session = self.persistence.load(session_id).await?;
900        if session.source_node != self.source_node {
901            return Err(FileTransferError::Internal(
902                "resume is only supported for sender sessions".into(),
903            ));
904        }
905        let target =
906            session.target_nodes.first().copied().ok_or_else(|| {
907                FileTransferError::Internal("resume session missing target".into())
908            })?;
909        let source_path = session.source_path.clone();
910        let dest_path = session.dest_path.clone();
911        let result = send_file_with_context(
912            Arc::clone(&self.control),
913            Arc::clone(&self.stream_opener),
914            self.config.clone(),
915            session.source_node,
916            target,
917            &source_path,
918            &dest_path,
919            session.options.clone(),
920            session.kind,
921            Some(Arc::clone(&self.sessions)),
922            Some(Arc::clone(&self.persistence)),
923            self.metrics.clone(),
924            Some(session),
925            true,
926        )
927        .await?;
928
929        Ok(result.handle)
930    }
931
932    fn active_transfers(&self) -> Vec<TransferSessionInfo> {
933        if let Ok(handle) = tokio::runtime::Handle::try_current()
934            && matches!(
935                handle.runtime_flavor(),
936                tokio::runtime::RuntimeFlavor::MultiThread
937            )
938        {
939            return tokio::task::block_in_place(|| {
940                handle.block_on(self.collect_active_transfers())
941            });
942        }
943
944        if let Ok(runtime) = tokio::runtime::Builder::new_current_thread()
945            .enable_all()
946            .build()
947        {
948            return runtime.block_on(self.collect_active_transfers());
949        }
950
951        Vec::new()
952    }
953}