openigtlink_rust/io/
partial_transfer.rs

1//! Partial message transfer for large messages
2//!
3//! Supports chunked transfer of large messages (images, video) with
4//! resume capability and progress tracking.
5
6use crate::error::{IgtlError, Result};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::{debug, info, trace, warn};
11
/// Settings that control how a large message is chunked and tracked
#[derive(Debug, Clone)]
pub struct TransferConfig {
    /// Number of bytes carried by one chunk
    pub chunk_size: usize,
    /// When `true`, an interrupted transfer may be resumed later
    pub allow_resume: bool,
    /// Session timeout in seconds (`None` disables the timeout)
    pub timeout_secs: Option<u64>,
}

impl Default for TransferConfig {
    /// Stock settings: 64KB chunks, resume enabled, 5-minute timeout.
    fn default() -> Self {
        let chunk_size = 64 * 1024;
        let timeout_secs = Some(5 * 60);
        TransferConfig {
            chunk_size,
            allow_resume: true,
            timeout_secs,
        }
    }
}
32
/// Handle that names a single transfer session
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransferId(u64);

impl TransferId {
    /// Wrap a raw numeric identifier.
    pub fn new(id: u64) -> Self {
        TransferId(id)
    }

    /// Return the raw numeric identifier.
    pub fn value(&self) -> u64 {
        let TransferId(raw) = *self;
        raw
    }
}
46
/// Lifecycle stage of a partial transfer session
#[derive(Debug, Clone)]
pub enum TransferState {
    /// Data is currently being transferred
    InProgress {
        /// Number of bytes transferred so far
        bytes_transferred: usize,
        /// Number of bytes the whole transfer consists of
        total_bytes: usize,
        /// Index of the current chunk
        chunk_index: usize,
    },
    /// Every byte was transferred successfully
    Completed {
        /// Number of bytes that were transferred
        total_bytes: usize,
    },
    /// The transfer stopped before finishing
    Interrupted {
        /// Number of bytes transferred before the interruption
        bytes_transferred: usize,
        /// Number of bytes the whole transfer consists of
        total_bytes: usize,
        /// Whether the transfer may continue from this point
        resumable: bool,
    },
    /// The transfer ended with an error
    Failed {
        /// Description of what went wrong
        error: String,
    },
}

impl TransferState {
    /// Fraction of the payload transferred so far.
    ///
    /// `Completed` reports `1.0` and `Failed` reports `0.0`. The other states
    /// report `bytes_transferred / total_bytes`, or `0.0` when `total_bytes`
    /// is zero.
    pub fn progress(&self) -> f64 {
        match *self {
            Self::Completed { .. } => 1.0,
            Self::Failed { .. } => 0.0,
            Self::InProgress {
                bytes_transferred,
                total_bytes,
                ..
            }
            | Self::Interrupted {
                bytes_transferred,
                total_bytes,
                ..
            } => {
                // A zero-length transfer has no meaningful ratio.
                if total_bytes == 0 {
                    return 0.0;
                }
                bytes_transferred as f64 / total_bytes as f64
            }
        }
    }

    /// `true` only for the `Completed` state.
    pub fn is_complete(&self) -> bool {
        matches!(self, Self::Completed { .. })
    }

