openigtlink_rust/io/
partial_transfer.rs

//! Partial message transfer for large messages.
//!
//! Supports chunked transfer of large messages (images, video) with
//! resume capability and progress tracking.
5
6use crate::error::{IgtlError, Result};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::{debug, info, trace, warn};
11
/// Tunable parameters for a partial message transfer.
#[derive(Debug, Clone)]
pub struct TransferConfig {
    /// Number of bytes carried by a single chunk.
    pub chunk_size: usize,
    /// Whether a transfer may be resumed after it was interrupted.
    pub allow_resume: bool,
    /// Session timeout in seconds; `None` disables the timeout.
    pub timeout_secs: Option<u64>,
}

impl Default for TransferConfig {
    /// Returns 64 KiB chunks, resume enabled, and a five-minute timeout.
    fn default() -> Self {
        let chunk_size = 64 * 1024;
        let timeout_secs = Some(5 * 60);
        Self {
            chunk_size,
            allow_resume: true,
            timeout_secs,
        }
    }
}
32
/// Opaque handle that names one transfer session.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransferId(u64);

impl TransferId {
    /// Wraps a raw numeric identifier.
    pub fn new(id: u64) -> Self {
        TransferId(id)
    }

    /// Returns the raw numeric identifier.
    pub fn value(&self) -> u64 {
        let TransferId(raw) = *self;
        raw
    }
}
46
/// Lifecycle state of a partial transfer session.
#[derive(Debug, Clone)]
pub enum TransferState {
    /// Transfer is in progress
    InProgress {
        /// Bytes transferred so far
        bytes_transferred: usize,
        /// Total bytes to transfer
        total_bytes: usize,
        /// Current chunk index
        chunk_index: usize,
    },
    /// Transfer completed successfully
    Completed {
        /// Total bytes transferred
        total_bytes: usize,
    },
    /// Transfer was interrupted
    Interrupted {
        /// Bytes transferred before interruption
        bytes_transferred: usize,
        /// Total bytes
        total_bytes: usize,
        /// Can resume from this point
        resumable: bool,
    },
    /// Transfer failed
    Failed {
        /// Error message
        error: String,
    },
}

impl TransferState {
    /// Returns the completed fraction of the transfer, always within `0.0..=1.0`.
    ///
    /// * `InProgress` / `Interrupted`: `bytes_transferred / total_bytes`, capped
    ///   at `1.0`; `0.0` when `total_bytes` is zero.
    /// * `Completed`: `1.0`.
    /// * `Failed`: `0.0`.
    pub fn progress(&self) -> f64 {
        match self {
            Self::InProgress {
                bytes_transferred,
                total_bytes,
                ..
            }
            | Self::Interrupted {
                bytes_transferred,
                total_bytes,
                ..
            } => {
                if *total_bytes == 0 {
                    // Avoid 0/0 (NaN) for zero-length transfers.
                    0.0
                } else {
                    // A reported byte count above the total must not push the
                    // fraction past the documented upper bound.
                    ((*bytes_transferred as f64) / (*total_bytes as f64)).min(1.0)
                }
            }
            Self::Completed { .. } => 1.0,
            Self::Failed { .. } => 0.0,
        }
    }

    /// Returns `true` only for the `Completed` state.
    pub fn is_complete(&self) -> bool {
        matches!(self, Self::Completed { .. })
    }

