sos_net/account/file_transfers/
inflight.rs1use sos_core::{ExternalFile, Origin};
3use sos_protocol::transfer::{CancelReason, TransferOperation};
4use std::{
5 collections::HashMap,
6 sync::{
7 atomic::{AtomicU64, Ordering},
8 Arc,
9 },
10};
11use tokio::sync::{broadcast, RwLock};
12
13use super::{CancelChannel, TransferError};
14
15#[derive(Debug, Clone)]
17pub enum InflightNotification {
18 TransferAdded {
20 transfer_id: u64,
22 request_id: u64,
24 origin: Origin,
26 file: ExternalFile,
28 operation: TransferOperation,
30 },
31 TransferUpdate {
35 transfer_id: u64,
37 request_id: u64,
39 bytes_transferred: u64,
41 bytes_total: Option<u64>,
43 },
44 TransferRetry {
46 transfer_id: u64,
48 request_id: u64,
50 retry: u32,
52 maximum: u32,
54 },
55 TransferError {
57 transfer_id: u64,
59 request_id: u64,
61 reason: TransferError,
63 },
64 TransferDone {
66 transfer_id: u64,
68 request_id: u64,
70 },
71}
72
73#[derive(Debug)]
75pub struct InflightRequest {
76 pub origin: Origin,
78 pub file: ExternalFile,
80 pub operation: TransferOperation,
82 pub cancel: CancelChannel,
84}
85
86impl InflightRequest {
87 pub async fn cancel(self, reason: CancelReason) -> bool {
89 self.cancel.send(reason).is_ok()
90 }
91}
92
93pub struct InflightTransfers {
95 inflight: Arc<RwLock<HashMap<u64, InflightRequest>>>,
96 request_id: AtomicU64,
97 pub(super) notifications: broadcast::Sender<InflightNotification>,
98}
99
100impl InflightTransfers {
101 pub(crate) fn new() -> Self {
103 let (notifications, _) = broadcast::channel(2048);
104 Self {
105 inflight: Arc::new(RwLock::new(Default::default())),
106 request_id: AtomicU64::new(1),
107 notifications,
108 }
109 }
110
111 pub async fn cancel_all(&self, reason: CancelReason) {
113 let mut writer = self.inflight.write().await;
114 for (id, request) in writer.drain() {
115 tracing::info!(
116 request_id = %id,
117 op = ?request.operation,
118 "inflight::cancel",
119 );
120 request.cancel(reason.clone()).await;
121 }
122 }
123
124 pub async fn cancel_one(
126 &self,
127 request_id: &u64,
128 reason: CancelReason,
129 ) -> bool {
130 let mut writer = self.inflight.write().await;
131 if let Some(req) = writer.remove(request_id) {
132 req.cancel(reason).await
133 } else {
134 false
135 }
136 }
137
138 pub fn notifications(&self) -> &broadcast::Sender<InflightNotification> {
140 &self.notifications
141 }
142
143 pub async fn is_empty(&self) -> bool {
145 let queue = self.inflight.read().await;
146 queue.is_empty()
147 }
148
149 pub(super) async fn cancel_active_transfers(&self, file: &ExternalFile) {
152 let cancelations = {
153 let mut cancelations = Vec::new();
154 let inflight = self.inflight.read().await;
155 for (request_id, transfer) in &*inflight {
156 let is_transfer_op = matches!(
157 transfer.operation,
158 TransferOperation::Upload | TransferOperation::Download
159 );
160 if &transfer.file == file && is_transfer_op {
161 cancelations.push(*request_id);
162 }
163 }
164 cancelations
165 };
166
167 for request_id in &cancelations {
168 self.cancel_one(request_id, CancelReason::Aborted).await;
169 }
170 }
171
172 pub(super) fn request_id(&self) -> u64 {
174 self.request_id.fetch_add(1, Ordering::SeqCst)
175 }
176
177 pub(super) async fn insert_transfer(
178 &self,
179 request_id: u64,
180 request: InflightRequest,
181 ) {
182 let mut inflight = self.inflight.write().await;
183 inflight.insert(request_id, request);
184 }
185
186 pub(super) async fn remove_transfer(&self, request_id: &u64) {
187 let mut inflight = self.inflight.write().await;
188 inflight.remove(request_id);
189 }
190}