    /// `true` only for an `Interrupted` state whose `resumable` flag is set.
    pub fn is_resumable(&self) -> bool {
        match self {
            Self::Interrupted { resumable, .. } => *resumable,
            _ => false,
        }
    }
}
127
128/// Information about a transfer session
129#[derive(Debug, Clone)]
130pub struct TransferInfo {
131    pub id: TransferId,
132    pub state: TransferState,
133    pub config: TransferConfig,
134    pub started_at: std::time::Instant,
135    pub updated_at: std::time::Instant,
136}
137
138impl TransferInfo {
139    /// Get elapsed time since transfer started
140    pub fn elapsed(&self) -> std::time::Duration {
141        self.started_at.elapsed()
142    }
143
144    /// Get time since last update
145    pub fn idle_time(&self) -> std::time::Duration {
146        self.updated_at.elapsed()
147    }
148
149    /// Calculate transfer speed in bytes/sec
150    pub fn speed_bps(&self) -> f64 {
151        match &self.state {
152            TransferState::InProgress {
153                bytes_transferred, ..
154            }
155            | TransferState::Interrupted {
156                bytes_transferred, ..
157            } => {
158                let elapsed_secs = self.elapsed().as_secs_f64();
159                if elapsed_secs > 0.0 {
160                    (*bytes_transferred as f64) / elapsed_secs
161                } else {
162                    0.0
163                }
164            }
165            TransferState::Completed { total_bytes } => {
166                let elapsed_secs = self.elapsed().as_secs_f64();
167                if elapsed_secs > 0.0 {
168                    (*total_bytes as f64) / elapsed_secs
169                } else {
170                    0.0
171                }
172            }
173            TransferState::Failed { .. } => 0.0,
174        }
175    }
176}
177
178/// Manager for partial message transfers
179pub struct PartialTransferManager {
180    transfers: Arc<Mutex<HashMap<TransferId, TransferInfo>>>,
181    next_id: Arc<Mutex<u64>>,
182    config: TransferConfig,
183}
184
185impl PartialTransferManager {
186    /// Create a new transfer manager with default configuration
187    pub fn new() -> Self {
188        Self::with_config(TransferConfig::default())
189    }
190
191    /// Create a new transfer manager with custom configuration
192    pub fn with_config(config: TransferConfig) -> Self {
193        info!(
194            chunk_size = config.chunk_size,
195            allow_resume = config.allow_resume,
196            timeout_secs = ?config.timeout_secs,
197            "Creating partial transfer manager"
198        );
199        Self {
200            transfers: Arc::new(Mutex::new(HashMap::new())),
201            next_id: Arc::new(Mutex::new(1)),
202            config,
203        }
204    }
205
206    /// Start a new transfer session
207    ///
208    /// # Arguments
209    /// * `total_bytes` - Total size of data to transfer
210    ///
211    /// # Returns
212    /// Transfer ID for the new session
213    pub async fn start_transfer(&self, total_bytes: usize) -> Result<TransferId> {
214        let mut next_id = self.next_id.lock().await;
215        let id = TransferId(*next_id);
216        *next_id += 1;
217        drop(next_id);
218
219        info!(
220            transfer_id = id.value(),
221            total_bytes = total_bytes,
222            chunk_size = self.config.chunk_size,
223            "Starting new transfer"
224        );
225
226        let now = std::time::Instant::now();
227        let info = TransferInfo {
228            id,
229            state: TransferState::InProgress {
230                bytes_transferred: 0,
231                total_bytes,
232                chunk_index: 0,
233            },
234            config: self.config.clone(),
235            started_at: now,
236            updated_at: now,
237        };
238
239        self.transfers.lock().await.insert(id, info);
240        Ok(id)
241    }
242
243    /// Update transfer progress
244    ///
245    /// # Arguments
246    /// * `id` - Transfer ID
247    /// * `bytes_transferred` - Total bytes transferred so far
248    /// * `chunk_index` - Current chunk index
249    pub async fn update_progress(
250        &self,
251        id: TransferId,
252        bytes_transferred: usize,
253        chunk_index: usize,
254    ) -> Result<()> {
255        let mut transfers = self.transfers.lock().await;
256
257        let info = transfers.get_mut(&id).ok_or_else(|| {
258            warn!(transfer_id = id.value(), "Transfer not found");
259            IgtlError::Io(std::io::Error::new(
260                std::io::ErrorKind::NotFound,
261                "Transfer not found",
262            ))
263        })?;
264
265        if let TransferState::InProgress { total_bytes, .. } = info.state {
266            let progress = (bytes_transferred as f64 / total_bytes as f64) * 100.0;
267            trace!(
268                transfer_id = id.value(),
269                bytes_transferred = bytes_transferred,
270                total_bytes = total_bytes,
271                chunk_index = chunk_index,
272                progress_pct = format!("{:.1}%", progress),
273                "Transfer progress updated"
274            );
275            info.state = TransferState::InProgress {
276                bytes_transferred,
277                total_bytes,
278                chunk_index,
279            };
280            info.updated_at = std::time::Instant::now();
281        } else {
282            warn!(transfer_id = id.value(), "Transfer is not in progress");
283            return Err(IgtlError::Io(std::io::Error::new(
284                std::io::ErrorKind::InvalidInput,
285                "Transfer is not in progress",
286            )));
287        }
288
289        Ok(())
290    }
291
292    /// Mark transfer as completed
293    pub async fn complete_transfer(&self, id: TransferId) -> Result<()> {
294        let mut transfers = self.transfers.lock().await;
295
296        let info = transfers.get_mut(&id).ok_or_else(|| {
297            warn!(transfer_id = id.value(), "Transfer not found");
298            IgtlError::Io(std::io::Error::new(
299                std::io::ErrorKind::NotFound,
300                "Transfer not found",
301            ))
302        })?;
303
304        if let TransferState::InProgress { total_bytes, .. } = info.state {
305            let elapsed = info.elapsed().as_secs_f64();
306            let speed_mbps = if elapsed > 0.0 {
307                (total_bytes as f64) / elapsed / 1_000_000.0
308            } else {
309                0.0
310            };
311            info!(
312                transfer_id = id.value(),
313                total_bytes = total_bytes,
314                elapsed_secs = format!("{:.2}", elapsed),
315                speed_mbps = format!("{:.2}", speed_mbps),
316                "Transfer completed"
317            );
318            info.state = TransferState::Completed { total_bytes };
319            info.updated_at = std::time::Instant::now();
320        }
321
322        Ok(())
323    }
324
325    /// Interrupt transfer (can be resumed if configured)
326    pub async fn interrupt_transfer(&self, id: TransferId) -> Result<()> {
327        let mut transfers = self.transfers.lock().await;
328
329        let info = transfers.get_mut(&id).ok_or_else(|| {
330            warn!(transfer_id = id.value(), "Transfer not found");
331            IgtlError::Io(std::io::Error::new(
332                std::io::ErrorKind::NotFound,
333                "Transfer not found",
334            ))
335        })?;
336
337        if let TransferState::InProgress {
338            bytes_transferred,
339            total_bytes,
340            ..
341        } = info.state
342        {
343            let resumable = info.config.allow_resume;
344            warn!(
345                transfer_id = id.value(),
346                bytes_transferred = bytes_transferred,
347                total_bytes = total_bytes,
348                resumable = resumable,
349                "Transfer interrupted"
350            );
351            info.state = TransferState::Interrupted {
352                bytes_transferred,
353                total_bytes,
354                resumable,
355            };
356            info.updated_at = std::time::Instant::now();
357        }
358
359        Ok(())
360    }
361
362    /// Resume an interrupted transfer
363    pub async fn resume_transfer(&self, id: TransferId) -> Result<usize> {
364        let mut transfers = self.transfers.lock().await;
365
366        let info = transfers.get_mut(&id).ok_or_else(|| {
367            warn!(transfer_id = id.value(), "Transfer not found");
368            IgtlError::Io(std::io::Error::new(
369                std::io::ErrorKind::NotFound,
370                "Transfer not found",
371            ))
372        })?;
373
374        match info.state {
375            TransferState::Interrupted {
376                bytes_transferred,
377                total_bytes,
378                resumable: true,
379            } => {
380                let chunk_index = bytes_transferred / info.config.chunk_size;
381                info!(
382                    transfer_id = id.value(),
383                    resuming_from = bytes_transferred,
384                    total_bytes = total_bytes,
385                    chunk_index = chunk_index,
386                    "Resuming transfer"
387                );
388                info.state = TransferState::InProgress {
389                    bytes_transferred,
390                    total_bytes,
391                    chunk_index,
392                };
393                info.updated_at = std::time::Instant::now();
394                Ok(bytes_transferred)
395            }
396            TransferState::Interrupted {
397                resumable: false, ..
398            } => {
399                warn!(transfer_id = id.value(), "Transfer is not resumable");
400                Err(IgtlError::Io(std::io::Error::new(
401                    std::io::ErrorKind::InvalidInput,
402                    "Transfer is not resumable",
403                )))
404            }
405            _ => {
406                warn!(transfer_id = id.value(), "Transfer is not interrupted");
407                Err(IgtlError::Io(std::io::Error::new(
408                    std::io::ErrorKind::InvalidInput,
409                    "Transfer is not interrupted",
410                )))
411            }
412        }
413    }
414
415    /// Fail a transfer with error message
416    pub async fn fail_transfer(&self, id: TransferId, error: String) -> Result<()> {
417        let mut transfers = self.transfers.lock().await;
418
419        if let Some(info) = transfers.get_mut(&id) {
420            warn!(
421                transfer_id = id.value(),
422                error = %error,
423                "Transfer failed"
424            );
425            info.state = TransferState::Failed { error };
426            info.updated_at = std::time::Instant::now();
427        }
428
429        Ok(())
430    }
431
432    /// Get transfer information
433    pub async fn get_transfer(&self, id: TransferId) -> Option<TransferInfo> {
434        self.transfers.lock().await.get(&id).cloned()
435    }
436
437    /// Get all active transfers
438    pub async fn active_transfers(&self) -> Vec<TransferInfo> {
439        self.transfers
440            .lock()
441            .await
442            .values()
443            .filter(|info| matches!(info.state, TransferState::InProgress { .. }))
444            .cloned()
445            .collect()
446    }
447
448    /// Remove completed or failed transfers
449    pub async fn cleanup_completed(&self) {
450        self.transfers.lock().await.retain(|_, info| {
451            !matches!(
452                info.state,
453                TransferState::Completed { .. } | TransferState::Failed { .. }
454            )
455        });
456    }
457
458    /// Remove transfers that have timed out
459    pub async fn cleanup_timed_out(&self) {
460        let config = &self.config;
461        if let Some(timeout_secs) = config.timeout_secs {
462            let timeout = std::time::Duration::from_secs(timeout_secs);
463            let mut transfers = self.transfers.lock().await;
464            let before_count = transfers.len();
465            transfers.retain(|id, info| {
466                let keep = info.idle_time() < timeout;
467                if !keep {
468                    debug!(
469                        transfer_id = id.value(),
470                        idle_time_secs = info.idle_time().as_secs(),
471                        "Removing timed out transfer"
472                    );
473                }
474                keep
475            });
476            let removed = before_count - transfers.len();
477            if removed > 0 {
478                info!(removed_count = removed, "Cleaned up timed out transfers");
479            }
480        }
481    }
482}
483
484impl Default for PartialTransferManager {
485    fn default() -> Self {
486        Self::new()
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload size used by most tests.
    const PAYLOAD: usize = 1_000_000;

    /// Build a default manager with one freshly started transfer of `total_bytes`.
    async fn manager_with_transfer(total_bytes: usize) -> (PartialTransferManager, TransferId) {
        let mgr = PartialTransferManager::new();
        let tid = mgr.start_transfer(total_bytes).await.unwrap();
        (mgr, tid)
    }

    /// A new transfer starts out in the `InProgress` state.
    #[tokio::test]
    async fn test_start_transfer() {
        let (mgr, tid) = manager_with_transfer(PAYLOAD).await;

        let snapshot = mgr.get_transfer(tid).await.unwrap();
        assert!(matches!(snapshot.state, TransferState::InProgress { .. }));
    }

    /// Reporting half of the bytes yields a progress of exactly one half.
    #[tokio::test]
    async fn test_update_progress() {
        let (mgr, tid) = manager_with_transfer(PAYLOAD).await;

        mgr.update_progress(tid, PAYLOAD / 2, 5).await.unwrap();

        let snapshot = mgr.get_transfer(tid).await.unwrap();
        assert_eq!(snapshot.state.progress(), 0.5);
    }

    /// Completing a transfer marks it complete with full progress.
    #[tokio::test]
    async fn test_complete_transfer() {
        let (mgr, tid) = manager_with_transfer(PAYLOAD).await;

        mgr.update_progress(tid, PAYLOAD, 10).await.unwrap();
        mgr.complete_transfer(tid).await.unwrap();

        let snapshot = mgr.get_transfer(tid).await.unwrap();
        assert!(snapshot.state.is_complete());
        assert_eq!(snapshot.state.progress(), 1.0);
    }

    /// An interrupted transfer is resumable and resumes at the recorded offset.
    #[tokio::test]
    async fn test_interrupt_and_resume() {
        let (mgr, tid) = manager_with_transfer(PAYLOAD).await;

        mgr.update_progress(tid, PAYLOAD / 2, 5).await.unwrap();
        mgr.interrupt_transfer(tid).await.unwrap();

        let paused = mgr.get_transfer(tid).await.unwrap();
        assert!(paused.state.is_resumable());

        let offset = mgr.resume_transfer(tid).await.unwrap();
        assert_eq!(offset, 500000);

        let resumed = mgr.get_transfer(tid).await.unwrap();
        assert!(matches!(resumed.state, TransferState::InProgress { .. }));
    }

    /// Failing a transfer moves it to the `Failed` state.
    #[tokio::test]
    async fn test_fail_transfer() {
        let (mgr, tid) = manager_with_transfer(PAYLOAD).await;

        let reason = "Network error".to_string();
        mgr.fail_transfer(tid, reason).await.unwrap();

        let snapshot = mgr.get_transfer(tid).await.unwrap();
        assert!(matches!(snapshot.state, TransferState::Failed { .. }));
    }

    /// Only transfers still in progress are listed as active.
    #[tokio::test]
    async fn test_active_transfers() {
        let mgr = PartialTransferManager::new();

        let finished = mgr.start_transfer(PAYLOAD).await.unwrap();
        let paused = mgr.start_transfer(2 * PAYLOAD).await.unwrap();
        let running = mgr.start_transfer(3 * PAYLOAD).await.unwrap();

        mgr.complete_transfer(finished).await.unwrap();
        mgr.interrupt_transfer(paused).await.unwrap();

        let active = mgr.active_transfers().await;
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, running);
    }

    /// Cleanup drops completed transfers and keeps the rest.
    #[tokio::test]
    async fn test_cleanup_completed() {
        let mgr = PartialTransferManager::new();

        let finished = mgr.start_transfer(PAYLOAD).await.unwrap();
        let running = mgr.start_transfer(2 * PAYLOAD).await.unwrap();

        mgr.complete_transfer(finished).await.unwrap();
        mgr.cleanup_completed().await;

        assert!(mgr.get_transfer(finished).await.is_none());
        assert!(mgr.get_transfer(running).await.is_some());
    }

    /// Speed is derived from the bytes transferred and the elapsed time.
    #[tokio::test]
    async fn test_transfer_speed() {
        let (mgr, tid) = manager_with_transfer(PAYLOAD).await;

        let pause = tokio::time::Duration::from_millis(100);
        tokio::time::sleep(pause).await;
        mgr.update_progress(tid, PAYLOAD / 2, 5).await.unwrap();

        let snapshot = mgr.get_transfer(tid).await.unwrap();
        let bytes_per_sec = snapshot.speed_bps();

        // Roughly 5 MB/s is expected (500000 bytes in ~0.1 sec);
        // require at least 1 MB/s to leave room for scheduling jitter.
        assert!(bytes_per_sec > 1_000_000.0);
    }
}