sos_net/account/file_transfers/
mod.rs

1//! Queue file transfer operations and spawn tasks to
2//! perform the file transfer operations.
3//!
4//! Transfer events are received on a channel
5//! and added to a queue of pending operations which are
6//! monitored and consumed into inflight transfers.
7//!
8//! When an error occurs that may be recoverable
9//! the queued operation is moved to a failures queue
10//! which is polled periodically to see if there are
11//! failed transfers that may be retried. If a failed transfer
12//! has expired and may be retried it is moved back into the
13//! pending transfers queue.
14//!
15//! Requests are limited to the `concurrent_requests` setting guarded
16//! by a semaphore and notifications are sent via [InflightTransfers].
17use crate::{Error, Result};
18use futures::FutureExt;
19use sos_core::{ExternalFile, Origin, Paths};
20use sos_protocol::{
21    network_client::NetworkRetry,
22    transfer::{
23        CancelReason, FileOperation, FileSyncClient,
24        FileTransferQueueRequest, ProgressChannel, TransferOperation,
25    },
26    SyncClient,
27};
28use sos_vfs as vfs;
29use std::{
30    collections::hash_map::DefaultHasher,
31    hash::{Hash, Hasher},
32    sync::Arc,
33    time::{Duration, SystemTime},
34};
35use tokio::{
36    sync::{broadcast, mpsc, oneshot, watch, Mutex, RwLock, Semaphore},
37    time,
38};
39
40mod inflight;
41mod operations;
42
43pub use inflight::{
44    InflightNotification, InflightRequest, InflightTransfers,
45};
46use std::collections::{HashSet, VecDeque};
47
48/// Channel used to cancel uploads and downloads.
49///
50/// The boolean flag indicates whether the cancellation was
51/// requested by the user or not.
52pub type CancelChannel = watch::Sender<CancelReason>;
53
54/// Reason for a transfer error notification.
55#[derive(Debug, Clone, Hash, Eq, PartialEq)]
56pub enum TransferError {
57    /// Error when network retries are exhausted.
58    RetryExhausted,
59    /// Error when a file that is the target of
60    /// an upload or download is no longer on disc.
61    FileMissing,
62    /// Error when the target file for a move operation is missing.
63    MovedMissing,
64    /// Transfer was canceled.
65    Canceled(CancelReason),
66}
67
68/// Result of a file transfer operation.
69#[derive(Debug, Clone, Hash, Eq, PartialEq)]
70enum TransferResult {
71    /// Transfer completed across all clients.
72    Done,
73    /// Operation failed but can be retried.
74    Retry,
75    /// Fatal error prevents the operation from being retried.
76    Fatal(TransferError),
77}
78
79/// Outcome of an attempted transfer operation.
80#[derive(Debug, Clone, Hash, Eq, PartialEq)]
81struct TransferOutcome {
82    transfer_id: u64,
83    request_id: u64,
84    file: ExternalFile,
85    operation: TransferOperation,
86    result: TransferResult,
87}
88
89/// Logs a failed transfer attempt.
90#[derive(Debug)]
91struct TransferFailure {
92    time: SystemTime,
93    file: ExternalFile,
94    operation: TransferOperation,
95}
96
97impl From<TransferFailure> for FileOperation {
98    fn from(value: TransferFailure) -> Self {
99        FileOperation(value.file, value.operation)
100    }
101}
102
103/// Settings for file transfer operations.
104#[derive(Debug, Clone)]
105pub struct FileTransferSettings {
106    /// Number of concurrent requests.
107    pub concurrent_requests: usize,
108
109    /// Duration to poll the failure queue
110    /// for expired failures.
111    pub failure_interval: Duration,
112
113    /// Duration after which failed transfers are
114    /// re-inserted back into the transfers queue.
115    pub failure_expiry: Duration,
116
117    /// Settings for network retry.
118    ///
119    /// Network retry here applies to each individual
120    /// file transfer operation.
121    pub retry: NetworkRetry,
122}
123
124impl Default for FileTransferSettings {
125    fn default() -> Self {
126        Self {
127            concurrent_requests: 4,
128
129            #[cfg(debug_assertions)]
130            failure_interval: Duration::from_millis(250),
131            #[cfg(not(debug_assertions))]
132            failure_interval: Duration::from_millis(30000),
133
134            #[cfg(debug_assertions)]
135            failure_expiry: Duration::from_millis(0),
136            #[cfg(not(debug_assertions))]
137            failure_expiry: Duration::from_secs(180),
138
139            // Disable retry for test specs so they
140            // execute fast.
141            #[cfg(debug_assertions)]
142            retry: NetworkRetry::new(4, 0),
143            // In production use default values
144            #[cfg(not(debug_assertions))]
145            retry: NetworkRetry::default(),
146        }
147    }
148}
149
150impl FileTransferSettings {
151    /// Create production file transfer settings.
152    pub fn new() -> Self {
153        Self {
154            concurrent_requests: 4,
155            failure_interval: Duration::from_millis(30000),
156            failure_expiry: Duration::from_secs(180),
157            retry: NetworkRetry::default(),
158        }
159    }
160}
161
162/// Handle that can be used to shutdown the file transfers event
163/// loop and send events to the queue.
164pub(crate) struct FileTransfersHandle {
165    shutdown_tx: mpsc::Sender<()>,
166    shutdown_rx: oneshot::Receiver<()>,
167    pub(crate) queue_tx: mpsc::Sender<FileTransferQueueRequest>,
168}
169
170impl FileTransfersHandle {
171    /// Create a new handle.
172    fn new() -> (
173        Self,
174        mpsc::Receiver<()>,
175        oneshot::Sender<()>,
176        mpsc::Receiver<FileTransferQueueRequest>,
177    ) {
178        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
179        let (ack_tx, ack_rx) = oneshot::channel::<()>();
180        let (queue_tx, queue_rx) =
181            mpsc::channel::<FileTransferQueueRequest>(32);
182
183        (
184            Self {
185                shutdown_tx,
186                shutdown_rx: ack_rx,
187                queue_tx,
188            },
189            shutdown_rx,
190            ack_tx,
191            queue_rx,
192        )
193    }
194
195    /// Send a collection of items to be added to the queue.
196    pub async fn send(&self, items: FileTransferQueueRequest) {
197        if let Err(error) = self.queue_tx.send(items).await {
198            tracing::warn!(
199                error = ?error,
200                "file_transfers::queue_send_error",
201            );
202        }
203    }
204
205    /// Shutdown the file transfers loop.
206    pub async fn shutdown(self) {
207        if let Err(error) = self.shutdown_tx.send(()).await {
208            tracing::warn!(
209                error = ?error,
210                "file_transfers::shutdown_tx::send_error",
211            );
212        }
213        if let Err(error) = self.shutdown_rx.await {
214            tracing::warn!(
215                error = ?error,
216                "file_transfers::shutdown_tx::recv_error",
217            );
218        }
219    }
220}
221
222/// Transfers files to multiple clients.
223///
224/// Reads operations from the queue, executes them on
225/// the list of clients and removes from the queue only
226/// when each operation has been completed on every client.
227pub struct FileTransfers<C>
228where
229    C: SyncClient<Error = sos_protocol::Error>
230        + FileSyncClient<Error = sos_protocol::Error>
231        + Clone
232        + Send
233        + Sync
234        + PartialEq
235        + 'static,
236{
237    clients: Arc<Mutex<Vec<C>>>,
238    settings: Arc<FileTransferSettings>,
239    queue: Arc<RwLock<VecDeque<FileOperation>>>,
240    failures: Arc<Mutex<VecDeque<TransferFailure>>>,
241    pub(super) inflight: Arc<InflightTransfers>,
242}
243
244impl<C> FileTransfers<C>
245where
246    C: SyncClient<Error = sos_protocol::Error>
247        + FileSyncClient<Error = sos_protocol::Error>
248        + Clone
249        + Send
250        + Sync
251        + PartialEq
252        + 'static,
253{
254    /// Create new file transfers manager.
255    pub fn new(clients: Vec<C>, settings: FileTransferSettings) -> Self {
256        let queue = VecDeque::new();
257        let inflight = InflightTransfers::new();
258
259        Self {
260            clients: Arc::new(Mutex::new(clients)),
261            settings: Arc::new(settings),
262            queue: Arc::new(RwLock::new(queue)),
263            failures: Arc::new(Mutex::new(Default::default())),
264            inflight: Arc::new(inflight),
265        }
266    }
267
268    /// Add a client target for file transfer operations.
269    pub(super) async fn add_client(&self, client: C) {
270        let mut writer = self.clients.lock().await;
271        writer.push(client);
272    }
273
274    /// Remove a client target for file transfer operations.
275    pub(super) async fn remove_client(&self, client: &C) {
276        let mut writer = self.clients.lock().await;
277        if let Some(pos) = writer.iter().position(|c| c == client) {
278            writer.remove(pos);
279        }
280    }
281
282    /// Spawn a task to transfer file operations.
283    pub(super) fn run(&mut self, paths: Arc<Paths>) -> FileTransfersHandle {
284        let (handle, mut shutdown_rx, shutdown_tx, mut queue_rx) =
285            FileTransfersHandle::new();
286
287        let queue_tx = handle.queue_tx.clone();
288
289        let queue_drained = Arc::new(Mutex::new(false));
290
291        let clients = self.clients.clone();
292        let queue = self.queue.clone();
293        let inflight = self.inflight.clone();
294        let cancel_inflight = self.inflight.clone();
295        let settings = self.settings.clone();
296        let failures = self.failures.clone();
297        let semaphore =
298            Arc::new(Semaphore::new(self.settings.concurrent_requests));
299
300        let failure_expiry = settings.failure_expiry;
301        let mut interval = time::interval(settings.failure_interval);
302
303        tokio::task::spawn(async move {
304            loop {
305                tokio::select! {
306                    biased;
307                    signal = shutdown_rx.recv().fuse() => {
308                        if signal.is_some() {
309                            tracing::debug!("file_transfers::shutting_down");
310
311                            // Clear the queue to break the main
312                            // task loop
313                            let mut writer = queue.write().await;
314                            *writer = Default::default();
315
316                            // Cancel any inflight transfers
317                            cancel_inflight.cancel_all(CancelReason::Shutdown).await;
318
319                            let _ = shutdown_tx.send(());
320                            tracing::debug!("file_transfers::shut_down");
321
322                            break;
323                        }
324                    }
325                    _ = interval.tick() => {
326                        let mut failures = failures.lock().await;
327                        let mut items = Vec::new();
328                        while let Some(failure) = failures.pop_back() {
329                            if let Ok(elapsed) = failure.time.elapsed() {
330                                if elapsed >= failure_expiry {
331                                    items.push(failure.into());
332                                } else {
333                                    failures.push_front(failure);
334                                }
335                            }
336                        }
337
338                        if !items.is_empty() {
339                            if let Err(error) = queue_tx.send(items).await {
340                                tracing::error!(error = ?error, "file_transfers::reinsert");
341                            }
342                        }
343                    }
344                    Some(events) = queue_rx.recv() => {
345                        // println!("queue events: {}", events.len());
346
347                        let num_clients = {
348                            let reader = clients.lock().await;
349                            reader.len()
350                        };
351
352                        // If we don't have any clients no point
353                        // queuing anything
354                        if num_clients == 0 {
355                            continue;
356                        }
357
358                        let num_queued = {
359
360                            // Normalize for delete and move events
361                            // that should cancel any existing upload
362                            // or download operations
363                            for event in &events {
364                                normalize(
365                                    &event.0,
366                                    &event.1,
367                                    queue.clone(),
368                                    failures.clone(),
369                                    inflight.clone(),
370                                ).await;
371                            }
372
373                            let mut writer = queue.write().await;
374                            for event in events {
375                                if !writer.contains(&event) {
376                                  writer.push_front(event);
377                                }
378                            }
379                            writer.len()
380                        };
381
382                        let is_running = {
383                          let reader = queue_drained.lock().await;
384                          *reader
385                        };
386
387                        tracing::debug!(
388                            num_clients = %num_clients,
389                            num_queued = %num_queued,
390                            is_running = %is_running,
391                            "file_transfers::event");
392
393                        // Guard to ensure there is only one spawned
394                        // task consuming the queue at a time
395                        if num_queued > 0 && !is_running {
396                          // Clone of the current client list which
397                          // will remain fixed until the current queue
398                          // is completely drained
399                          let clients = {
400                            let reader = clients.lock().await;
401                            reader.iter().cloned().collect::<Vec<_>>()
402                          };
403
404                          if !clients.is_empty() {
405                              {
406                                let mut writer = queue_drained.lock().await;
407                                *writer = true;
408                              }
409
410                              let semaphore = semaphore.clone();
411                              let queue = queue.clone();
412                              let failures = failures.clone();
413                              let inflight = inflight.clone();
414                              let settings = settings.clone();
415                              let paths = paths.clone();
416                              let drained = queue_drained.clone();
417
418                              // We must not block here otherwise we can't cancel
419                              // whilst there are inflight requests as this branch
420                              // of the select would block the cancel branch
421                              tokio::task::spawn(async move {
422
423                                // This will complete when the queue
424                                // is empty
425                                let res = Self::spawn_tasks(
426                                    paths.clone(),
427                                    semaphore.clone(),
428                                    queue.clone(),
429                                    failures.clone(),
430                                    inflight.clone(),
431                                    settings.clone(),
432                                    clients.clone(),
433                                )
434                                .await;
435                                if let Err(error) = res {
436                                  tracing::error!(error = ?error);
437                                }
438
439                                // Reset so we can spawn another task
440                                let mut writer = drained.lock().await;
441                                *writer = false;
442
443                              });
444                          }
445
446                        }
447                    }
448                }
449            }
450
451            Ok::<_, Error>(())
452        });
453
454        handle
455    }
456
457    async fn spawn_tasks(
458        paths: Arc<Paths>,
459        semaphore: Arc<Semaphore>,
460        queue: Arc<RwLock<VecDeque<FileOperation>>>,
461        failures: Arc<Mutex<VecDeque<TransferFailure>>>,
462        inflight: Arc<InflightTransfers>,
463        settings: Arc<FileTransferSettings>,
464        clients: Vec<C>,
465    ) -> Result<()> {
466        let mut remaining = Self::consume_queue(
467            paths.clone(),
468            semaphore.clone(),
469            queue.clone(),
470            failures.clone(),
471            inflight.clone(),
472            settings.clone(),
473            clients.as_slice(),
474        )
475        .await?;
476
477        while let Some(_) = remaining {
478            remaining = Self::consume_queue(
479                paths.clone(),
480                semaphore.clone(),
481                queue.clone(),
482                failures.clone(),
483                inflight.clone(),
484                settings.clone(),
485                clients.as_slice(),
486            )
487            .await?;
488        }
489
490        Ok(())
491    }
492
493    async fn consume_queue(
494        paths: Arc<Paths>,
495        semaphore: Arc<Semaphore>,
496        queue: Arc<RwLock<VecDeque<FileOperation>>>,
497        failures: Arc<Mutex<VecDeque<TransferFailure>>>,
498        inflight: Arc<InflightTransfers>,
499        settings: Arc<FileTransferSettings>,
500        clients: &[C],
501    ) -> Result<Option<()>> {
502        let mut requests = Vec::new();
503        let mut downloads = Vec::new();
504
505        loop {
506            // Concurrency limit reached
507            if semaphore.available_permits() == 0 {
508                break;
509            }
510
511            let item = {
512                let mut queue = queue.write().await;
513                queue.pop_back()
514            };
515
516            // No more items in the queue
517            if item.is_none() {
518                break;
519            }
520
521            let FileOperation(file, op) = item.unwrap();
522
523            // println!("process: {:#?}", op);
524            tracing::debug!(
525              file = ?file,
526              op = ?op,
527              "file_transfers::queue",
528            );
529
530            match op {
531                // Downloads are a special case that can complete
532                // on the first successful operation
533                TransferOperation::Download => {
534                    let inflight = inflight.clone();
535                    let settings = settings.clone();
536                    let paths = paths.clone();
537                    let clients = clients.to_vec().clone();
538                    let permit = semaphore.clone();
539                    let jh = tokio::task::spawn(async move {
540                        let mut results = Vec::new();
541                        for client in clients {
542                            let _permit = permit.acquire().await.unwrap();
543                            let request_id = inflight.request_id();
544
545                            let outcome = Self::run_client_operation(
546                                request_id,
547                                file,
548                                op,
549                                client.clone(),
550                                settings.clone(),
551                                paths.clone(),
552                                inflight.clone(),
553                            )
554                            .await?;
555
556                            let is_done = matches!(
557                                &outcome.result,
558                                TransferResult::Done
559                            );
560                            results.push(outcome);
561                            if is_done {
562                                break;
563                            }
564                        }
565                        Ok::<_, Error>(results)
566                    });
567                    downloads.push(jh);
568                }
569                // Other operations must complete on all clients
570                _ => {
571                    for client in clients.to_vec() {
572                        let inflight = inflight.clone();
573                        let settings = settings.clone();
574                        let paths = paths.clone();
575                        let permit = semaphore.clone();
576                        let jh = tokio::task::spawn(async move {
577                            let _permit = permit.acquire().await.unwrap();
578                            let request_id = inflight.request_id();
579                            let outcome = Self::run_client_operation(
580                                request_id,
581                                file,
582                                op,
583                                client.clone(),
584                                settings.clone(),
585                                paths.clone(),
586                                inflight.clone(),
587                            )
588                            .await?;
589
590                            Ok::<_, Error>(outcome)
591                        });
592                        requests.push(jh);
593                    }
594                }
595            }
596        }
597
598        let request_paths = paths.clone();
599        let request_queue = queue.clone();
600        let download_inflight = inflight.clone();
601        let download_failures = failures.clone();
602
603        let requests_task = async move {
604            let mut results = Vec::new();
605            for jh in requests {
606                let outcome = jh.await.unwrap()?;
607                if let TransferResult::Done = &outcome.result {
608                    let notify = InflightNotification::TransferDone {
609                        transfer_id: outcome.transfer_id,
610                        request_id: outcome.request_id,
611                    };
612                    notify_listeners(notify, &inflight.notifications).await;
613                }
614
615                results.push(outcome);
616            }
617
618            let done = results
619                .iter()
620                .all(|o| matches!(o.result, TransferResult::Done));
621
622            if !done {
623                // println!("result: {:#?}", results);
624
625                // If we attempt a move but the source file
626                // of the move is missing on the target server
627                // and we have the destination locally
628                // on disc then we mutate it into an upload operation
629                let moved_missing = results
630                    .iter()
631                    .filter(|o| {
632                        matches!(
633                            o.result,
634                            TransferResult::Fatal(
635                                TransferError::MovedMissing
636                            )
637                        )
638                    })
639                    .cloned()
640                    .collect::<HashSet<_>>();
641                for outcome in moved_missing {
642                    if let TransferOperation::Move(dest) = &outcome.operation
643                    {
644                        let path = request_paths.into_file_path(dest);
645
646                        if vfs::try_exists(path).await? {
647                            let item = FileOperation(
648                                dest.clone(),
649                                TransferOperation::Upload,
650                            );
651                            let mut queue = request_queue.write().await;
652                            if !queue.contains(&item) {
653                                queue.push_back(item);
654                            }
655                        }
656                    }
657                }
658
659                for (file, op) in results
660                    .into_iter()
661                    .filter(|o| {
662                        matches!(
663                            o.result,
664                            TransferResult::Fatal(
665                                TransferError::RetryExhausted
666                            )
667                        )
668                    })
669                    .map(|o| (o.file, o.operation))
670                {
671                    let item = TransferFailure {
672                        time: SystemTime::now(),
673                        file,
674                        operation: op,
675                    };
676                    let mut failures = failures.lock().await;
677                    failures.push_front(item);
678                }
679            }
680
681            Ok::<_, Error>(())
682        };
683
684        let downloads_task = async move {
685            let mut results = Vec::new();
686            for jh in downloads {
687                let download_outcomes = jh.await.unwrap()?;
688                let done_requests = download_outcomes
689                    .iter()
690                    .filter(|o| matches!(o.result, TransferResult::Done))
691                    .collect::<Vec<_>>();
692
693                for outcome in done_requests {
694                    if let TransferResult::Done = &outcome.result {
695                        let notify = InflightNotification::TransferDone {
696                            transfer_id: outcome.transfer_id,
697                            request_id: outcome.request_id,
698                        };
699                        notify_listeners(
700                            notify,
701                            &download_inflight.notifications,
702                        )
703                        .await;
704                    }
705                }
706
707                results.push(download_outcomes);
708            }
709
710            let results = results.into_iter().flatten().collect::<Vec<_>>();
711
712            for (file, operation) in results
713                .into_iter()
714                .filter(|o| {
715                    matches!(
716                        o.result,
717                        TransferResult::Fatal(TransferError::RetryExhausted)
718                    )
719                })
720                .map(|o| (o.file, o.operation))
721            {
722                let item = TransferFailure {
723                    time: SystemTime::now(),
724                    file,
725                    operation,
726                };
727                let mut failures = download_failures.lock().await;
728                failures.push_front(item);
729            }
730
731            Ok::<_, Error>(())
732        };
733
734        let results = tokio::join!(requests_task, downloads_task);
735        results.0?;
736        results.1?;
737
738        Ok({
739            let queue = queue.read().await;
740            if queue.is_empty() {
741                None
742            } else {
743                Some(())
744            }
745        })
746    }
747
748    async fn run_client_operation(
749        request_id: u64,
750        file: ExternalFile,
751        op: TransferOperation,
752        client: C,
753        settings: Arc<FileTransferSettings>,
754        paths: Arc<Paths>,
755        inflight_transfers: Arc<InflightTransfers>,
756    ) -> Result<TransferOutcome> {
757        tracing::debug!(
758          request_id = %request_id,
759          op = ?op,
760          "file_transfers::run",
761        );
762
763        let transfer_id = compute_transfer_id(&file, &op, client.origin());
764
765        let (cancel_tx, cancel_rx) =
766            watch::channel::<CancelReason>(Default::default());
767        let progress_tx = match &op {
768            TransferOperation::Upload | TransferOperation::Download => {
769                let (progress_tx, mut progress_rx): (ProgressChannel, _) =
770                    mpsc::channel(16);
771
772                let progress_transfers = inflight_transfers.clone();
773
774                // Proxt the progress information for an individual
775                // upload or download to the inflight transfers
776                // notification channel
777                tokio::task::spawn(async move {
778                    while let Some(event) = progress_rx.recv().await {
779                        let notify = InflightNotification::TransferUpdate {
780                            request_id,
781                            transfer_id,
782                            bytes_transferred: event.0,
783                            bytes_total: event.1,
784                        };
785                        notify_listeners(
786                            notify,
787                            &progress_transfers.notifications,
788                        )
789                        .await;
790                    }
791                });
792
793                Some(progress_tx)
794            }
795            _ => None,
796        };
797
798        {
799            let request = InflightRequest {
800                origin: client.origin().clone(),
801                file,
802                operation: op,
803                cancel: cancel_tx.clone(),
804            };
805
806            tracing::debug!(
807                request_id = %request_id,
808                "inflight_transfer::insert",
809            );
810
811            let notify = InflightNotification::TransferAdded {
812                transfer_id,
813                request_id,
814                origin: request.origin.clone(),
815                file: request.file.clone(),
816                operation: request.operation.clone(),
817            };
818
819            inflight_transfers
820                .insert_transfer(request_id, request)
821                .await;
822
823            notify_listeners(notify, &inflight_transfers.notifications).await;
824        };
825
826        tracing::trace!(
827          op = ?op,
828          url = %client.origin().url(),
829          "file_transfer"
830        );
831
832        let retry = settings.retry.clone_reset();
833        let result = match &op {
834            TransferOperation::Upload => {
835                let operation = operations::UploadOperation::new(
836                    client,
837                    paths,
838                    transfer_id,
839                    request_id,
840                    inflight_transfers.clone(),
841                    retry,
842                    cancel_tx,
843                );
844                operation
845                    .run(&file, progress_tx.unwrap(), cancel_rx)
846                    .await?
847            }
848            TransferOperation::Download => {
849                let operation = operations::DownloadOperation::new(
850                    client,
851                    paths,
852                    transfer_id,
853                    request_id,
854                    inflight_transfers.clone(),
855                    retry,
856                    cancel_tx,
857                );
858                operation
859                    .run(&file, progress_tx.unwrap(), cancel_rx)
860                    .await?
861            }
862            TransferOperation::Delete => {
863                let operation = operations::DeleteOperation::new(
864                    client,
865                    transfer_id,
866                    request_id,
867                    inflight_transfers.clone(),
868                    retry,
869                    cancel_tx,
870                );
871                operation.run(&file).await?
872            }
873            TransferOperation::Move(dest) => {
874                let operation = operations::MoveOperation::new(
875                    client,
876                    transfer_id,
877                    request_id,
878                    inflight_transfers.clone(),
879                    retry,
880                    cancel_tx,
881                );
882                operation.run(&file, dest).await?
883            }
884        };
885
886        {
887            tracing::debug!(
888                request_id = %request_id,
889                "inflight_transfer::remove",
890            );
891
892            inflight_transfers.remove_transfer(&request_id).await;
893        }
894
895        if let TransferResult::Fatal(reason) = &result {
896            let notify = InflightNotification::TransferError {
897                transfer_id,
898                request_id,
899                reason: reason.clone(),
900            };
901            notify_listeners(notify, &inflight_transfers.notifications).await;
902        }
903
904        Ok(TransferOutcome {
905            transfer_id,
906            request_id,
907            file,
908            operation: op,
909            result,
910        })
911    }
912}
913
914async fn notify_listeners(
915    notify: InflightNotification,
916    notifier: &broadcast::Sender<InflightNotification>,
917) {
918    if notifier.receiver_count() > 0 {
919        let _ = notifier.send(notify);
920    }
921}
922
923/// Compute an id for the transfer.
924///
925/// Transfer identifiers are stable across requests
926/// so callers can use this to identify transfers that
927/// may already have requests running.
928fn compute_transfer_id(
929    file: &ExternalFile,
930    operation: &TransferOperation,
931    origin: &Origin,
932) -> u64 {
933    let mut hasher = DefaultHasher::new();
934    file.hash(&mut hasher);
935    operation.hash(&mut hasher);
936    origin.hash(&mut hasher);
937    hasher.finish()
938}
939
940async fn normalize(
941    file: &ExternalFile,
942    operation: &TransferOperation,
943    queue: Arc<RwLock<VecDeque<FileOperation>>>,
944    failures: Arc<Mutex<VecDeque<TransferFailure>>>,
945    inflight: Arc<InflightTransfers>,
946) {
947    match operation {
948        TransferOperation::Delete | TransferOperation::Move(_) => {
949            tracing::debug!(
950              op = ?operation,
951              "file_transfers::normalize_operation",
952            );
953
954            // Remove from the pending queue
955            let mut queue = queue.write().await;
956            queue.retain(|item| {
957                let is_transfer_op = matches!(
958                    item.1,
959                    TransferOperation::Upload | TransferOperation::Download
960                );
961                if &item.0 == file && is_transfer_op {
962                    false
963                } else {
964                    true
965                }
966            });
967
968            // Remove from the failures queue
969            let mut failures = failures.lock().await;
970            failures.retain(|failure| {
971                let is_transfer_op = matches!(
972                    failure.operation,
973                    TransferOperation::Upload | TransferOperation::Download
974                );
975                if &failure.file == file && is_transfer_op {
976                    false
977                } else {
978                    true
979                }
980            });
981
982            // Notmalize inflight transfers which will cancel
983            // any existing uploads/downloads
984            inflight.cancel_active_transfers(file).await;
985        }
986        _ => {}
987    }
988}