1use crate::TransferSessionId;
2use crate::config::FileTransferConfig;
3use crate::error::FileTransferError;
4use crate::manifest::FileMetadata;
5use crate::metrics::PrometheusMetrics;
6use crate::ops::ChunkStreamOpener;
7use crate::ops::broadcast::broadcast_file_with_context;
8use crate::ops::conversions::from_wire_file_metadata;
9use crate::ops::send::send_file_with_context;
10use crate::ops::sync::sync_file_with_context;
11use crate::ops::{ControlDispatcher, ReceiveHandler};
12use crate::options::{ListOptions, RemoveOptions, SortBy, SyncOptions, TransferOptions};
13use crate::path::PathValidator;
14use crate::persistence::SessionPersistence;
15use crate::progress::{BroadcastHandle, SyncHandle, TransferHandle};
16use crate::session::{TransferControlState, TransferSession, TransferSessionInfo, TransferState};
17use alopex_chirps_core::backend::MessageBackend;
18use alopex_chirps_wire::file_transfer::{
19 CancelRequest, ExistsRequest, FileInfo, FileTransferMessage, ListRequest, MetadataRequest,
20 RemoveRequest,
21};
22use alopex_chirps_wire::node_id::NodeId;
23use async_trait::async_trait;
24use std::collections::{HashMap, HashSet};
25use std::path::Path;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29#[async_trait]
31pub trait FileTransferService: Send + Sync {
32 async fn send_file(
62 &self,
63 target: NodeId,
64 source_path: &Path,
65 dest_path: &Path,
66 options: TransferOptions,
67 ) -> Result<TransferHandle, FileTransferError>;
68
69 async fn broadcast_file(
96 &self,
97 source_path: &Path,
98 dest_path: &Path,
99 options: TransferOptions,
100 ) -> Result<BroadcastHandle, FileTransferError>;
101
102 async fn sync_file(
132 &self,
133 local_path: &Path,
134 remote_path: &Path,
135 targets: Option<Vec<NodeId>>,
136 options: SyncOptions,
137 ) -> Result<SyncHandle, FileTransferError>;
138
139 async fn exists(&self, target: NodeId, path: &Path) -> Result<bool, FileTransferError>;
163 async fn remove(
188 &self,
189 target: NodeId,
190 path: &Path,
191 options: RemoveOptions,
192 ) -> Result<(), FileTransferError>;
193 async fn metadata(
217 &self,
218 target: NodeId,
219 path: &Path,
220 ) -> Result<FileMetadata, FileTransferError>;
221 async fn list_files(
247 &self,
248 target: NodeId,
249 dir_path: &Path,
250 options: ListOptions,
251 ) -> Result<Vec<FileInfo>, FileTransferError>;
252
253 async fn cancel_transfer(&self, session_id: TransferSessionId)
274 -> Result<(), FileTransferError>;
275 async fn pause_transfer(&self, session_id: TransferSessionId) -> Result<(), FileTransferError>;
296 async fn resume_transfer(
318 &self,
319 session_id: TransferSessionId,
320 ) -> Result<TransferHandle, FileTransferError>;
321 fn active_transfers(&self) -> Vec<TransferSessionInfo>;
336}
337
338pub struct FileTransferServiceImpl {
340 source_node: NodeId,
341 backend: Arc<dyn MessageBackend>,
342 control: Arc<ControlDispatcher>,
343 stream_opener: Arc<dyn ChunkStreamOpener>,
344 receive_handler: Arc<ReceiveHandler>,
345 config: FileTransferConfig,
346 path_validator: PathValidator,
347 sessions: Arc<RwLock<HashMap<TransferSessionId, TransferSession>>>,
348 persistence: Arc<SessionPersistence>,
349 metrics: Option<Arc<PrometheusMetrics>>,
350}
351
352impl FileTransferServiceImpl {
353 pub async fn new(
361 source_node: NodeId,
362 backend: Arc<dyn MessageBackend>,
363 stream_opener: Arc<dyn ChunkStreamOpener>,
364 config: FileTransferConfig,
365 ) -> Result<Self, FileTransferError> {
366 let receiver = backend
367 .subscribe()
368 .await
369 .map_err(|err| FileTransferError::Transport(err.to_string()))?;
370 let control = ControlDispatcher::new(Arc::clone(&backend), receiver);
371 let sessions = Arc::new(RwLock::new(HashMap::new()));
372 let path_validator = PathValidator::new(config.base_path.clone(), false);
373 let persistence = Arc::new(SessionPersistence::new(&config));
374 let metrics = PrometheusMetrics::register().ok().map(Arc::new);
375 let sync_sessions = Arc::new(RwLock::new(HashSet::new()));
376 let receive_handler = Arc::new(ReceiveHandler::new(
377 config.clone(),
378 path_validator.clone(),
379 Arc::clone(&sessions),
380 Some(Arc::clone(&persistence)),
381 metrics.clone(),
382 Arc::clone(&sync_sessions),
383 ));
384
385 let cancel_control = Arc::clone(&control);
386 let cancel_sessions = Arc::clone(&sessions);
387 let cancel_persistence = Arc::clone(&persistence);
388 let cancel_metrics = metrics.clone();
389 let cancel_wait = config.idle_timeout;
390 tokio::spawn(async move {
391 loop {
392 let message = cancel_control
393 .recv_any_filtered(cancel_wait, |_, msg| {
394 matches!(msg, FileTransferMessage::Cancel(_))
395 })
396 .await;
397 let (session_id, _, message) = match message {
398 Ok(message) => message,
399 Err(FileTransferError::Timeout) => continue,
400 Err(FileTransferError::Transport(_)) => break,
401 Err(_) => continue,
402 };
403 let FileTransferMessage::Cancel(request) = message else {
404 continue;
405 };
406 let mut sessions = cancel_sessions.write().await;
407 if let Some(session) = sessions.get_mut(&session_id) {
408 let was_active = matches!(
409 session.state,
410 TransferState::Initializing
411 | TransferState::InProgress
412 | TransferState::Paused
413 | TransferState::Verifying
414 );
415 session.state = TransferState::Cancelled;
416 session.error = Some(request.reason);
417 session.updated_at = std::time::SystemTime::now();
418 session.control.set_state(TransferControlState::Cancelled);
419 if session.options.resumable {
420 let _ = cancel_persistence.save(session).await;
421 }
422 if let Some(metrics) = &cancel_metrics {
423 metrics.record_transfer(session.kind, "cancelled");
424 if was_active {
425 metrics.active_transfers.dec();
426 }
427 }
428 }
429 }
430 });
431
432 Ok(FileTransferServiceImpl {
433 source_node,
434 backend,
435 control,
436 stream_opener,
437 receive_handler,
438 config,
439 path_validator,
440 sessions,
441 persistence,
442 metrics,
443 })
444 }
445
446 pub fn control(&self) -> Arc<ControlDispatcher> {
451 Arc::clone(&self.control)
452 }
453
454 pub fn receive_handler(&self) -> Arc<ReceiveHandler> {
459 Arc::clone(&self.receive_handler)
460 }
461
462 pub fn path_validator(&self) -> &PathValidator {
467 &self.path_validator
468 }
469
470 async fn record_session(&self, session: TransferSession) {
471 let mut sessions = self.sessions.write().await;
472 sessions.insert(session.id, session);
473 }
474
475 async fn load_session(
476 &self,
477 session_id: TransferSessionId,
478 ) -> Result<TransferSession, FileTransferError> {
479 if let Some(session) = self.sessions.read().await.get(&session_id).cloned() {
480 return Ok(session);
481 }
482 let session = self.persistence.load(session_id).await?;
483 self.record_session(session.clone()).await;
484 Ok(session)
485 }
486
487 fn select_target(&self, targets: Option<Vec<NodeId>>) -> Result<NodeId, FileTransferError> {
488 let mut candidates = match targets {
489 Some(list) => list,
490 None => self
491 .backend
492 .connected_peers()
493 .into_iter()
494 .map(|(node_id, _)| node_id)
495 .collect(),
496 };
497 candidates.retain(|node_id| *node_id != self.source_node);
498 match candidates.len() {
499 1 => Ok(candidates[0]),
500 0 => Err(FileTransferError::Internal(
501 "no target nodes available for sync".into(),
502 )),
503 _ => Err(FileTransferError::Internal(
504 "sync requires exactly one target node".into(),
505 )),
506 }
507 }
508
509 fn list_filter_match(entry: &FileInfo, options: &ListOptions) -> bool {
510 if options.files_only
511 && !matches!(
512 entry.file_type,
513 alopex_chirps_wire::file_transfer::FileType::File
514 )
515 {
516 return false;
517 }
518 if options.directories_only
519 && !matches!(
520 entry.file_type,
521 alopex_chirps_wire::file_transfer::FileType::Directory
522 )
523 {
524 return false;
525 }
526 if let Some(pattern) = &options.pattern {
527 let name = Path::new(&entry.path)
528 .file_name()
529 .and_then(|name| name.to_str())
530 .unwrap_or("");
531 if !name.contains(pattern) {
532 return false;
533 }
534 }
535 true
536 }
537
538 fn sort_files(files: &mut [FileInfo], sort_by: SortBy) {
539 match sort_by {
540 SortBy::Name => files.sort_by(|a, b| a.path.cmp(&b.path)),
541 SortBy::Size => files.sort_by_key(|entry| entry.size),
542 SortBy::ModifiedTime => files.sort_by_key(|entry| entry.modified_at),
543 }
544 }
545
546 async fn collect_active_transfers(&self) -> Vec<TransferSessionInfo> {
547 let sessions = self.sessions.read().await;
548 sessions
549 .values()
550 .filter(|session| {
551 matches!(
552 session.state,
553 TransferState::Initializing | TransferState::InProgress | TransferState::Paused
554 )
555 })
556 .map(TransferSessionInfo::from)
557 .collect()
558 }
559}
560
561#[async_trait]
562impl FileTransferService for FileTransferServiceImpl {
563 async fn send_file(
564 &self,
565 target: NodeId,
566 source_path: &Path,
567 dest_path: &Path,
568 options: TransferOptions,
569 ) -> Result<TransferHandle, FileTransferError> {
570 let result = send_file_with_context(
571 Arc::clone(&self.control),
572 Arc::clone(&self.stream_opener),
573 self.config.clone(),
574 self.source_node,
575 target,
576 source_path,
577 dest_path,
578 options.clone(),
579 crate::session::TransferKind::Send,
580 Some(Arc::clone(&self.sessions)),
581 Some(Arc::clone(&self.persistence)),
582 self.metrics.clone(),
583 None,
584 true,
585 )
586 .await?;
587 Ok(result.handle)
588 }
589
590 async fn broadcast_file(
591 &self,
592 source_path: &Path,
593 dest_path: &Path,
594 options: TransferOptions,
595 ) -> Result<BroadcastHandle, FileTransferError> {
596 let targets: Vec<NodeId> = self
597 .backend
598 .connected_peers()
599 .into_iter()
600 .map(|(node_id, _)| node_id)
601 .filter(|node_id| *node_id != self.source_node)
602 .collect();
603 if targets.is_empty() {
604 return Err(FileTransferError::Internal(
605 "no connected peers available for broadcast".into(),
606 ));
607 }
608 let result = broadcast_file_with_context(
609 Arc::clone(&self.control),
610 Arc::clone(&self.stream_opener),
611 self.config.clone(),
612 self.source_node,
613 targets,
614 source_path,
615 dest_path,
616 options.clone(),
617 Some(Arc::clone(&self.sessions)),
618 Some(Arc::clone(&self.persistence)),
619 self.metrics.clone(),
620 )
621 .await?;
622 Ok(result.handle)
623 }
624
625 async fn sync_file(
626 &self,
627 local_path: &Path,
628 remote_path: &Path,
629 targets: Option<Vec<NodeId>>,
630 options: SyncOptions,
631 ) -> Result<SyncHandle, FileTransferError> {
632 let target = self.select_target(targets)?;
633 sync_file_with_context(
634 Arc::clone(&self.control),
635 Arc::clone(&self.stream_opener),
636 Arc::clone(&self.receive_handler),
637 self.config.clone(),
638 self.source_node,
639 target,
640 local_path,
641 remote_path,
642 options,
643 Some(Arc::clone(&self.sessions)),
644 Some(Arc::clone(&self.persistence)),
645 self.metrics.clone(),
646 )
647 .await
648 }
649
650 async fn exists(&self, target: NodeId, path: &Path) -> Result<bool, FileTransferError> {
651 let session_id = TransferSessionId::new();
652 self.control
653 .send_message(
654 target,
655 session_id,
656 FileTransferMessage::ExistsRequest(ExistsRequest {
657 path: path.display().to_string(),
658 }),
659 )
660 .await?;
661 let (_, message) = self
662 .control
663 .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
664 matches!(msg, FileTransferMessage::ExistsResponse(_))
665 })
666 .await?;
667 match message {
668 FileTransferMessage::ExistsResponse(response) => Ok(response.exists),
669 _ => Err(FileTransferError::Internal(
670 "unexpected exists response message".into(),
671 )),
672 }
673 }
674
675 async fn remove(
676 &self,
677 target: NodeId,
678 path: &Path,
679 options: RemoveOptions,
680 ) -> Result<(), FileTransferError> {
681 let session_id = TransferSessionId::new();
682 self.control
683 .send_message(
684 target,
685 session_id,
686 FileTransferMessage::RemoveRequest(RemoveRequest {
687 path: path.display().to_string(),
688 recursive: options.recursive,
689 }),
690 )
691 .await?;
692 let (_, message) = self
693 .control
694 .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
695 matches!(msg, FileTransferMessage::RemoveResponse(_))
696 })
697 .await?;
698 match message {
699 FileTransferMessage::RemoveResponse(response) => {
700 if response.success {
701 Ok(())
702 } else {
703 Err(FileTransferError::Internal(
704 response.error.unwrap_or_else(|| "remove failed".into()),
705 ))
706 }
707 }
708 _ => Err(FileTransferError::Internal(
709 "unexpected remove response message".into(),
710 )),
711 }
712 }
713
714 async fn metadata(
715 &self,
716 target: NodeId,
717 path: &Path,
718 ) -> Result<FileMetadata, FileTransferError> {
719 let session_id = TransferSessionId::new();
720 self.control
721 .send_message(
722 target,
723 session_id,
724 FileTransferMessage::MetadataRequest(MetadataRequest {
725 path: path.display().to_string(),
726 }),
727 )
728 .await?;
729 let (_, message) = self
730 .control
731 .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
732 matches!(msg, FileTransferMessage::MetadataResponse(_))
733 })
734 .await?;
735 match message {
736 FileTransferMessage::MetadataResponse(response) => {
737 if let Some(error) = response.error {
738 return Err(FileTransferError::Internal(error));
739 }
740 if !response.found {
741 return Err(FileTransferError::FileNotFound(path.display().to_string()));
742 }
743 let metadata = response.metadata.ok_or_else(|| {
744 FileTransferError::Internal("metadata response missing metadata".into())
745 })?;
746 let mut metadata = from_wire_file_metadata(metadata);
747 if metadata.size.is_none() {
748 metadata.size = response.size;
749 }
750 Ok(metadata)
751 }
752 _ => Err(FileTransferError::Internal(
753 "unexpected metadata response message".into(),
754 )),
755 }
756 }
757
758 async fn list_files(
759 &self,
760 target: NodeId,
761 dir_path: &Path,
762 options: ListOptions,
763 ) -> Result<Vec<FileInfo>, FileTransferError> {
764 let session_id = TransferSessionId::new();
765 self.control
766 .send_message(
767 target,
768 session_id,
769 FileTransferMessage::ListRequest(ListRequest {
770 path: dir_path.display().to_string(),
771 recursive: options.recursive,
772 include_hidden: options.include_hidden,
773 }),
774 )
775 .await?;
776 let (_, message) = self
777 .control
778 .recv_filtered(session_id, self.config.manifest_timeout, |msg| {
779 matches!(msg, FileTransferMessage::ListResponse(_))
780 })
781 .await?;
782 let mut files = match message {
783 FileTransferMessage::ListResponse(response) => {
784 if let Some(error) = response.error {
785 return Err(FileTransferError::Internal(error));
786 }
787 response.files
788 }
789 _ => {
790 return Err(FileTransferError::Internal(
791 "unexpected list response message".into(),
792 ));
793 }
794 };
795
796 files.retain(|entry| Self::list_filter_match(entry, &options));
797 Self::sort_files(&mut files, options.sort_by);
798 if options.limit > 0 && files.len() > options.limit {
799 files.truncate(options.limit);
800 }
801 Ok(files)
802 }
803
804 async fn cancel_transfer(
805 &self,
806 session_id: TransferSessionId,
807 ) -> Result<(), FileTransferError> {
808 let session = self.load_session(session_id).await?;
809 let targets = if session.target_nodes.is_empty() {
810 vec![session.source_node]
811 } else {
812 session.target_nodes.clone()
813 };
814 for target in targets {
815 self.control
816 .send_message(
817 target,
818 session_id,
819 FileTransferMessage::Cancel(CancelRequest {
820 reason: "cancelled by user".into(),
821 }),
822 )
823 .await?;
824 }
825
826 let mut sessions = self.sessions.write().await;
827 if let Some(session) = sessions.get_mut(&session_id) {
828 let was_active = matches!(
829 session.state,
830 TransferState::Initializing
831 | TransferState::InProgress
832 | TransferState::Paused
833 | TransferState::Verifying
834 );
835 if session.state != TransferState::Cancelled {
836 session.transition_to(TransferState::Cancelled)?;
837 }
838 session.control.set_state(TransferControlState::Cancelled);
839 if session.options.resumable {
840 self.persistence.save(session).await?;
841 }
842 if was_active && let Some(metrics) = &self.metrics {
843 metrics.record_transfer(session.kind, "cancelled");
844 metrics.active_transfers.dec();
845 }
846 }
847 Ok(())
848 }
849
850 async fn pause_transfer(&self, session_id: TransferSessionId) -> Result<(), FileTransferError> {
851 let mut sessions = self.sessions.write().await;
852 let session = sessions
853 .get_mut(&session_id)
854 .ok_or(FileTransferError::SessionNotFound(session_id))?;
855 if session.state != TransferState::Paused {
856 session.transition_to(TransferState::Paused)?;
857 session.control.set_state(TransferControlState::Paused);
858 if session.options.resumable {
859 self.persistence.save(session).await?;
860 }
861 }
862 Ok(())
863 }
864
865 async fn resume_transfer(
866 &self,
867 session_id: TransferSessionId,
868 ) -> Result<TransferHandle, FileTransferError> {
869 if self.sessions.read().await.contains_key(&session_id) {
870 let mut sessions = self.sessions.write().await;
871 let session = sessions
872 .get_mut(&session_id)
873 .ok_or(FileTransferError::SessionNotFound(session_id))?;
874 if session.state == TransferState::Paused {
875 session.transition_to(TransferState::InProgress)?;
876 session.control.set_state(TransferControlState::Running);
877 }
878
879 let handle = TransferHandle::new(session.manifest.file_size);
880 let completed_bytes = session
881 .chunk_tracker
882 .completed
883 .iter()
884 .filter_map(|index| session.manifest.chunks.get(*index as usize))
885 .map(|meta| meta.size as u64)
886 .sum();
887 let completed_chunks = session.chunk_tracker.completed.len() as u32;
888 handle
889 .update_progress(completed_bytes, completed_chunks)
890 .await;
891
892 if session.options.resumable {
893 self.persistence.save(session).await?;
894 }
895
896 return Ok(handle);
897 }
898
899 let session = self.persistence.load(session_id).await?;
900 if session.source_node != self.source_node {
901 return Err(FileTransferError::Internal(
902 "resume is only supported for sender sessions".into(),
903 ));
904 }
905 let target =
906 session.target_nodes.first().copied().ok_or_else(|| {
907 FileTransferError::Internal("resume session missing target".into())
908 })?;
909 let source_path = session.source_path.clone();
910 let dest_path = session.dest_path.clone();
911 let result = send_file_with_context(
912 Arc::clone(&self.control),
913 Arc::clone(&self.stream_opener),
914 self.config.clone(),
915 session.source_node,
916 target,
917 &source_path,
918 &dest_path,
919 session.options.clone(),
920 session.kind,
921 Some(Arc::clone(&self.sessions)),
922 Some(Arc::clone(&self.persistence)),
923 self.metrics.clone(),
924 Some(session),
925 true,
926 )
927 .await?;
928
929 Ok(result.handle)
930 }
931
932 fn active_transfers(&self) -> Vec<TransferSessionInfo> {
933 if let Ok(handle) = tokio::runtime::Handle::try_current()
934 && matches!(
935 handle.runtime_flavor(),
936 tokio::runtime::RuntimeFlavor::MultiThread
937 )
938 {
939 return tokio::task::block_in_place(|| {
940 handle.block_on(self.collect_active_transfers())
941 });
942 }
943
944 if let Ok(runtime) = tokio::runtime::Builder::new_current_thread()
945 .enable_all()
946 .build()
947 {
948 return runtime.block_on(self.collect_active_transfers());
949 }
950
951 Vec::new()
952 }
953}