alopex_chirps_file_transfer/
progress.rs1use crate::session::TransferState;
2use alopex_chirps_wire::node_id::NodeId;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Instant;
7use tokio::sync::Mutex;
8
/// Point-in-time counters describing how far a transfer has advanced.
///
/// The type is `Copy`, so a value handed to a caller is a detached snapshot.
#[derive(Debug, Clone, Copy, Default)]
pub struct TransferProgress {
    /// Bytes moved so far.
    pub bytes_transferred: u64,
    /// Expected size in bytes; `0` makes [`Self::completion_ratio`] report `1.0`.
    pub total_bytes: u64,
    /// Number of chunks finished so far.
    pub chunks_completed: u32,
    /// Transfer rate; the tracker in this module fills it in as bytes per second
    /// averaged since the tracker was created.
    pub throughput: f64,
}

impl TransferProgress {
    /// Returns the completed fraction of the transfer, always within `0.0..=1.0`.
    ///
    /// * A `total_bytes` of zero yields `1.0`, which also avoids a division by
    ///   zero.
    /// * If `bytes_transferred` exceeds `total_bytes` (the two counters are
    ///   updated independently, so an overshoot is possible), the result is
    ///   clamped to `1.0` instead of reporting more than 100 %.
    pub fn completion_ratio(&self) -> f64 {
        if self.total_bytes == 0 {
            return 1.0;
        }
        let ratio = self.bytes_transferred as f64 / self.total_bytes as f64;
        // Clamp so consumers such as progress bars never see a value above 1.0.
        ratio.min(1.0)
    }
}
31
32#[derive(Debug)]
33struct ProgressState {
34 progress: TransferProgress,
35 started_at: Instant,
36}
37
38impl ProgressState {
39 fn new(total_bytes: u64) -> Self {
40 ProgressState {
41 progress: TransferProgress {
42 total_bytes,
43 ..TransferProgress::default()
44 },
45 started_at: Instant::now(),
46 }
47 }
48
49 fn update(&mut self, bytes_delta: u64, chunks_delta: u32) {
50 self.progress.bytes_transferred =
51 self.progress.bytes_transferred.saturating_add(bytes_delta);
52 self.progress.chunks_completed =
53 self.progress.chunks_completed.saturating_add(chunks_delta);
54 self.update_throughput();
55 }
56
57 fn set_total_bytes(&mut self, total_bytes: u64) {
58 self.progress.total_bytes = total_bytes;
59 self.update_throughput();
60 }
61
62 fn snapshot(&self) -> TransferProgress {
63 self.progress
64 }
65
66 fn update_throughput(&mut self) {
67 let elapsed = self.started_at.elapsed().as_secs_f64();
68 self.progress.throughput = if elapsed > 0.0 {
69 self.progress.bytes_transferred as f64 / elapsed
70 } else {
71 0.0
72 };
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct TransferHandle {
79 state: Arc<Mutex<ProgressState>>,
80 cancelled: Arc<AtomicBool>,
81}
82
83impl TransferHandle {
84 pub fn new(total_bytes: u64) -> Self {
89 TransferHandle {
90 state: Arc::new(Mutex::new(ProgressState::new(total_bytes))),
91 cancelled: Arc::new(AtomicBool::new(false)),
92 }
93 }
94
95 pub async fn progress(&self) -> TransferProgress {
100 let state = self.state.lock().await;
101 state.snapshot()
102 }
103
104 pub async fn update_progress(&self, bytes_delta: u64, chunks_delta: u32) {
109 let mut state = self.state.lock().await;
110 state.update(bytes_delta, chunks_delta);
111 }
112
113 pub async fn set_total_bytes(&self, total_bytes: u64) {
118 let mut state = self.state.lock().await;
119 state.set_total_bytes(total_bytes);
120 }
121
122 pub fn cancel(&self) {
127 self.cancelled.store(true, Ordering::Relaxed);
128 }
129
130 pub fn is_cancelled(&self) -> bool {
135 self.cancelled.load(Ordering::Relaxed)
136 }
137}
138
139#[derive(Debug, Clone)]
141pub struct NodeTransferStatus {
142 pub progress: TransferProgress,
143 pub state: TransferState,
144 pub error: Option<String>,
145}
146
147#[derive(Debug)]
148struct NodeProgressState {
149 progress: ProgressState,
150 state: TransferState,
151 error: Option<String>,
152}
153
154impl NodeProgressState {
155 fn new(total_bytes: u64) -> Self {
156 NodeProgressState {
157 progress: ProgressState::new(total_bytes),
158 state: TransferState::Initializing,
159 error: None,
160 }
161 }
162}
163
164#[derive(Debug, Clone)]
166pub struct BroadcastHandle {
167 state: Arc<Mutex<HashMap<NodeId, NodeProgressState>>>,
168 cancelled: Arc<AtomicBool>,
169}
170
171impl BroadcastHandle {
172 pub fn new(targets: Vec<NodeId>, total_bytes: u64) -> Self {
177 let mut map = HashMap::new();
178 for node_id in targets {
179 map.insert(node_id, NodeProgressState::new(total_bytes));
180 }
181 BroadcastHandle {
182 state: Arc::new(Mutex::new(map)),
183 cancelled: Arc::new(AtomicBool::new(false)),
184 }
185 }
186
187 pub async fn progress(&self) -> HashMap<NodeId, NodeTransferStatus> {
192 let state = self.state.lock().await;
193 state
194 .iter()
195 .map(|(node_id, node_state)| {
196 (
197 *node_id,
198 NodeTransferStatus {
199 progress: node_state.progress.snapshot(),
200 state: node_state.state,
201 error: node_state.error.clone(),
202 },
203 )
204 })
205 .collect()
206 }
207
208 pub async fn update_node_progress(&self, node_id: NodeId, bytes_delta: u64, chunks_delta: u32) {
213 let mut state = self.state.lock().await;
214 if let Some(node_state) = state.get_mut(&node_id) {
215 node_state.progress.update(bytes_delta, chunks_delta);
216 }
217 }
218
219 pub async fn set_node_state(
224 &self,
225 node_id: NodeId,
226 transfer_state: TransferState,
227 error: Option<String>,
228 ) {
229 let mut state = self.state.lock().await;
230 if let Some(node_state) = state.get_mut(&node_id) {
231 node_state.state = transfer_state;
232 node_state.error = error;
233 }
234 }
235
236 pub fn cancel(&self) {
241 self.cancelled.store(true, Ordering::Relaxed);
242 }
243
244 pub fn is_cancelled(&self) -> bool {
249 self.cancelled.load(Ordering::Relaxed)
250 }
251}
252
253#[derive(Debug, Clone)]
255pub struct SyncHandle {
256 state: Arc<Mutex<ProgressState>>,
257 cancelled: Arc<AtomicBool>,
258}
259
260impl SyncHandle {
261 pub fn new(total_bytes: u64) -> Self {
266 SyncHandle {
267 state: Arc::new(Mutex::new(ProgressState::new(total_bytes))),
268 cancelled: Arc::new(AtomicBool::new(false)),
269 }
270 }
271
272 pub async fn progress(&self) -> TransferProgress {
277 let state = self.state.lock().await;
278 state.snapshot()
279 }
280
281 pub async fn update_progress(&self, bytes_delta: u64, chunks_delta: u32) {
286 let mut state = self.state.lock().await;
287 state.update(bytes_delta, chunks_delta);
288 }
289
290 pub async fn set_total_bytes(&self, total_bytes: u64) {
295 let mut state = self.state.lock().await;
296 state.set_total_bytes(total_bytes);
297 }
298
299 pub fn cancel(&self) {
304 self.cancelled.store(true, Ordering::Relaxed);
305 }
306
307 pub fn is_cancelled(&self) -> bool {
312 self.cancelled.load(Ordering::Relaxed)
313 }
314}