1use crate::error::{IgtlError, Result};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::{debug, info, trace, warn};
11
12#[derive(Debug, Clone)]
14pub struct TransferConfig {
15 pub chunk_size: usize,
17 pub allow_resume: bool,
19 pub timeout_secs: Option<u64>,
21}
22
23impl Default for TransferConfig {
24 fn default() -> Self {
25 Self {
26 chunk_size: 65536, allow_resume: true,
28 timeout_secs: Some(300), }
30 }
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
35pub struct TransferId(u64);
36
37impl TransferId {
38 pub fn new(id: u64) -> Self {
39 Self(id)
40 }
41
42 pub fn value(&self) -> u64 {
43 self.0
44 }
45}
46
47#[derive(Debug, Clone)]
49pub enum TransferState {
50 InProgress {
52 bytes_transferred: usize,
54 total_bytes: usize,
56 chunk_index: usize,
58 },
59 Completed {
61 total_bytes: usize,
63 },
64 Interrupted {
66 bytes_transferred: usize,
68 total_bytes: usize,
70 resumable: bool,
72 },
73 Failed {
75 error: String,
77 },
78}
79
80impl TransferState {
81 pub fn progress(&self) -> f64 {
83 match self {
84 Self::InProgress {
85 bytes_transferred,
86 total_bytes,
87 ..
88 } => {
89 if *total_bytes > 0 {
90 (*bytes_transferred as f64) / (*total_bytes as f64)
91 } else {
92 0.0
93 }
94 }
95 Self::Completed { .. } => 1.0,
96 Self::Interrupted {
97 bytes_transferred,
98 total_bytes,
99 ..
100 } => {
101 if *total_bytes > 0 {
102 (*bytes_transferred as f64) / (*total_bytes as f64)
103 } else {
104 0.0
105 }
106 }
107 Self::Failed { .. } => 0.0,
108 }
109 }
110
111 pub fn is_complete(&self) -> bool {
113 matches!(self, Self::Completed { .. })
114 }
115
116 pub fn is_resumable(&self) -> bool {
118 matches!(
119 self,
120 Self::Interrupted {
121 resumable: true,
122 ..
123 }
124 )
125 }
126}
127
128#[derive(Debug, Clone)]
130pub struct TransferInfo {
131 pub id: TransferId,
132 pub state: TransferState,
133 pub config: TransferConfig,
134 pub started_at: std::time::Instant,
135 pub updated_at: std::time::Instant,
136}
137
138impl TransferInfo {
139 pub fn elapsed(&self) -> std::time::Duration {
141 self.started_at.elapsed()
142 }
143
144 pub fn idle_time(&self) -> std::time::Duration {
146 self.updated_at.elapsed()
147 }
148
149 pub fn speed_bps(&self) -> f64 {
151 match &self.state {
152 TransferState::InProgress {
153 bytes_transferred, ..
154 }
155 | TransferState::Interrupted {
156 bytes_transferred, ..
157 } => {
158 let elapsed_secs = self.elapsed().as_secs_f64();
159 if elapsed_secs > 0.0 {
160 (*bytes_transferred as f64) / elapsed_secs
161 } else {
162 0.0
163 }
164 }
165 TransferState::Completed { total_bytes } => {
166 let elapsed_secs = self.elapsed().as_secs_f64();
167 if elapsed_secs > 0.0 {
168 (*total_bytes as f64) / elapsed_secs
169 } else {
170 0.0
171 }
172 }
173 TransferState::Failed { .. } => 0.0,
174 }
175 }
176}
177
178pub struct PartialTransferManager {
180 transfers: Arc<Mutex<HashMap<TransferId, TransferInfo>>>,
181 next_id: Arc<Mutex<u64>>,
182 config: TransferConfig,
183}
184
185impl PartialTransferManager {
186 pub fn new() -> Self {
188 Self::with_config(TransferConfig::default())
189 }
190
191 pub fn with_config(config: TransferConfig) -> Self {
193 info!(
194 chunk_size = config.chunk_size,
195 allow_resume = config.allow_resume,
196 timeout_secs = ?config.timeout_secs,
197 "Creating partial transfer manager"
198 );
199 Self {
200 transfers: Arc::new(Mutex::new(HashMap::new())),
201 next_id: Arc::new(Mutex::new(1)),
202 config,
203 }
204 }
205
206 pub async fn start_transfer(&self, total_bytes: usize) -> Result<TransferId> {
214 let mut next_id = self.next_id.lock().await;
215 let id = TransferId(*next_id);
216 *next_id += 1;
217 drop(next_id);
218
219 info!(
220 transfer_id = id.value(),
221 total_bytes = total_bytes,
222 chunk_size = self.config.chunk_size,
223 "Starting new transfer"
224 );
225
226 let now = std::time::Instant::now();
227 let info = TransferInfo {
228 id,
229 state: TransferState::InProgress {
230 bytes_transferred: 0,
231 total_bytes,
232 chunk_index: 0,
233 },
234 config: self.config.clone(),
235 started_at: now,
236 updated_at: now,
237 };
238
239 self.transfers.lock().await.insert(id, info);
240 Ok(id)
241 }
242
243 pub async fn update_progress(
250 &self,
251 id: TransferId,
252 bytes_transferred: usize,
253 chunk_index: usize,
254 ) -> Result<()> {
255 let mut transfers = self.transfers.lock().await;
256
257 let info = transfers.get_mut(&id).ok_or_else(|| {
258 warn!(transfer_id = id.value(), "Transfer not found");
259 IgtlError::Io(std::io::Error::new(
260 std::io::ErrorKind::NotFound,
261 "Transfer not found",
262 ))
263 })?;
264
265 if let TransferState::InProgress { total_bytes, .. } = info.state {
266 let progress = (bytes_transferred as f64 / total_bytes as f64) * 100.0;
267 trace!(
268 transfer_id = id.value(),
269 bytes_transferred = bytes_transferred,
270 total_bytes = total_bytes,
271 chunk_index = chunk_index,
272 progress_pct = format!("{:.1}%", progress),
273 "Transfer progress updated"
274 );
275 info.state = TransferState::InProgress {
276 bytes_transferred,
277 total_bytes,
278 chunk_index,
279 };
280 info.updated_at = std::time::Instant::now();
281 } else {
282 warn!(transfer_id = id.value(), "Transfer is not in progress");
283 return Err(IgtlError::Io(std::io::Error::new(
284 std::io::ErrorKind::InvalidInput,
285 "Transfer is not in progress",
286 )));
287 }
288
289 Ok(())
290 }
291
292 pub async fn complete_transfer(&self, id: TransferId) -> Result<()> {
294 let mut transfers = self.transfers.lock().await;
295
296 let info = transfers.get_mut(&id).ok_or_else(|| {
297 warn!(transfer_id = id.value(), "Transfer not found");
298 IgtlError::Io(std::io::Error::new(
299 std::io::ErrorKind::NotFound,
300 "Transfer not found",
301 ))
302 })?;
303
304 if let TransferState::InProgress { total_bytes, .. } = info.state {
305 let elapsed = info.elapsed().as_secs_f64();
306 let speed_mbps = if elapsed > 0.0 {
307 (total_bytes as f64) / elapsed / 1_000_000.0
308 } else {
309 0.0
310 };
311 info!(
312 transfer_id = id.value(),
313 total_bytes = total_bytes,
314 elapsed_secs = format!("{:.2}", elapsed),
315 speed_mbps = format!("{:.2}", speed_mbps),
316 "Transfer completed"
317 );
318 info.state = TransferState::Completed { total_bytes };
319 info.updated_at = std::time::Instant::now();
320 }
321
322 Ok(())
323 }
324
325 pub async fn interrupt_transfer(&self, id: TransferId) -> Result<()> {
327 let mut transfers = self.transfers.lock().await;
328
329 let info = transfers.get_mut(&id).ok_or_else(|| {
330 warn!(transfer_id = id.value(), "Transfer not found");
331 IgtlError::Io(std::io::Error::new(
332 std::io::ErrorKind::NotFound,
333 "Transfer not found",
334 ))
335 })?;
336
337 if let TransferState::InProgress {
338 bytes_transferred,
339 total_bytes,
340 ..
341 } = info.state
342 {
343 let resumable = info.config.allow_resume;
344 warn!(
345 transfer_id = id.value(),
346 bytes_transferred = bytes_transferred,
347 total_bytes = total_bytes,
348 resumable = resumable,
349 "Transfer interrupted"
350 );
351 info.state = TransferState::Interrupted {
352 bytes_transferred,
353 total_bytes,
354 resumable,
355 };
356 info.updated_at = std::time::Instant::now();
357 }
358
359 Ok(())
360 }
361
362 pub async fn resume_transfer(&self, id: TransferId) -> Result<usize> {
364 let mut transfers = self.transfers.lock().await;
365
366 let info = transfers.get_mut(&id).ok_or_else(|| {
367 warn!(transfer_id = id.value(), "Transfer not found");
368 IgtlError::Io(std::io::Error::new(
369 std::io::ErrorKind::NotFound,
370 "Transfer not found",
371 ))
372 })?;
373
374 match info.state {
375 TransferState::Interrupted {
376 bytes_transferred,
377 total_bytes,
378 resumable: true,
379 } => {
380 let chunk_index = bytes_transferred / info.config.chunk_size;
381 info!(
382 transfer_id = id.value(),
383 resuming_from = bytes_transferred,
384 total_bytes = total_bytes,
385 chunk_index = chunk_index,
386 "Resuming transfer"
387 );
388 info.state = TransferState::InProgress {
389 bytes_transferred,
390 total_bytes,
391 chunk_index,
392 };
393 info.updated_at = std::time::Instant::now();
394 Ok(bytes_transferred)
395 }
396 TransferState::Interrupted {
397 resumable: false, ..
398 } => {
399 warn!(transfer_id = id.value(), "Transfer is not resumable");
400 Err(IgtlError::Io(std::io::Error::new(
401 std::io::ErrorKind::InvalidInput,
402 "Transfer is not resumable",
403 )))
404 }
405 _ => {
406 warn!(transfer_id = id.value(), "Transfer is not interrupted");
407 Err(IgtlError::Io(std::io::Error::new(
408 std::io::ErrorKind::InvalidInput,
409 "Transfer is not interrupted",
410 )))
411 }
412 }
413 }
414
415 pub async fn fail_transfer(&self, id: TransferId, error: String) -> Result<()> {
417 let mut transfers = self.transfers.lock().await;
418
419 if let Some(info) = transfers.get_mut(&id) {
420 warn!(
421 transfer_id = id.value(),
422 error = %error,
423 "Transfer failed"
424 );
425 info.state = TransferState::Failed { error };
426 info.updated_at = std::time::Instant::now();
427 }
428
429 Ok(())
430 }
431
432 pub async fn get_transfer(&self, id: TransferId) -> Option<TransferInfo> {
434 self.transfers.lock().await.get(&id).cloned()
435 }
436
437 pub async fn active_transfers(&self) -> Vec<TransferInfo> {
439 self.transfers
440 .lock()
441 .await
442 .values()
443 .filter(|info| matches!(info.state, TransferState::InProgress { .. }))
444 .cloned()
445 .collect()
446 }
447
448 pub async fn cleanup_completed(&self) {
450 self.transfers.lock().await.retain(|_, info| {
451 !matches!(
452 info.state,
453 TransferState::Completed { .. } | TransferState::Failed { .. }
454 )
455 });
456 }
457
458 pub async fn cleanup_timed_out(&self) {
460 let config = &self.config;
461 if let Some(timeout_secs) = config.timeout_secs {
462 let timeout = std::time::Duration::from_secs(timeout_secs);
463 let mut transfers = self.transfers.lock().await;
464 let before_count = transfers.len();
465 transfers.retain(|id, info| {
466 let keep = info.idle_time() < timeout;
467 if !keep {
468 debug!(
469 transfer_id = id.value(),
470 idle_time_secs = info.idle_time().as_secs(),
471 "Removing timed out transfer"
472 );
473 }
474 keep
475 });
476 let removed = before_count - transfers.len();
477 if removed > 0 {
478 info!(removed_count = removed, "Cleaned up timed out transfers");
479 }
480 }
481 }
482}
483
484impl Default for PartialTransferManager {
485 fn default() -> Self {
486 Self::new()
487 }
488}
489
490#[cfg(test)]
491mod tests {
492 use super::*;
493
494 #[tokio::test]
495 async fn test_start_transfer() {
496 let manager = PartialTransferManager::new();
497 let id = manager.start_transfer(1000000).await.unwrap();
498
499 let info = manager.get_transfer(id).await.unwrap();
500 assert!(matches!(info.state, TransferState::InProgress { .. }));
501 }
502
503 #[tokio::test]
504 async fn test_update_progress() {
505 let manager = PartialTransferManager::new();
506 let id = manager.start_transfer(1000000).await.unwrap();
507
508 manager.update_progress(id, 500000, 5).await.unwrap();
509
510 let info = manager.get_transfer(id).await.unwrap();
511 assert_eq!(info.state.progress(), 0.5);
512 }
513
514 #[tokio::test]
515 async fn test_complete_transfer() {
516 let manager = PartialTransferManager::new();
517 let id = manager.start_transfer(1000000).await.unwrap();
518
519 manager.update_progress(id, 1000000, 10).await.unwrap();
520 manager.complete_transfer(id).await.unwrap();
521
522 let info = manager.get_transfer(id).await.unwrap();
523 assert!(info.state.is_complete());
524 assert_eq!(info.state.progress(), 1.0);
525 }
526
527 #[tokio::test]
528 async fn test_interrupt_and_resume() {
529 let manager = PartialTransferManager::new();
530 let id = manager.start_transfer(1000000).await.unwrap();
531
532 manager.update_progress(id, 500000, 5).await.unwrap();
533 manager.interrupt_transfer(id).await.unwrap();
534
535 let info = manager.get_transfer(id).await.unwrap();
536 assert!(info.state.is_resumable());
537
538 let resumed_at = manager.resume_transfer(id).await.unwrap();
539 assert_eq!(resumed_at, 500000);
540
541 let info = manager.get_transfer(id).await.unwrap();
542 assert!(matches!(info.state, TransferState::InProgress { .. }));
543 }
544
545 #[tokio::test]
546 async fn test_fail_transfer() {
547 let manager = PartialTransferManager::new();
548 let id = manager.start_transfer(1000000).await.unwrap();
549
550 manager
551 .fail_transfer(id, "Network error".to_string())
552 .await
553 .unwrap();
554
555 let info = manager.get_transfer(id).await.unwrap();
556 assert!(matches!(info.state, TransferState::Failed { .. }));
557 }
558
559 #[tokio::test]
560 async fn test_active_transfers() {
561 let manager = PartialTransferManager::new();
562
563 let id1 = manager.start_transfer(1000000).await.unwrap();
564 let id2 = manager.start_transfer(2000000).await.unwrap();
565 let id3 = manager.start_transfer(3000000).await.unwrap();
566
567 manager.complete_transfer(id1).await.unwrap();
568 manager.interrupt_transfer(id2).await.unwrap();
569
570 let active = manager.active_transfers().await;
571 assert_eq!(active.len(), 1);
572 assert_eq!(active[0].id, id3);
573 }
574
575 #[tokio::test]
576 async fn test_cleanup_completed() {
577 let manager = PartialTransferManager::new();
578
579 let id1 = manager.start_transfer(1000000).await.unwrap();
580 let id2 = manager.start_transfer(2000000).await.unwrap();
581
582 manager.complete_transfer(id1).await.unwrap();
583
584 manager.cleanup_completed().await;
585
586 assert!(manager.get_transfer(id1).await.is_none());
587 assert!(manager.get_transfer(id2).await.is_some());
588 }
589
590 #[tokio::test]
591 async fn test_transfer_speed() {
592 let manager = PartialTransferManager::new();
593 let id = manager.start_transfer(1000000).await.unwrap();
594
595 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
596 manager.update_progress(id, 500000, 5).await.unwrap();
597
598 let info = manager.get_transfer(id).await.unwrap();
599 let speed = info.speed_bps();
600
601 assert!(speed > 1_000_000.0); }
604}