1use crate::{Error, Result};
18use futures::FutureExt;
19use sos_core::{ExternalFile, Origin, Paths};
20use sos_protocol::{
21 network_client::NetworkRetry,
22 transfer::{
23 CancelReason, FileOperation, FileSyncClient,
24 FileTransferQueueRequest, ProgressChannel, TransferOperation,
25 },
26 SyncClient,
27};
28use sos_vfs as vfs;
29use std::{
30 collections::hash_map::DefaultHasher,
31 hash::{Hash, Hasher},
32 sync::Arc,
33 time::{Duration, SystemTime},
34};
35use tokio::{
36 sync::{broadcast, mpsc, oneshot, watch, Mutex, RwLock, Semaphore},
37 time,
38};
39
40mod inflight;
41mod operations;
42
43pub use inflight::{
44 InflightNotification, InflightRequest, InflightTransfers,
45};
46use std::collections::{HashSet, VecDeque};
47
/// Sender half of a `watch` channel carrying the [`CancelReason`] used to
/// signal cancellation of a transfer.
pub type CancelChannel = watch::Sender<CancelReason>;
53
/// Error reported when a file transfer operation fails.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum TransferError {
    /// Retries were exhausted; the operation is recorded as a failure and
    /// requeued once the failure is older than `failure_expiry`.
    RetryExhausted,
    /// NOTE(review): presumably the file to transfer could not be found —
    /// confirm against the `operations` module.
    FileMissing,
    /// Reported for a move operation (presumably the move source is
    /// missing — confirm). If the move destination exists locally it is
    /// queued for upload instead.
    MovedMissing,
    /// The transfer was canceled for the given reason.
    Canceled(CancelReason),
}
67
/// Result of running a single operation against one client.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
enum TransferResult {
    /// The operation completed.
    Done,
    /// NOTE(review): produced by the `operations` module; this module
    /// treats it as "not done" but does not record it as a failure.
    Retry,
    /// The operation failed with the given error.
    Fatal(TransferError),
}
78
/// Outcome of one operation run against one client.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct TransferOutcome {
    /// Identifier derived from the file, operation and client origin.
    transfer_id: u64,
    /// Identifier allocated for this request by the inflight registry.
    request_id: u64,
    /// File that was operated on.
    file: ExternalFile,
    /// Operation that was run.
    operation: TransferOperation,
    /// Result of the operation.
    result: TransferResult,
}
88
/// Operation that exhausted its retries, kept so it can be requeued later.
#[derive(Debug)]
struct TransferFailure {
    /// When the failure was recorded.
    time: SystemTime,
    /// File the operation applied to.
    file: ExternalFile,
    /// Operation that failed.
    operation: TransferOperation,
}
96
97impl From<TransferFailure> for FileOperation {
98 fn from(value: TransferFailure) -> Self {
99 FileOperation(value.file, value.operation)
100 }
101}
102
/// Settings that control how file transfers are scheduled and retried.
#[derive(Debug, Clone)]
pub struct FileTransferSettings {
    /// Number of client requests that may run concurrently.
    pub concurrent_requests: usize,

    /// How often recorded failures are checked for requeueing.
    pub failure_interval: Duration,

    /// Minimum age of a recorded failure before it is requeued.
    pub failure_expiry: Duration,

