1use crate::error::{IgtlError, Result};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tracing::{debug, info, trace, warn};
11
/// Settings shared by every transfer that a manager tracks.
#[derive(Clone, Debug)]
pub struct TransferConfig {
    /// Size of a single chunk, in bytes.
    pub chunk_size: usize,
    /// Whether a transfer that gets interrupted is flagged as resumable.
    pub allow_resume: bool,
    /// Idle timeout in seconds; `None` means no timeout is configured.
    pub timeout_secs: Option<u64>,
}
22
23impl Default for TransferConfig {
24 fn default() -> Self {
25 Self {
26 chunk_size: 65536, allow_resume: true,
28 timeout_secs: Some(300), }
30 }
31}
32
/// Numeric handle that identifies one tracked transfer.
///
/// The wrapped value is private; use `TransferId::new` and `TransferId::value`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct TransferId(u64);
36
37impl TransferId {
38 pub fn new(id: u64) -> Self {
39 Self(id)
40 }
41
42 pub fn value(&self) -> u64 {
43 self.0
44 }
45}
46
/// Lifecycle state of a single transfer.
#[derive(Clone, Debug)]
pub enum TransferState {
    /// Data is currently flowing.
    InProgress {
        /// Bytes moved so far.
        bytes_transferred: usize,
        /// Expected size of the whole transfer, in bytes.
        total_bytes: usize,
        /// Index of the most recently reported chunk.
        chunk_index: usize,
    },
    /// The transfer finished.
    Completed {
        /// Size of the whole transfer, in bytes.
        total_bytes: usize,
    },
    /// The transfer stopped before finishing.
    Interrupted {
        /// Bytes moved before the interruption.
        bytes_transferred: usize,
        /// Expected size of the whole transfer, in bytes.
        total_bytes: usize,
        /// Whether the transfer may be resumed.
        resumable: bool,
    },
    /// The transfer was marked as failed.
    Failed {
        /// Description of what went wrong.
        error: String,
    },
}
79
80impl TransferState {
81 pub fn progress(&self) -> f64 {
83 match self {
84 Self::InProgress { bytes_transferred, total_bytes, .. } => {
85 if *total_bytes > 0 {
86 (*bytes_transferred as f64) / (*total_bytes as f64)
87 } else {
88 0.0
89 }
90 }
91 Self::Completed { .. } => 1.0,
92 Self::Interrupted { bytes_transferred, total_bytes, .. } => {
93 if *total_bytes > 0 {
94 (*bytes_transferred as f64) / (*total_bytes as f64)
95 } else {
96 0.0
97 }
98 }
99 Self::Failed { .. } => 0.0,
100 }
101 }
102
103 pub fn is_complete(&self) -> bool {
105 matches!(self, Self::Completed { .. })
106 }
107
108 pub fn is_resumable(&self) -> bool {
110 matches!(self, Self::Interrupted { resumable: true, .. })
111 }
112}
113
114#[derive(Debug, Clone)]
116pub struct TransferInfo {
117 pub id: TransferId,
118 pub state: TransferState,
119 pub config: TransferConfig,
120 pub started_at: std::time::Instant,
121 pub updated_at: std::time::Instant,
122}
123
124impl TransferInfo {
125 pub fn elapsed(&self) -> std::time::Duration {
127 self.started_at.elapsed()
128 }
129
130 pub fn idle_time(&self) -> std::time::Duration {
132 self.updated_at.elapsed()
133 }
134
135 pub fn speed_bps(&self) -> f64 {
137 match &self.state {
138 TransferState::InProgress { bytes_transferred, .. }
139 | TransferState::Interrupted { bytes_transferred, .. } => {
140 let elapsed_secs = self.elapsed().as_secs_f64();
141 if elapsed_secs > 0.0 {
142 (*bytes_transferred as f64) / elapsed_secs
143 } else {
144 0.0
145 }
146 }
147 TransferState::Completed { total_bytes } => {
148 let elapsed_secs = self.elapsed().as_secs_f64();
149 if elapsed_secs > 0.0 {
150 (*total_bytes as f64) / elapsed_secs
151 } else {
152 0.0
153 }
154 }
155 TransferState::Failed { .. } => 0.0,
156 }
157 }
158}
159
160pub struct PartialTransferManager {
162 transfers: Arc<Mutex<HashMap<TransferId, TransferInfo>>>,
163 next_id: Arc<Mutex<u64>>,
164 config: TransferConfig,
165}
166
167impl PartialTransferManager {
168 pub fn new() -> Self {
170 Self::with_config(TransferConfig::default())
171 }
172
173 pub fn with_config(config: TransferConfig) -> Self {
175 info!(
176 chunk_size = config.chunk_size,
177 allow_resume = config.allow_resume,
178 timeout_secs = ?config.timeout_secs,
179 "Creating partial transfer manager"
180 );
181 Self {
182 transfers: Arc::new(Mutex::new(HashMap::new())),
183 next_id: Arc::new(Mutex::new(1)),
184 config,
185 }
186 }
187
188 pub async fn start_transfer(&self, total_bytes: usize) -> Result<TransferId> {
196 let mut next_id = self.next_id.lock().await;
197 let id = TransferId(*next_id);
198 *next_id += 1;
199 drop(next_id);
200
201 info!(
202 transfer_id = id.value(),
203 total_bytes = total_bytes,
204 chunk_size = self.config.chunk_size,
205 "Starting new transfer"
206 );
207
208 let now = std::time::Instant::now();
209 let info = TransferInfo {
210 id,
211 state: TransferState::InProgress {
212 bytes_transferred: 0,
213 total_bytes,
214 chunk_index: 0,
215 },
216 config: self.config.clone(),
217 started_at: now,
218 updated_at: now,
219 };
220
221 self.transfers.lock().await.insert(id, info);
222 Ok(id)
223 }
224
225 pub async fn update_progress(
232 &self,
233 id: TransferId,
234 bytes_transferred: usize,
235 chunk_index: usize,
236 ) -> Result<()> {
237 let mut transfers = self.transfers.lock().await;
238
239 let info = transfers.get_mut(&id).ok_or_else(|| {
240 warn!(transfer_id = id.value(), "Transfer not found");
241 IgtlError::Io(std::io::Error::new(
242 std::io::ErrorKind::NotFound,
243 "Transfer not found",
244 ))
245 })?;
246
247 if let TransferState::InProgress { total_bytes, .. } = info.state {
248 let progress = (bytes_transferred as f64 / total_bytes as f64) * 100.0;
249 trace!(
250 transfer_id = id.value(),
251 bytes_transferred = bytes_transferred,
252 total_bytes = total_bytes,
253 chunk_index = chunk_index,
254 progress_pct = format!("{:.1}%", progress),
255 "Transfer progress updated"
256 );
257 info.state = TransferState::InProgress {
258 bytes_transferred,
259 total_bytes,
260 chunk_index,
261 };
262 info.updated_at = std::time::Instant::now();
263 } else {
264 warn!(transfer_id = id.value(), "Transfer is not in progress");
265 return Err(IgtlError::Io(std::io::Error::new(
266 std::io::ErrorKind::InvalidInput,
267 "Transfer is not in progress",
268 )));
269 }
270
271 Ok(())
272 }
273
274 pub async fn complete_transfer(&self, id: TransferId) -> Result<()> {
276 let mut transfers = self.transfers.lock().await;
277
278 let info = transfers.get_mut(&id).ok_or_else(|| {
279 warn!(transfer_id = id.value(), "Transfer not found");
280 IgtlError::Io(std::io::Error::new(
281 std::io::ErrorKind::NotFound,
282 "Transfer not found",
283 ))
284 })?;
285
286 if let TransferState::InProgress { total_bytes, .. } = info.state {
287 let elapsed = info.elapsed().as_secs_f64();
288 let speed_mbps = if elapsed > 0.0 {
289 (total_bytes as f64) / elapsed / 1_000_000.0
290 } else {
291 0.0
292 };
293 info!(
294 transfer_id = id.value(),
295 total_bytes = total_bytes,
296 elapsed_secs = format!("{:.2}", elapsed),
297 speed_mbps = format!("{:.2}", speed_mbps),
298 "Transfer completed"
299 );
300 info.state = TransferState::Completed { total_bytes };
301 info.updated_at = std::time::Instant::now();
302 }
303
304 Ok(())
305 }
306
307 pub async fn interrupt_transfer(&self, id: TransferId) -> Result<()> {
309 let mut transfers = self.transfers.lock().await;
310
311 let info = transfers.get_mut(&id).ok_or_else(|| {
312 warn!(transfer_id = id.value(), "Transfer not found");
313 IgtlError::Io(std::io::Error::new(
314 std::io::ErrorKind::NotFound,
315 "Transfer not found",
316 ))
317 })?;
318
319 if let TransferState::InProgress {
320 bytes_transferred,
321 total_bytes,
322 ..
323 } = info.state
324 {
325 let resumable = info.config.allow_resume;
326 warn!(
327 transfer_id = id.value(),
328 bytes_transferred = bytes_transferred,
329 total_bytes = total_bytes,
330 resumable = resumable,
331 "Transfer interrupted"
332 );
333 info.state = TransferState::Interrupted {
334 bytes_transferred,
335 total_bytes,
336 resumable,
337 };
338 info.updated_at = std::time::Instant::now();
339 }
340
341 Ok(())
342 }
343
344 pub async fn resume_transfer(&self, id: TransferId) -> Result<usize> {
346 let mut transfers = self.transfers.lock().await;
347
348 let info = transfers.get_mut(&id).ok_or_else(|| {
349 warn!(transfer_id = id.value(), "Transfer not found");
350 IgtlError::Io(std::io::Error::new(
351 std::io::ErrorKind::NotFound,
352 "Transfer not found",
353 ))
354 })?;
355
356 match info.state {
357 TransferState::Interrupted {
358 bytes_transferred,
359 total_bytes,
360 resumable: true,
361 } => {
362 let chunk_index = bytes_transferred / info.config.chunk_size;
363 info!(
364 transfer_id = id.value(),
365 resuming_from = bytes_transferred,
366 total_bytes = total_bytes,
367 chunk_index = chunk_index,
368 "Resuming transfer"
369 );
370 info.state = TransferState::InProgress {
371 bytes_transferred,
372 total_bytes,
373 chunk_index,
374 };
375 info.updated_at = std::time::Instant::now();
376 Ok(bytes_transferred)
377 }
378 TransferState::Interrupted { resumable: false, .. } => {
379 warn!(transfer_id = id.value(), "Transfer is not resumable");
380 Err(IgtlError::Io(
381 std::io::Error::new(
382 std::io::ErrorKind::InvalidInput,
383 "Transfer is not resumable",
384 ),
385 ))
386 }
387 _ => {
388 warn!(transfer_id = id.value(), "Transfer is not interrupted");
389 Err(IgtlError::Io(std::io::Error::new(
390 std::io::ErrorKind::InvalidInput,
391 "Transfer is not interrupted",
392 )))
393 }
394 }
395 }
396
397 pub async fn fail_transfer(&self, id: TransferId, error: String) -> Result<()> {
399 let mut transfers = self.transfers.lock().await;
400
401 if let Some(info) = transfers.get_mut(&id) {
402 warn!(
403 transfer_id = id.value(),
404 error = %error,
405 "Transfer failed"
406 );
407 info.state = TransferState::Failed { error };
408 info.updated_at = std::time::Instant::now();
409 }
410
411 Ok(())
412 }
413
414 pub async fn get_transfer(&self, id: TransferId) -> Option<TransferInfo> {
416 self.transfers.lock().await.get(&id).cloned()
417 }
418
419 pub async fn active_transfers(&self) -> Vec<TransferInfo> {
421 self.transfers
422 .lock()
423 .await
424 .values()
425 .filter(|info| matches!(info.state, TransferState::InProgress { .. }))
426 .cloned()
427 .collect()
428 }
429
430 pub async fn cleanup_completed(&self) {
432 self.transfers.lock().await.retain(|_, info| {
433 !matches!(
434 info.state,
435 TransferState::Completed { .. } | TransferState::Failed { .. }
436 )
437 });
438 }
439
440 pub async fn cleanup_timed_out(&self) {
442 let config = &self.config;
443 if let Some(timeout_secs) = config.timeout_secs {
444 let timeout = std::time::Duration::from_secs(timeout_secs);
445 let mut transfers = self.transfers.lock().await;
446 let before_count = transfers.len();
447 transfers.retain(|id, info| {
448 let keep = info.idle_time() < timeout;
449 if !keep {
450 debug!(
451 transfer_id = id.value(),
452 idle_time_secs = info.idle_time().as_secs(),
453 "Removing timed out transfer"
454 );
455 }
456 keep
457 });
458 let removed = before_count - transfers.len();
459 if removed > 0 {
460 info!(removed_count = removed, "Cleaned up timed out transfers");
461 }
462 }
463 }
464}
465
466impl Default for PartialTransferManager {
467 fn default() -> Self {
468 Self::new()
469 }
470}
471
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly started transfer is tracked and in progress.
    #[tokio::test]
    async fn test_start_transfer() {
        let manager = PartialTransferManager::new();
        let id = manager.start_transfer(1000000).await.unwrap();

        let info = manager.get_transfer(id).await.unwrap();
        assert!(matches!(info.state, TransferState::InProgress { .. }));
    }

    /// Progress updates are reflected in the reported fraction.
    #[tokio::test]
    async fn test_update_progress() {
        let manager = PartialTransferManager::new();
        let id = manager.start_transfer(1000000).await.unwrap();

        manager.update_progress(id, 500000, 5).await.unwrap();

        let info = manager.get_transfer(id).await.unwrap();
        assert_eq!(info.state.progress(), 0.5);
    }

    /// Completing a transfer moves it to the completed state at 100 %.
    #[tokio::test]
    async fn test_complete_transfer() {
        let manager = PartialTransferManager::new();
        let id = manager.start_transfer(1000000).await.unwrap();

        manager.update_progress(id, 1000000, 10).await.unwrap();
        manager.complete_transfer(id).await.unwrap();

        let info = manager.get_transfer(id).await.unwrap();
        assert!(info.state.is_complete());
        assert_eq!(info.state.progress(), 1.0);
    }

    /// An interrupted transfer can be resumed from its last byte offset.
    #[tokio::test]
    async fn test_interrupt_and_resume() {
        let manager = PartialTransferManager::new();
        let id = manager.start_transfer(1000000).await.unwrap();

        manager.update_progress(id, 500000, 5).await.unwrap();
        manager.interrupt_transfer(id).await.unwrap();

        let info = manager.get_transfer(id).await.unwrap();
        assert!(info.state.is_resumable());

        let resumed_at = manager.resume_transfer(id).await.unwrap();
        assert_eq!(resumed_at, 500000);

        let info = manager.get_transfer(id).await.unwrap();
        assert!(matches!(info.state, TransferState::InProgress { .. }));
    }

    /// Failing a transfer records the failed state.
    #[tokio::test]
    async fn test_fail_transfer() {
        let manager = PartialTransferManager::new();
        let id = manager.start_transfer(1000000).await.unwrap();

        manager
            .fail_transfer(id, "Network error".to_string())
            .await
            .unwrap();

        let info = manager.get_transfer(id).await.unwrap();
        assert!(matches!(info.state, TransferState::Failed { .. }));
    }

    /// Only transfers that are still in progress count as active.
    #[tokio::test]
    async fn test_active_transfers() {
        let manager = PartialTransferManager::new();

        let id1 = manager.start_transfer(1000000).await.unwrap();
        let id2 = manager.start_transfer(2000000).await.unwrap();
        let id3 = manager.start_transfer(3000000).await.unwrap();

        manager.complete_transfer(id1).await.unwrap();
        manager.interrupt_transfer(id2).await.unwrap();

        let active = manager.active_transfers().await;
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, id3);
    }

    /// Cleanup removes completed transfers and keeps the rest.
    #[tokio::test]
    async fn test_cleanup_completed() {
        let manager = PartialTransferManager::new();

        let id1 = manager.start_transfer(1000000).await.unwrap();
        let id2 = manager.start_transfer(2000000).await.unwrap();

        manager.complete_transfer(id1).await.unwrap();

        manager.cleanup_completed().await;

        assert!(manager.get_transfer(id1).await.is_none());
        assert!(manager.get_transfer(id2).await.is_some());
    }

    /// The reported speed matches bytes divided by elapsed time.
    ///
    /// A fixed numeric threshold would fail whenever the scheduler stretches
    /// the elapsed time, so the bound is derived from a clock started before
    /// the transfer: the transfer's own elapsed time can never exceed it.
    #[tokio::test]
    async fn test_transfer_speed() {
        let manager = PartialTransferManager::new();
        let clock = std::time::Instant::now();
        let id = manager.start_transfer(1000000).await.unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        manager.update_progress(id, 500000, 5).await.unwrap();

        let info = manager.get_transfer(id).await.unwrap();
        let speed = info.speed_bps();
        let longest_possible_secs = clock.elapsed().as_secs_f64();

        assert!(speed.is_finite());
        assert!(speed > 0.0);
        assert!(speed >= 500_000.0 / longest_possible_secs);
    }
}