alopex_chirps_file_transfer/
progress.rs

1use crate::session::TransferState;
2use alopex_chirps_wire::node_id::NodeId;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Instant;
7use tokio::sync::Mutex;
8
9/// Snapshot of transfer progress and throughput.
10#[derive(Debug, Clone, Copy, Default)]
11pub struct TransferProgress {
12    pub bytes_transferred: u64,
13    pub total_bytes: u64,
14    pub chunks_completed: u32,
15    pub throughput: f64,
16}
17
18impl TransferProgress {
19    /// Returns completion ratio in `[0.0, 1.0]`.
20    ///
21    /// # Panics
22    /// This method does not panic.
23    pub fn completion_ratio(&self) -> f64 {
24        if self.total_bytes == 0 {
25            1.0
26        } else {
27            self.bytes_transferred as f64 / self.total_bytes as f64
28        }
29    }
30}
31
32#[derive(Debug)]
33struct ProgressState {
34    progress: TransferProgress,
35    started_at: Instant,
36}
37
38impl ProgressState {
39    fn new(total_bytes: u64) -> Self {
40        ProgressState {
41            progress: TransferProgress {
42                total_bytes,
43                ..TransferProgress::default()
44            },
45            started_at: Instant::now(),
46        }
47    }
48
49    fn update(&mut self, bytes_delta: u64, chunks_delta: u32) {
50        self.progress.bytes_transferred =
51            self.progress.bytes_transferred.saturating_add(bytes_delta);
52        self.progress.chunks_completed =
53            self.progress.chunks_completed.saturating_add(chunks_delta);
54        self.update_throughput();
55    }
56
57    fn set_total_bytes(&mut self, total_bytes: u64) {
58        self.progress.total_bytes = total_bytes;
59        self.update_throughput();
60    }
61
62    fn snapshot(&self) -> TransferProgress {
63        self.progress
64    }
65
66    fn update_throughput(&mut self) {
67        let elapsed = self.started_at.elapsed().as_secs_f64();
68        self.progress.throughput = if elapsed > 0.0 {
69            self.progress.bytes_transferred as f64 / elapsed
70        } else {
71            0.0
72        };
73    }
74}
75
76/// Handle for tracking and controlling a single transfer.
77#[derive(Debug, Clone)]
78pub struct TransferHandle {
79    state: Arc<Mutex<ProgressState>>,
80    cancelled: Arc<AtomicBool>,
81}
82
83impl TransferHandle {
84    /// Creates a new handle with the provided total byte count.
85    ///
86    /// # Panics
87    /// This method does not panic.
88    pub fn new(total_bytes: u64) -> Self {
89        TransferHandle {
90            state: Arc::new(Mutex::new(ProgressState::new(total_bytes))),
91            cancelled: Arc::new(AtomicBool::new(false)),
92        }
93    }
94
95    /// Returns the latest progress snapshot.
96    ///
97    /// # Panics
98    /// This method does not panic.
99    pub async fn progress(&self) -> TransferProgress {
100        let state = self.state.lock().await;
101        state.snapshot()
102    }
103
104    /// Applies a progress delta for bytes and chunks.
105    ///
106    /// # Panics
107    /// This method does not panic.
108    pub async fn update_progress(&self, bytes_delta: u64, chunks_delta: u32) {
109        let mut state = self.state.lock().await;
110        state.update(bytes_delta, chunks_delta);
111    }
112
113    /// Updates the total bytes for the transfer.
114    ///
115    /// # Panics
116    /// This method does not panic.
117    pub async fn set_total_bytes(&self, total_bytes: u64) {
118        let mut state = self.state.lock().await;
119        state.set_total_bytes(total_bytes);
120    }
121
122    /// Requests cancellation of the transfer.
123    ///
124    /// # Panics
125    /// This method does not panic.
126    pub fn cancel(&self) {
127        self.cancelled.store(true, Ordering::Relaxed);
128    }
129
130    /// Returns true when cancellation has been requested.
131    ///
132    /// # Panics
133    /// This method does not panic.
134    pub fn is_cancelled(&self) -> bool {
135        self.cancelled.load(Ordering::Relaxed)
136    }
137}
138
139/// Per-node status snapshot for broadcast transfers.
140#[derive(Debug, Clone)]
141pub struct NodeTransferStatus {
142    pub progress: TransferProgress,
143    pub state: TransferState,
144    pub error: Option<String>,
145}
146
147#[derive(Debug)]
148struct NodeProgressState {
149    progress: ProgressState,
150    state: TransferState,
151    error: Option<String>,
152}
153
154impl NodeProgressState {
155    fn new(total_bytes: u64) -> Self {
156        NodeProgressState {
157            progress: ProgressState::new(total_bytes),
158            state: TransferState::Initializing,
159            error: None,
160        }
161    }
162}
163
164/// Handle for tracking broadcast progress across multiple nodes.
165#[derive(Debug, Clone)]
166pub struct BroadcastHandle {
167    state: Arc<Mutex<HashMap<NodeId, NodeProgressState>>>,
168    cancelled: Arc<AtomicBool>,
169}
170
171impl BroadcastHandle {
172    /// Creates a broadcast handle for the target list.
173    ///
174    /// # Panics
175    /// This method does not panic.
176    pub fn new(targets: Vec<NodeId>, total_bytes: u64) -> Self {
177        let mut map = HashMap::new();
178        for node_id in targets {
179            map.insert(node_id, NodeProgressState::new(total_bytes));
180        }
181        BroadcastHandle {
182            state: Arc::new(Mutex::new(map)),
183            cancelled: Arc::new(AtomicBool::new(false)),
184        }
185    }
186
187    /// Returns a snapshot of progress for all targets.
188    ///
189    /// # Panics
190    /// This method does not panic.
191    pub async fn progress(&self) -> HashMap<NodeId, NodeTransferStatus> {
192        let state = self.state.lock().await;
193        state
194            .iter()
195            .map(|(node_id, node_state)| {
196                (
197                    *node_id,
198                    NodeTransferStatus {
199                        progress: node_state.progress.snapshot(),
200                        state: node_state.state,
201                        error: node_state.error.clone(),
202                    },
203                )
204            })
205            .collect()
206    }
207
208    /// Updates progress for a specific target.
209    ///
210    /// # Panics
211    /// This method does not panic.
212    pub async fn update_node_progress(&self, node_id: NodeId, bytes_delta: u64, chunks_delta: u32) {
213        let mut state = self.state.lock().await;
214        if let Some(node_state) = state.get_mut(&node_id) {
215            node_state.progress.update(bytes_delta, chunks_delta);
216        }
217    }
218
219    /// Updates the transfer state for a specific target.
220    ///
221    /// # Panics
222    /// This method does not panic.
223    pub async fn set_node_state(
224        &self,
225        node_id: NodeId,
226        transfer_state: TransferState,
227        error: Option<String>,
228    ) {
229        let mut state = self.state.lock().await;
230        if let Some(node_state) = state.get_mut(&node_id) {
231            node_state.state = transfer_state;
232            node_state.error = error;
233        }
234    }
235
236    /// Requests cancellation of the broadcast operation.
237    ///
238    /// # Panics
239    /// This method does not panic.
240    pub fn cancel(&self) {
241        self.cancelled.store(true, Ordering::Relaxed);
242    }
243
244    /// Returns true when cancellation has been requested.
245    ///
246    /// # Panics
247    /// This method does not panic.
248    pub fn is_cancelled(&self) -> bool {
249        self.cancelled.load(Ordering::Relaxed)
250    }
251}
252
253/// Handle for tracking sync progress.
254#[derive(Debug, Clone)]
255pub struct SyncHandle {
256    state: Arc<Mutex<ProgressState>>,
257    cancelled: Arc<AtomicBool>,
258}
259
260impl SyncHandle {
261    /// Creates a new sync handle with the provided total byte count.
262    ///
263    /// # Panics
264    /// This method does not panic.
265    pub fn new(total_bytes: u64) -> Self {
266        SyncHandle {
267            state: Arc::new(Mutex::new(ProgressState::new(total_bytes))),
268            cancelled: Arc::new(AtomicBool::new(false)),
269        }
270    }
271
272    /// Returns the latest progress snapshot.
273    ///
274    /// # Panics
275    /// This method does not panic.
276    pub async fn progress(&self) -> TransferProgress {
277        let state = self.state.lock().await;
278        state.snapshot()
279    }
280
281    /// Applies a progress delta for bytes and chunks.
282    ///
283    /// # Panics
284    /// This method does not panic.
285    pub async fn update_progress(&self, bytes_delta: u64, chunks_delta: u32) {
286        let mut state = self.state.lock().await;
287        state.update(bytes_delta, chunks_delta);
288    }
289
290    /// Updates the total bytes for the sync.
291    ///
292    /// # Panics
293    /// This method does not panic.
294    pub async fn set_total_bytes(&self, total_bytes: u64) {
295        let mut state = self.state.lock().await;
296        state.set_total_bytes(total_bytes);
297    }
298
299    /// Requests cancellation of the sync.
300    ///
301    /// # Panics
302    /// This method does not panic.
303    pub fn cancel(&self) {
304        self.cancelled.store(true, Ordering::Relaxed);
305    }
306
307    /// Returns true when cancellation has been requested.
308    ///
309    /// # Panics
310    /// This method does not panic.
311    pub fn is_cancelled(&self) -> bool {
312        self.cancelled.load(Ordering::Relaxed)
313    }
314}