    /// Retry policy for network requests; each operation starts from a
    /// `clone_reset()` copy of this value.
    pub retry: NetworkRetry,
}
123
124impl Default for FileTransferSettings {
125 fn default() -> Self {
126 Self {
127 concurrent_requests: 4,
128
129 #[cfg(debug_assertions)]
130 failure_interval: Duration::from_millis(250),
131 #[cfg(not(debug_assertions))]
132 failure_interval: Duration::from_millis(30000),
133
134 #[cfg(debug_assertions)]
135 failure_expiry: Duration::from_millis(0),
136 #[cfg(not(debug_assertions))]
137 failure_expiry: Duration::from_secs(180),
138
139 #[cfg(debug_assertions)]
142 retry: NetworkRetry::new(4, 0),
143 #[cfg(not(debug_assertions))]
145 retry: NetworkRetry::default(),
146 }
147 }
148}
149
150impl FileTransferSettings {
151 pub fn new() -> Self {
153 Self {
154 concurrent_requests: 4,
155 failure_interval: Duration::from_millis(30000),
156 failure_expiry: Duration::from_secs(180),
157 retry: NetworkRetry::default(),
158 }
159 }
160}
161
/// Handle to the background task spawned by `FileTransfers::run`.
pub(crate) struct FileTransfersHandle {
    /// Requests the background task to shut down.
    shutdown_tx: mpsc::Sender<()>,
    /// Receives the background task's shutdown acknowledgement.
    shutdown_rx: oneshot::Receiver<()>,
    /// Enqueues file operations for the background task.
    pub(crate) queue_tx: mpsc::Sender<FileTransferQueueRequest>,
}
169
170impl FileTransfersHandle {
171 fn new() -> (
173 Self,
174 mpsc::Receiver<()>,
175 oneshot::Sender<()>,
176 mpsc::Receiver<FileTransferQueueRequest>,
177 ) {
178 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
179 let (ack_tx, ack_rx) = oneshot::channel::<()>();
180 let (queue_tx, queue_rx) =
181 mpsc::channel::<FileTransferQueueRequest>(32);
182
183 (
184 Self {
185 shutdown_tx,
186 shutdown_rx: ack_rx,
187 queue_tx,
188 },
189 shutdown_rx,
190 ack_tx,
191 queue_rx,
192 )
193 }
194
195 pub async fn send(&self, items: FileTransferQueueRequest) {
197 if let Err(error) = self.queue_tx.send(items).await {
198 tracing::warn!(
199 error = ?error,
200 "file_transfers::queue_send_error",
201 );
202 }
203 }
204
205 pub async fn shutdown(self) {
207 if let Err(error) = self.shutdown_tx.send(()).await {
208 tracing::warn!(
209 error = ?error,
210 "file_transfers::shutdown_tx::send_error",
211 );
212 }
213 if let Err(error) = self.shutdown_rx.await {
214 tracing::warn!(
215 error = ?error,
216 "file_transfers::shutdown_tx::recv_error",
217 );
218 }
219 }
220}
221
/// Queue of file operations (uploads, downloads, deletes and moves) that
/// are executed against the configured clients by a background task.
///
/// The bounds on `C` let clients be cloned into spawned tasks and compared
/// for removal.
pub struct FileTransfers<C>
where
    C: SyncClient<Error = sos_protocol::Error>
        + FileSyncClient<Error = sos_protocol::Error>
        + Clone
        + Send
        + Sync
        + PartialEq
        + 'static,
{
    /// Clients that queued operations are run against.
    clients: Arc<Mutex<Vec<C>>>,
    /// Shared transfer settings.
    settings: Arc<FileTransferSettings>,
    /// Pending operations; items received on the queue channel are pushed
    /// to the front and consumed from the back.
    queue: Arc<RwLock<VecDeque<FileOperation>>>,
    /// Operations that exhausted their retries, requeued once older than
    /// `failure_expiry`.
    failures: Arc<Mutex<VecDeque<TransferFailure>>>,
    /// Registry of transfers currently in flight.
    pub(super) inflight: Arc<InflightTransfers>,
}
243
244impl<C> FileTransfers<C>
245where
246 C: SyncClient<Error = sos_protocol::Error>
247 + FileSyncClient<Error = sos_protocol::Error>
248 + Clone
249 + Send
250 + Sync
251 + PartialEq
252 + 'static,
253{
254 pub fn new(clients: Vec<C>, settings: FileTransferSettings) -> Self {
256 let queue = VecDeque::new();
257 let inflight = InflightTransfers::new();
258
259 Self {
260 clients: Arc::new(Mutex::new(clients)),
261 settings: Arc::new(settings),
262 queue: Arc::new(RwLock::new(queue)),
263 failures: Arc::new(Mutex::new(Default::default())),
264 inflight: Arc::new(inflight),
265 }
266 }
267
268 pub(super) async fn add_client(&self, client: C) {
270 let mut writer = self.clients.lock().await;
271 writer.push(client);
272 }
273
274 pub(super) async fn remove_client(&self, client: &C) {
276 let mut writer = self.clients.lock().await;
277 if let Some(pos) = writer.iter().position(|c| c == client) {
278 writer.remove(pos);
279 }
280 }
281
282 pub(super) fn run(&mut self, paths: Arc<Paths>) -> FileTransfersHandle {
284 let (handle, mut shutdown_rx, shutdown_tx, mut queue_rx) =
285 FileTransfersHandle::new();
286
287 let queue_tx = handle.queue_tx.clone();
288
289 let queue_drained = Arc::new(Mutex::new(false));
290
291 let clients = self.clients.clone();
292 let queue = self.queue.clone();
293 let inflight = self.inflight.clone();
294 let cancel_inflight = self.inflight.clone();
295 let settings = self.settings.clone();
296 let failures = self.failures.clone();
297 let semaphore =
298 Arc::new(Semaphore::new(self.settings.concurrent_requests));
299
300 let failure_expiry = settings.failure_expiry;
301 let mut interval = time::interval(settings.failure_interval);
302
303 tokio::task::spawn(async move {
304 loop {
305 tokio::select! {
306 biased;
307 signal = shutdown_rx.recv().fuse() => {
308 if signal.is_some() {
309 tracing::debug!("file_transfers::shutting_down");
310
311 let mut writer = queue.write().await;
314 *writer = Default::default();
315
316 cancel_inflight.cancel_all(CancelReason::Shutdown).await;
318
319 let _ = shutdown_tx.send(());
320 tracing::debug!("file_transfers::shut_down");
321
322 break;
323 }
324 }
325 _ = interval.tick() => {
326 let mut failures = failures.lock().await;
327 let mut items = Vec::new();
328 while let Some(failure) = failures.pop_back() {
329 if let Ok(elapsed) = failure.time.elapsed() {
330 if elapsed >= failure_expiry {
331 items.push(failure.into());
332 } else {
333 failures.push_front(failure);
334 }
335 }
336 }
337
338 if !items.is_empty() {
339 if let Err(error) = queue_tx.send(items).await {
340 tracing::error!(error = ?error, "file_transfers::reinsert");
341 }
342 }
343 }
344 Some(events) = queue_rx.recv() => {
345 let num_clients = {
348 let reader = clients.lock().await;
349 reader.len()
350 };
351
352 if num_clients == 0 {
355 continue;
356 }
357
358 let num_queued = {
359
360 for event in &events {
364 normalize(
365 &event.0,
366 &event.1,
367 queue.clone(),
368 failures.clone(),
369 inflight.clone(),
370 ).await;
371 }
372
373 let mut writer = queue.write().await;
374 for event in events {
375 if !writer.contains(&event) {
376 writer.push_front(event);
377 }
378 }
379 writer.len()
380 };
381
382 let is_running = {
383 let reader = queue_drained.lock().await;
384 *reader
385 };
386
387 tracing::debug!(
388 num_clients = %num_clients,
389 num_queued = %num_queued,
390 is_running = %is_running,
391 "file_transfers::event");
392
393 if num_queued > 0 && !is_running {
396 let clients = {
400 let reader = clients.lock().await;
401 reader.iter().cloned().collect::<Vec<_>>()
402 };
403
404 if !clients.is_empty() {
405 {
406 let mut writer = queue_drained.lock().await;
407 *writer = true;
408 }
409
410 let semaphore = semaphore.clone();
411 let queue = queue.clone();
412 let failures = failures.clone();
413 let inflight = inflight.clone();
414 let settings = settings.clone();
415 let paths = paths.clone();
416 let drained = queue_drained.clone();
417
418 tokio::task::spawn(async move {
422
423 let res = Self::spawn_tasks(
426 paths.clone(),
427 semaphore.clone(),
428 queue.clone(),
429 failures.clone(),
430 inflight.clone(),
431 settings.clone(),
432 clients.clone(),
433 )
434 .await;
435 if let Err(error) = res {
436 tracing::error!(error = ?error);
437 }
438
439 let mut writer = drained.lock().await;
441 *writer = false;
442
443 });
444 }
445
446 }
447 }
448 }
449 }
450
451 Ok::<_, Error>(())
452 });
453
454 handle
455 }
456
457 async fn spawn_tasks(
458 paths: Arc<Paths>,
459 semaphore: Arc<Semaphore>,
460 queue: Arc<RwLock<VecDeque<FileOperation>>>,
461 failures: Arc<Mutex<VecDeque<TransferFailure>>>,
462 inflight: Arc<InflightTransfers>,
463 settings: Arc<FileTransferSettings>,
464 clients: Vec<C>,
465 ) -> Result<()> {
466 let mut remaining = Self::consume_queue(
467 paths.clone(),
468 semaphore.clone(),
469 queue.clone(),
470 failures.clone(),
471 inflight.clone(),
472 settings.clone(),
473 clients.as_slice(),
474 )
475 .await?;
476
477 while let Some(_) = remaining {
478 remaining = Self::consume_queue(
479 paths.clone(),
480 semaphore.clone(),
481 queue.clone(),
482 failures.clone(),
483 inflight.clone(),
484 settings.clone(),
485 clients.as_slice(),
486 )
487 .await?;
488 }
489
490 Ok(())
491 }
492
493 async fn consume_queue(
494 paths: Arc<Paths>,
495 semaphore: Arc<Semaphore>,
496 queue: Arc<RwLock<VecDeque<FileOperation>>>,
497 failures: Arc<Mutex<VecDeque<TransferFailure>>>,
498 inflight: Arc<InflightTransfers>,
499 settings: Arc<FileTransferSettings>,
500 clients: &[C],
501 ) -> Result<Option<()>> {
502 let mut requests = Vec::new();
503 let mut downloads = Vec::new();
504
505 loop {
506 if semaphore.available_permits() == 0 {
508 break;
509 }
510
511 let item = {
512 let mut queue = queue.write().await;
513 queue.pop_back()
514 };
515
516 if item.is_none() {
518 break;
519 }
520
521 let FileOperation(file, op) = item.unwrap();
522
523 tracing::debug!(
525 file = ?file,
526 op = ?op,
527 "file_transfers::queue",
528 );
529
530 match op {
531 TransferOperation::Download => {
534 let inflight = inflight.clone();
535 let settings = settings.clone();
536 let paths = paths.clone();
537 let clients = clients.to_vec().clone();
538 let permit = semaphore.clone();
539 let jh = tokio::task::spawn(async move {
540 let mut results = Vec::new();
541 for client in clients {
542 let _permit = permit.acquire().await.unwrap();
543 let request_id = inflight.request_id();
544
545 let outcome = Self::run_client_operation(
546 request_id,
547 file,
548 op,
549 client.clone(),
550 settings.clone(),
551 paths.clone(),
552 inflight.clone(),
553 )
554 .await?;
555
556 let is_done = matches!(
557 &outcome.result,
558 TransferResult::Done
559 );
560 results.push(outcome);
561 if is_done {
562 break;
563 }
564 }
565 Ok::<_, Error>(results)
566 });
567 downloads.push(jh);
568 }
569 _ => {
571 for client in clients.to_vec() {
572 let inflight = inflight.clone();
573 let settings = settings.clone();
574 let paths = paths.clone();
575 let permit = semaphore.clone();
576 let jh = tokio::task::spawn(async move {
577 let _permit = permit.acquire().await.unwrap();
578 let request_id = inflight.request_id();
579 let outcome = Self::run_client_operation(
580 request_id,
581 file,
582 op,
583 client.clone(),
584 settings.clone(),
585 paths.clone(),
586 inflight.clone(),
587 )
588 .await?;
589
590 Ok::<_, Error>(outcome)
591 });
592 requests.push(jh);
593 }
594 }
595 }
596 }
597
598 let request_paths = paths.clone();
599 let request_queue = queue.clone();
600 let download_inflight = inflight.clone();
601 let download_failures = failures.clone();
602
603 let requests_task = async move {
604 let mut results = Vec::new();
605 for jh in requests {
606 let outcome = jh.await.unwrap()?;
607 if let TransferResult::Done = &outcome.result {
608 let notify = InflightNotification::TransferDone {
609 transfer_id: outcome.transfer_id,
610 request_id: outcome.request_id,
611 };
612 notify_listeners(notify, &inflight.notifications).await;
613 }
614
615 results.push(outcome);
616 }
617
618 let done = results
619 .iter()
620 .all(|o| matches!(o.result, TransferResult::Done));
621
622 if !done {
623 let moved_missing = results
630 .iter()
631 .filter(|o| {
632 matches!(
633 o.result,
634 TransferResult::Fatal(
635 TransferError::MovedMissing
636 )
637 )
638 })
639 .cloned()
640 .collect::<HashSet<_>>();
641 for outcome in moved_missing {
642 if let TransferOperation::Move(dest) = &outcome.operation
643 {
644 let path = request_paths.into_file_path(dest);
645
646 if vfs::try_exists(path).await? {
647 let item = FileOperation(
648 dest.clone(),
649 TransferOperation::Upload,
650 );
651 let mut queue = request_queue.write().await;
652 if !queue.contains(&item) {
653 queue.push_back(item);
654 }
655 }
656 }
657 }
658
659 for (file, op) in results
660 .into_iter()
661 .filter(|o| {
662 matches!(
663 o.result,
664 TransferResult::Fatal(
665 TransferError::RetryExhausted
666 )
667 )
668 })
669 .map(|o| (o.file, o.operation))
670 {
671 let item = TransferFailure {
672 time: SystemTime::now(),
673 file,
674 operation: op,
675 };
676 let mut failures = failures.lock().await;
677 failures.push_front(item);
678 }
679 }
680
681 Ok::<_, Error>(())
682 };
683
684 let downloads_task = async move {
685 let mut results = Vec::new();
686 for jh in downloads {
687 let download_outcomes = jh.await.unwrap()?;
688 let done_requests = download_outcomes
689 .iter()
690 .filter(|o| matches!(o.result, TransferResult::Done))
691 .collect::<Vec<_>>();
692
693 for outcome in done_requests {
694 if let TransferResult::Done = &outcome.result {
695 let notify = InflightNotification::TransferDone {
696 transfer_id: outcome.transfer_id,
697 request_id: outcome.request_id,
698 };
699 notify_listeners(
700 notify,
701 &download_inflight.notifications,
702 )
703 .await;
704 }
705 }
706
707 results.push(download_outcomes);
708 }
709
710 let results = results.into_iter().flatten().collect::<Vec<_>>();
711
712 for (file, operation) in results
713 .into_iter()
714 .filter(|o| {
715 matches!(
716 o.result,
717 TransferResult::Fatal(TransferError::RetryExhausted)
718 )
719 })
720 .map(|o| (o.file, o.operation))
721 {
722 let item = TransferFailure {
723 time: SystemTime::now(),
724 file,
725 operation,
726 };
727 let mut failures = download_failures.lock().await;
728 failures.push_front(item);
729 }
730
731 Ok::<_, Error>(())
732 };
733
734 let results = tokio::join!(requests_task, downloads_task);
735 results.0?;
736 results.1?;
737
738 Ok({
739 let queue = queue.read().await;
740 if queue.is_empty() {
741 None
742 } else {
743 Some(())
744 }
745 })
746 }
747
748 async fn run_client_operation(
749 request_id: u64,
750 file: ExternalFile,
751 op: TransferOperation,
752 client: C,
753 settings: Arc<FileTransferSettings>,
754 paths: Arc<Paths>,
755 inflight_transfers: Arc<InflightTransfers>,
756 ) -> Result<TransferOutcome> {
757 tracing::debug!(
758 request_id = %request_id,
759 op = ?op,
760 "file_transfers::run",
761 );
762
763 let transfer_id = compute_transfer_id(&file, &op, client.origin());
764
765 let (cancel_tx, cancel_rx) =
766 watch::channel::<CancelReason>(Default::default());
767 let progress_tx = match &op {
768 TransferOperation::Upload | TransferOperation::Download => {
769 let (progress_tx, mut progress_rx): (ProgressChannel, _) =
770 mpsc::channel(16);
771
772 let progress_transfers = inflight_transfers.clone();
773
774 tokio::task::spawn(async move {
778 while let Some(event) = progress_rx.recv().await {
779 let notify = InflightNotification::TransferUpdate {
780 request_id,
781 transfer_id,
782 bytes_transferred: event.0,
783 bytes_total: event.1,
784 };
785 notify_listeners(
786 notify,
787 &progress_transfers.notifications,
788 )
789 .await;
790 }
791 });
792
793 Some(progress_tx)
794 }
795 _ => None,
796 };
797
798 {
799 let request = InflightRequest {
800 origin: client.origin().clone(),
801 file,
802 operation: op,
803 cancel: cancel_tx.clone(),
804 };
805
806 tracing::debug!(
807 request_id = %request_id,
808 "inflight_transfer::insert",
809 );
810
811 let notify = InflightNotification::TransferAdded {
812 transfer_id,
813 request_id,
814 origin: request.origin.clone(),
815 file: request.file.clone(),
816 operation: request.operation.clone(),
817 };
818
819 inflight_transfers
820 .insert_transfer(request_id, request)
821 .await;
822
823 notify_listeners(notify, &inflight_transfers.notifications).await;
824 };
825
826 tracing::trace!(
827 op = ?op,
828 url = %client.origin().url(),
829 "file_transfer"
830 );
831
832 let retry = settings.retry.clone_reset();
833 let result = match &op {
834 TransferOperation::Upload => {
835 let operation = operations::UploadOperation::new(
836 client,
837 paths,
838 transfer_id,
839 request_id,
840 inflight_transfers.clone(),
841 retry,
842 cancel_tx,
843 );
844 operation
845 .run(&file, progress_tx.unwrap(), cancel_rx)
846 .await?
847 }
848 TransferOperation::Download => {
849 let operation = operations::DownloadOperation::new(
850 client,
851 paths,
852 transfer_id,
853 request_id,
854 inflight_transfers.clone(),
855 retry,
856 cancel_tx,
857 );
858 operation
859 .run(&file, progress_tx.unwrap(), cancel_rx)
860 .await?
861 }
862 TransferOperation::Delete => {
863 let operation = operations::DeleteOperation::new(
864 client,
865 transfer_id,
866 request_id,
867 inflight_transfers.clone(),
868 retry,
869 cancel_tx,
870 );
871 operation.run(&file).await?
872 }
873 TransferOperation::Move(dest) => {
874 let operation = operations::MoveOperation::new(
875 client,
876 transfer_id,
877 request_id,
878 inflight_transfers.clone(),
879 retry,
880 cancel_tx,
881 );
882 operation.run(&file, dest).await?
883 }
884 };
885
886 {
887 tracing::debug!(
888 request_id = %request_id,
889 "inflight_transfer::remove",
890 );
891
892 inflight_transfers.remove_transfer(&request_id).await;
893 }
894
895 if let TransferResult::Fatal(reason) = &result {
896 let notify = InflightNotification::TransferError {
897 transfer_id,
898 request_id,
899 reason: reason.clone(),
900 };
901 notify_listeners(notify, &inflight_transfers.notifications).await;
902 }
903
904 Ok(TransferOutcome {
905 transfer_id,
906 request_id,
907 file,
908 operation: op,
909 result,
910 })
911 }
912}
913
914async fn notify_listeners(
915 notify: InflightNotification,
916 notifier: &broadcast::Sender<InflightNotification>,
917) {
918 if notifier.receiver_count() > 0 {
919 let _ = notifier.send(notify);
920 }
921}
922
923fn compute_transfer_id(
929 file: &ExternalFile,
930 operation: &TransferOperation,
931 origin: &Origin,
932) -> u64 {
933 let mut hasher = DefaultHasher::new();
934 file.hash(&mut hasher);
935 operation.hash(&mut hasher);
936 origin.hash(&mut hasher);
937 hasher.finish()
938}
939
940async fn normalize(
941 file: &ExternalFile,
942 operation: &TransferOperation,
943 queue: Arc<RwLock<VecDeque<FileOperation>>>,
944 failures: Arc<Mutex<VecDeque<TransferFailure>>>,
945 inflight: Arc<InflightTransfers>,
946) {
947 match operation {
948 TransferOperation::Delete | TransferOperation::Move(_) => {
949 tracing::debug!(
950 op = ?operation,
951 "file_transfers::normalize_operation",
952 );
953
954 let mut queue = queue.write().await;
956 queue.retain(|item| {
957 let is_transfer_op = matches!(
958 item.1,
959 TransferOperation::Upload | TransferOperation::Download
960 );
961 if &item.0 == file && is_transfer_op {
962 false
963 } else {
964 true
965 }
966 });
967
968 let mut failures = failures.lock().await;
970 failures.retain(|failure| {
971 let is_transfer_op = matches!(
972 failure.operation,
973 TransferOperation::Upload | TransferOperation::Download
974 );
975 if &failure.file == file && is_transfer_op {
976 false
977 } else {
978 true
979 }
980 });
981
982 inflight.cancel_active_transfers(file).await;
985 }
986 _ => {}
987 }
988}