    /// Returns `true` only for an `Interrupted` state flagged as resumable.
    pub fn is_resumable(&self) -> bool {
        matches!(self, Self::Interrupted { resumable: true, .. })
    }
}
113
114/// Information about a transfer session
115#[derive(Debug, Clone)]
116pub struct TransferInfo {
117    pub id: TransferId,
118    pub state: TransferState,
119    pub config: TransferConfig,
120    pub started_at: std::time::Instant,
121    pub updated_at: std::time::Instant,
122}
123
124impl TransferInfo {
125    /// Get elapsed time since transfer started
126    pub fn elapsed(&self) -> std::time::Duration {
127        self.started_at.elapsed()
128    }
129
130    /// Get time since last update
131    pub fn idle_time(&self) -> std::time::Duration {
132        self.updated_at.elapsed()
133    }
134
135    /// Calculate transfer speed in bytes/sec
136    pub fn speed_bps(&self) -> f64 {
137        match &self.state {
138            TransferState::InProgress { bytes_transferred, .. }
139            | TransferState::Interrupted { bytes_transferred, .. } => {
140                let elapsed_secs = self.elapsed().as_secs_f64();
141                if elapsed_secs > 0.0 {
142                    (*bytes_transferred as f64) / elapsed_secs
143                } else {
144                    0.0
145                }
146            }
147            TransferState::Completed { total_bytes } => {
148                let elapsed_secs = self.elapsed().as_secs_f64();
149                if elapsed_secs > 0.0 {
150                    (*total_bytes as f64) / elapsed_secs
151                } else {
152                    0.0
153                }
154            }
155            TransferState::Failed { .. } => 0.0,
156        }
157    }
158}
159
160/// Manager for partial message transfers
161pub struct PartialTransferManager {
162    transfers: Arc<Mutex<HashMap<TransferId, TransferInfo>>>,
163    next_id: Arc<Mutex<u64>>,
164    config: TransferConfig,
165}
166
167impl PartialTransferManager {
168    /// Create a new transfer manager with default configuration
169    pub fn new() -> Self {
170        Self::with_config(TransferConfig::default())
171    }
172
173    /// Create a new transfer manager with custom configuration
174    pub fn with_config(config: TransferConfig) -> Self {
175        info!(
176            chunk_size = config.chunk_size,
177            allow_resume = config.allow_resume,
178            timeout_secs = ?config.timeout_secs,
179            "Creating partial transfer manager"
180        );
181        Self {
182            transfers: Arc::new(Mutex::new(HashMap::new())),
183            next_id: Arc::new(Mutex::new(1)),
184            config,
185        }
186    }
187
188    /// Start a new transfer session
189    ///
190    /// # Arguments
191    /// * `total_bytes` - Total size of data to transfer
192    ///
193    /// # Returns
194    /// Transfer ID for the new session
195    pub async fn start_transfer(&self, total_bytes: usize) -> Result<TransferId> {
196        let mut next_id = self.next_id.lock().await;
197        let id = TransferId(*next_id);
198        *next_id += 1;
199        drop(next_id);
200
201        info!(
202            transfer_id = id.value(),
203            total_bytes = total_bytes,
204            chunk_size = self.config.chunk_size,
205            "Starting new transfer"
206        );
207
208        let now = std::time::Instant::now();
209        let info = TransferInfo {
210            id,
211            state: TransferState::InProgress {
212                bytes_transferred: 0,
213                total_bytes,
214                chunk_index: 0,
215            },
216            config: self.config.clone(),
217            started_at: now,
218            updated_at: now,
219        };
220
221        self.transfers.lock().await.insert(id, info);
222        Ok(id)
223    }
224
225    /// Update transfer progress
226    ///
227    /// # Arguments
228    /// * `id` - Transfer ID
229    /// * `bytes_transferred` - Total bytes transferred so far
230    /// * `chunk_index` - Current chunk index
231    pub async fn update_progress(
232        &self,
233        id: TransferId,
234        bytes_transferred: usize,
235        chunk_index: usize,
236    ) -> Result<()> {
237        let mut transfers = self.transfers.lock().await;
238
239        let info = transfers.get_mut(&id).ok_or_else(|| {
240            warn!(transfer_id = id.value(), "Transfer not found");
241            IgtlError::Io(std::io::Error::new(
242                std::io::ErrorKind::NotFound,
243                "Transfer not found",
244            ))
245        })?;
246
247        if let TransferState::InProgress { total_bytes, .. } = info.state {
248            let progress = (bytes_transferred as f64 / total_bytes as f64) * 100.0;
249            trace!(
250                transfer_id = id.value(),
251                bytes_transferred = bytes_transferred,
252                total_bytes = total_bytes,
253                chunk_index = chunk_index,
254                progress_pct = format!("{:.1}%", progress),
255                "Transfer progress updated"
256            );
257            info.state = TransferState::InProgress {
258                bytes_transferred,
259                total_bytes,
260                chunk_index,
261            };
262            info.updated_at = std::time::Instant::now();
263        } else {
264            warn!(transfer_id = id.value(), "Transfer is not in progress");
265            return Err(IgtlError::Io(std::io::Error::new(
266                std::io::ErrorKind::InvalidInput,
267                "Transfer is not in progress",
268            )));
269        }
270
271        Ok(())
272    }
273
274    /// Mark transfer as completed
275    pub async fn complete_transfer(&self, id: TransferId) -> Result<()> {
276        let mut transfers = self.transfers.lock().await;
277
278        let info = transfers.get_mut(&id).ok_or_else(|| {
279            warn!(transfer_id = id.value(), "Transfer not found");
280            IgtlError::Io(std::io::Error::new(
281                std::io::ErrorKind::NotFound,
282                "Transfer not found",
283            ))
284        })?;
285
286        if let TransferState::InProgress { total_bytes, .. } = info.state {
287            let elapsed = info.elapsed().as_secs_f64();
288            let speed_mbps = if elapsed > 0.0 {
289                (total_bytes as f64) / elapsed / 1_000_000.0
290            } else {
291                0.0
292            };
293            info!(
294                transfer_id = id.value(),
295                total_bytes = total_bytes,
296                elapsed_secs = format!("{:.2}", elapsed),
297                speed_mbps = format!("{:.2}", speed_mbps),
298                "Transfer completed"
299            );
300            info.state = TransferState::Completed { total_bytes };
301            info.updated_at = std::time::Instant::now();
302        }
303
304        Ok(())
305    }
306
307    /// Interrupt transfer (can be resumed if configured)
308    pub async fn interrupt_transfer(&self, id: TransferId) -> Result<()> {
309        let mut transfers = self.transfers.lock().await;
310
311        let info = transfers.get_mut(&id).ok_or_else(|| {
312            warn!(transfer_id = id.value(), "Transfer not found");
313            IgtlError::Io(std::io::Error::new(
314                std::io::ErrorKind::NotFound,
315                "Transfer not found",
316            ))
317        })?;
318
319        if let TransferState::InProgress {
320            bytes_transferred,
321            total_bytes,
322            ..
323        } = info.state
324        {
325            let resumable = info.config.allow_resume;
326            warn!(
327                transfer_id = id.value(),
328                bytes_transferred = bytes_transferred,
329                total_bytes = total_bytes,
330                resumable = resumable,
331                "Transfer interrupted"
332            );
333            info.state = TransferState::Interrupted {
334                bytes_transferred,
335                total_bytes,
336                resumable,
337            };
338            info.updated_at = std::time::Instant::now();
339        }
340
341        Ok(())
342    }
343
344    /// Resume an interrupted transfer
345    pub async fn resume_transfer(&self, id: TransferId) -> Result<usize> {
346        let mut transfers = self.transfers.lock().await;
347
348        let info = transfers.get_mut(&id).ok_or_else(|| {
349            warn!(transfer_id = id.value(), "Transfer not found");
350            IgtlError::Io(std::io::Error::new(
351                std::io::ErrorKind::NotFound,
352                "Transfer not found",
353            ))
354        })?;
355
356        match info.state {
357            TransferState::Interrupted {
358                bytes_transferred,
359                total_bytes,
360                resumable: true,
361            } => {
362                let chunk_index = bytes_transferred / info.config.chunk_size;
363                info!(
364                    transfer_id = id.value(),
365                    resuming_from = bytes_transferred,
366                    total_bytes = total_bytes,
367                    chunk_index = chunk_index,
368                    "Resuming transfer"
369                );
370                info.state = TransferState::InProgress {
371                    bytes_transferred,
372                    total_bytes,
373                    chunk_index,
374                };
375                info.updated_at = std::time::Instant::now();
376                Ok(bytes_transferred)
377            }
378            TransferState::Interrupted { resumable: false, .. } => {
379                warn!(transfer_id = id.value(), "Transfer is not resumable");
380                Err(IgtlError::Io(
381                    std::io::Error::new(
382                        std::io::ErrorKind::InvalidInput,
383                        "Transfer is not resumable",
384                    ),
385                ))
386            }
387            _ => {
388                warn!(transfer_id = id.value(), "Transfer is not interrupted");
389                Err(IgtlError::Io(std::io::Error::new(
390                    std::io::ErrorKind::InvalidInput,
391                    "Transfer is not interrupted",
392                )))
393            }
394        }
395    }
396
397    /// Fail a transfer with error message
398    pub async fn fail_transfer(&self, id: TransferId, error: String) -> Result<()> {
399        let mut transfers = self.transfers.lock().await;
400
401        if let Some(info) = transfers.get_mut(&id) {
402            warn!(
403                transfer_id = id.value(),
404                error = %error,
405                "Transfer failed"
406            );
407            info.state = TransferState::Failed { error };
408            info.updated_at = std::time::Instant::now();
409        }
410
411        Ok(())
412    }
413
414    /// Get transfer information
415    pub async fn get_transfer(&self, id: TransferId) -> Option<TransferInfo> {
416        self.transfers.lock().await.get(&id).cloned()
417    }
418
419    /// Get all active transfers
420    pub async fn active_transfers(&self) -> Vec<TransferInfo> {
421        self.transfers
422            .lock()
423            .await
424            .values()
425            .filter(|info| matches!(info.state, TransferState::InProgress { .. }))
426            .cloned()
427            .collect()
428    }
429
430    /// Remove completed or failed transfers
431    pub async fn cleanup_completed(&self) {
432        self.transfers.lock().await.retain(|_, info| {
433            !matches!(
434                info.state,
435                TransferState::Completed { .. } | TransferState::Failed { .. }
436            )
437        });
438    }
439
440    /// Remove transfers that have timed out
441    pub async fn cleanup_timed_out(&self) {
442        let config = &self.config;
443        if let Some(timeout_secs) = config.timeout_secs {
444            let timeout = std::time::Duration::from_secs(timeout_secs);
445            let mut transfers = self.transfers.lock().await;
446            let before_count = transfers.len();
447            transfers.retain(|id, info| {
448                let keep = info.idle_time() < timeout;
449                if !keep {
450                    debug!(
451                        transfer_id = id.value(),
452                        idle_time_secs = info.idle_time().as_secs(),
453                        "Removing timed out transfer"
454                    );
455                }
456                keep
457            });
458            let removed = before_count - transfers.len();
459            if removed > 0 {
460                info!(removed_count = removed, "Cleaned up timed out transfers");
461            }
462        }
463    }
464}
465
466impl Default for PartialTransferManager {
467    fn default() -> Self {
468        Self::new()
469    }
470}
471
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly started session is reported as in progress.
    #[tokio::test]
    async fn test_start_transfer() {
        let mgr = PartialTransferManager::new();
        let session = mgr.start_transfer(1000000).await.unwrap();

        let snapshot = mgr.get_transfer(session).await.unwrap();
        assert!(matches!(snapshot.state, TransferState::InProgress { .. }));
    }

    /// Progress updates are reflected in the reported fraction.
    #[tokio::test]
    async fn test_update_progress() {
        let mgr = PartialTransferManager::new();
        let session = mgr.start_transfer(1000000).await.unwrap();

        mgr.update_progress(session, 500000, 5).await.unwrap();

        let snapshot = mgr.get_transfer(session).await.unwrap();
        assert_eq!(snapshot.state.progress(), 0.5);
    }

    /// Completing a session switches it to the completed state at 100%.
    #[tokio::test]
    async fn test_complete_transfer() {
        let mgr = PartialTransferManager::new();
        let session = mgr.start_transfer(1000000).await.unwrap();

        mgr.update_progress(session, 1000000, 10).await.unwrap();
        mgr.complete_transfer(session).await.unwrap();

        let snapshot = mgr.get_transfer(session).await.unwrap();
        assert!(snapshot.state.is_complete());
        assert_eq!(snapshot.state.progress(), 1.0);
    }

    /// An interrupted session can be resumed from its last byte offset.
    #[tokio::test]
    async fn test_interrupt_and_resume() {
        let mgr = PartialTransferManager::new();
        let session = mgr.start_transfer(1000000).await.unwrap();

        mgr.update_progress(session, 500000, 5).await.unwrap();
        mgr.interrupt_transfer(session).await.unwrap();

        let paused = mgr.get_transfer(session).await.unwrap();
        assert!(paused.state.is_resumable());

        let offset = mgr.resume_transfer(session).await.unwrap();
        assert_eq!(offset, 500000);

        let resumed = mgr.get_transfer(session).await.unwrap();
        assert!(matches!(resumed.state, TransferState::InProgress { .. }));
    }

    /// Failing a session records the failed state.
    #[tokio::test]
    async fn test_fail_transfer() {
        let mgr = PartialTransferManager::new();
        let session = mgr.start_transfer(1000000).await.unwrap();

        let outcome = mgr
            .fail_transfer(session, "Network error".to_string())
            .await;
        outcome.unwrap();

        let snapshot = mgr.get_transfer(session).await.unwrap();
        assert!(matches!(snapshot.state, TransferState::Failed { .. }));
    }

    /// Only sessions that are still in progress count as active.
    #[tokio::test]
    async fn test_active_transfers() {
        let mgr = PartialTransferManager::new();

        let first = mgr.start_transfer(1000000).await.unwrap();
        let second = mgr.start_transfer(2000000).await.unwrap();
        let third = mgr.start_transfer(3000000).await.unwrap();

        mgr.complete_transfer(first).await.unwrap();
        mgr.interrupt_transfer(second).await.unwrap();

        let running = mgr.active_transfers().await;
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].id, third);
    }

    /// Cleanup removes completed sessions and keeps the rest.
    #[tokio::test]
    async fn test_cleanup_completed() {
        let mgr = PartialTransferManager::new();

        let finished = mgr.start_transfer(1000000).await.unwrap();
        let pending = mgr.start_transfer(2000000).await.unwrap();

        mgr.complete_transfer(finished).await.unwrap();

        mgr.cleanup_completed().await;

        assert!(mgr.get_transfer(finished).await.is_none());
        assert!(mgr.get_transfer(pending).await.is_some());
    }

    /// Speed is derived from the bytes moved and the time since the start.
    #[tokio::test]
    async fn test_transfer_speed() {
        let mgr = PartialTransferManager::new();
        let session = mgr.start_transfer(1000000).await.unwrap();

        let pause = tokio::time::Duration::from_millis(100);
        tokio::time::sleep(pause).await;
        mgr.update_progress(session, 500000, 5).await.unwrap();

        let snapshot = mgr.get_transfer(session).await.unwrap();
        let bytes_per_sec = snapshot.speed_bps();

        // Roughly 5 MB/s is expected (500000 bytes in ~0.1 s); only assert a
        // generous lower bound of 1 MB/s.
        assert!(bytes_per_sec > 1_000_000.0);
    }
}