sos_net/account/file_transfers/
inflight.rs

1//! Tracks inflight file transfer requests.
2use sos_core::{ExternalFile, Origin};
3use sos_protocol::transfer::{CancelReason, TransferOperation};
4use std::{
5    collections::HashMap,
6    sync::{
7        atomic::{AtomicU64, Ordering},
8        Arc,
9    },
10};
11use tokio::sync::{broadcast, RwLock};
12
13use super::{CancelChannel, TransferError};
14
15/// Notification for inflight transfers.
16#[derive(Debug, Clone)]
17pub enum InflightNotification {
18    /// Notify a transfer was added.
19    TransferAdded {
20        /// Transfer identifier.
21        transfer_id: u64,
22        /// Request identifier.
23        request_id: u64,
24        /// Server origin.
25        origin: Origin,
26        /// File information.
27        file: ExternalFile,
28        /// Transfer operation.
29        operation: TransferOperation,
30    },
31    /// Notify a transfer was updated with progress information.
32    ///
33    /// This notification is only sent for uploads and downloads.
34    TransferUpdate {
35        /// Transfer identifier.
36        transfer_id: u64,
37        /// Request identifier.
38        request_id: u64,
39        /// Bytes transferred.
40        bytes_transferred: u64,
41        /// Bytes total.
42        bytes_total: Option<u64>,
43    },
44    /// Notify a transfer is being retried.
45    TransferRetry {
46        /// Transfer identifier.
47        transfer_id: u64,
48        /// Request identifier.
49        request_id: u64,
50        /// Retry number.
51        retry: u32,
52        /// Maximum number of retries.
53        maximum: u32,
54    },
55    /// Notify a transfer is stopped due to an error.
56    TransferError {
57        /// Transfer identifier.
58        transfer_id: u64,
59        /// Request identifier.
60        request_id: u64,
61        /// Error reason.
62        reason: TransferError,
63    },
64    /// Notify a transfer was completed.
65    TransferDone {
66        /// Transfer identifier.
67        transfer_id: u64,
68        /// Request identifier.
69        request_id: u64,
70    },
71}
72
73/// Inflight file transfer.
74#[derive(Debug)]
75pub struct InflightRequest {
76    /// Server origin.
77    pub origin: Origin,
78    /// External file information.
79    pub file: ExternalFile,
80    /// Transfer operation.
81    pub operation: TransferOperation,
82    /// Cancel channel for uploads and downloads.
83    pub cancel: CancelChannel,
84}
85
86impl InflightRequest {
87    /// Cancel the inflight request.
88    pub async fn cancel(self, reason: CancelReason) -> bool {
89        self.cancel.send(reason).is_ok()
90    }
91}
92
93/// Collection of pending transfers.
94pub struct InflightTransfers {
95    inflight: Arc<RwLock<HashMap<u64, InflightRequest>>>,
96    request_id: AtomicU64,
97    pub(super) notifications: broadcast::Sender<InflightNotification>,
98}
99
100impl InflightTransfers {
101    /// Create new pending transfers.
102    pub(crate) fn new() -> Self {
103        let (notifications, _) = broadcast::channel(2048);
104        Self {
105            inflight: Arc::new(RwLock::new(Default::default())),
106            request_id: AtomicU64::new(1),
107            notifications,
108        }
109    }
110
111    /// Cancel all inflight transfers.
112    pub async fn cancel_all(&self, reason: CancelReason) {
113        let mut writer = self.inflight.write().await;
114        for (id, request) in writer.drain() {
115            tracing::info!(
116                request_id = %id,
117                op = ?request.operation,
118                "inflight::cancel",
119            );
120            request.cancel(reason.clone()).await;
121        }
122    }
123
124    /// Cancel a single inflight transfer.
125    pub async fn cancel_one(
126        &self,
127        request_id: &u64,
128        reason: CancelReason,
129    ) -> bool {
130        let mut writer = self.inflight.write().await;
131        if let Some(req) = writer.remove(request_id) {
132            req.cancel(reason).await
133        } else {
134            false
135        }
136    }
137
138    /// Inflight notifications channel.
139    pub fn notifications(&self) -> &broadcast::Sender<InflightNotification> {
140        &self.notifications
141    }
142
143    /// Determine if the inflight transfers is empty.
144    pub async fn is_empty(&self) -> bool {
145        let queue = self.inflight.read().await;
146        queue.is_empty()
147    }
148
149    /// Cancel inflight upload or download transfers for the
150    /// given file.
151    pub(super) async fn cancel_active_transfers(&self, file: &ExternalFile) {
152        let cancelations = {
153            let mut cancelations = Vec::new();
154            let inflight = self.inflight.read().await;
155            for (request_id, transfer) in &*inflight {
156                let is_transfer_op = matches!(
157                    transfer.operation,
158                    TransferOperation::Upload | TransferOperation::Download
159                );
160                if &transfer.file == file && is_transfer_op {
161                    cancelations.push(*request_id);
162                }
163            }
164            cancelations
165        };
166
167        for request_id in &cancelations {
168            self.cancel_one(request_id, CancelReason::Aborted).await;
169        }
170    }
171
172    /// Next request id.
173    pub(super) fn request_id(&self) -> u64 {
174        self.request_id.fetch_add(1, Ordering::SeqCst)
175    }
176
177    pub(super) async fn insert_transfer(
178        &self,
179        request_id: u64,
180        request: InflightRequest,
181    ) {
182        let mut inflight = self.inflight.write().await;
183        inflight.insert(request_id, request);
184    }
185
186    pub(super) async fn remove_transfer(&self, request_id: &u64) {
187        let mut inflight = self.inflight.write().await;
188        inflight.remove(request_id);
189    }
190}