1use percent_encoding::percent_decode_str;
8use std::collections::HashMap;
9use std::fs::{self, File};
10use std::io::{self, Write};
11use std::path::PathBuf;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
/// Maximum age of an upload session (3600 seconds, i.e. one hour).
///
/// `ChunkedUploadManager::cleanup_expired_sessions` drops every session whose
/// time since creation has reached this value.
const UPLOAD_SESSION_TIMEOUT: Duration = Duration::from_secs(3600);
18
/// Errors produced by the chunked-upload session and manager types.
#[derive(Debug, thiserror::Error)]
pub enum ChunkedUploadError {
    /// A filesystem operation failed; wraps the underlying [`io::Error`].
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    /// No session is registered under the given ID. The payload is the ID
    /// that was looked up; `start_session` also uses this variant, with the
    /// payload `"Invalid session ID"`, to reject unsafe session IDs.
    #[error("Upload session not found: {0}")]
    SessionNotFound(String),
    /// A numeric constraint on chunks was violated. In this file it is
    /// returned for a zero chunk size, for a chunk number at or beyond the
    /// session's chunk count, and when assembly is requested before every
    /// chunk has arrived; what `expected` and `actual` hold differs between
    /// those call sites.
    #[error("Invalid chunk: expected {expected}, got {actual}")]
    InvalidChunk { expected: usize, actual: usize },
    /// A chunk arrived out of sequence. Not constructed anywhere in the code
    /// visible in this file.
    #[error("Chunk out of order: expected {expected}, got {actual}")]
    ChunkOutOfOrder { expected: usize, actual: usize },
    /// A chunk was uploaded to a session already marked as completed.
    #[error("Upload already completed")]
    AlreadyCompleted,
    /// The uploaded data failed checksum verification. Not constructed
    /// anywhere in the code visible in this file.
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
35
/// Running statistics for a single chunked upload.
///
/// A tracker starts empty and is advanced once per recorded chunk; each
/// advance refreshes the percentage, the average speed and the estimated
/// time remaining.
#[derive(Debug, Clone)]
pub struct UploadProgress {
    /// Sum of the chunk sizes recorded so far, in bytes.
    pub bytes_uploaded: usize,
    /// Expected size of the whole upload, in bytes. Overwritten with
    /// `bytes_uploaded` once the last expected chunk has been recorded.
    pub total_bytes: usize,
    /// Completion percentage, always within `0.0..=100.0`.
    pub percentage: f64,
    /// Number of chunks recorded so far.
    pub chunks_uploaded: usize,
    /// Number of chunks the upload is expected to consist of.
    pub total_chunks: usize,
    /// Moment this tracker was created; the speed is averaged from here.
    pub started_at: Instant,
    /// Estimated seconds until completion, or `None` while no estimate has
    /// been computed yet.
    pub estimated_time_remaining: Option<f64>,
    /// Average throughput since `started_at`, in bytes per second.
    pub upload_speed: f64,
}

impl UploadProgress {
    /// Creates an empty tracker for an upload of `total_bytes` bytes split
    /// into `total_chunks` chunks. The clock starts at the time of the call.
    fn new(total_bytes: usize, total_chunks: usize) -> Self {
        Self {
            bytes_uploaded: 0,
            total_bytes,
            percentage: 0.0,
            chunks_uploaded: 0,
            total_chunks,
            started_at: Instant::now(),
            estimated_time_remaining: None,
            upload_speed: 0.0,
        }
    }

    /// Records one more chunk of `chunk_size` bytes and refreshes every
    /// derived statistic.
    fn update(&mut self, chunk_size: usize) {
        // Saturate rather than overflow: the sizes originate from callers and
        // a wrapped (or panicking) counter would be worse than a pinned one.
        self.chunks_uploaded = self.chunks_uploaded.saturating_add(1);
        self.bytes_uploaded = self.bytes_uploaded.saturating_add(chunk_size);

        // Once every expected chunk is in, the bytes actually received become
        // the total, which makes a non-empty upload land on exactly 100%.
        if self.chunks_uploaded >= self.total_chunks {
            self.total_bytes = self.bytes_uploaded;
        }

        self.percentage = if self.total_bytes > 0 {
            let ratio = self.bytes_uploaded as f64 / self.total_bytes as f64;
            // Chunks may carry more bytes than were declared up front; never
            // report more than 100%.
            (ratio * 100.0).min(100.0)
        } else {
            0.0
        };

        let elapsed = self.started_at.elapsed().as_secs_f64();
        if elapsed > 0.0 {
            self.upload_speed = self.bytes_uploaded as f64 / elapsed;

            let bytes_remaining = self.total_bytes.saturating_sub(self.bytes_uploaded);
            // With zero throughput there is nothing to extrapolate from, so
            // the previous estimate (if any) is left untouched.
            if self.upload_speed > 0.0 {
                self.estimated_time_remaining =
                    Some(bytes_remaining as f64 / self.upload_speed);
            }
        }
    }

    /// Returns `true` once at least `total_chunks` chunks have been recorded.
    pub fn is_complete(&self) -> bool {
        self.chunks_uploaded >= self.total_chunks
    }

    /// Renders `upload_speed` with two decimals, scaled to `B/s`, `KB/s` or
    /// `MB/s` using 1024-based units.
    pub fn formatted_speed(&self) -> String {
        const KIB: f64 = 1024.0;
        const MIB: f64 = 1024.0 * 1024.0;

        if self.upload_speed < KIB {
            format!("{:.2} B/s", self.upload_speed)
        } else if self.upload_speed < MIB {
            format!("{:.2} KB/s", self.upload_speed / KIB)
        } else {
            format!("{:.2} MB/s", self.upload_speed / MIB)
        }
    }

    /// Renders the estimated time remaining as `"<m>m <s>s"`, or `"<s>s"`
    /// when under a minute, or `"Unknown"` when there is no estimate.
    /// Fractional seconds are truncated.
    pub fn formatted_eta(&self) -> String {
        match self.estimated_time_remaining {
            Some(seconds) => {
                let mins = (seconds / 60.0) as u64;
                let secs = (seconds % 60.0) as u64;
                if mins > 0 {
                    format!("{}m {}s", mins, secs)
                } else {
                    format!("{}s", secs)
                }
            }
            None => "Unknown".to_string(),
        }
    }
}
135
136#[derive(Debug, Clone)]
138pub struct ChunkedUploadSession {
139 pub session_id: String,
141 pub filename: String,
143 pub total_size: usize,
145 pub chunk_size: usize,
147 pub total_chunks: usize,
149 pub received_chunks: usize,
151 pub temp_dir: PathBuf,
153 pub completed: bool,
155 progress: UploadProgress,
157 created_at: Instant,
159}
160
161impl ChunkedUploadSession {
162 pub fn new(
166 session_id: String,
167 filename: String,
168 total_size: usize,
169 chunk_size: usize,
170 temp_dir: PathBuf,
171 ) -> Result<Self, ChunkedUploadError> {
172 if chunk_size == 0 {
173 return Err(ChunkedUploadError::InvalidChunk {
174 expected: 1,
175 actual: 0,
176 });
177 }
178 let total_chunks = total_size.div_ceil(chunk_size);
179 Ok(Self {
180 session_id,
181 filename,
182 total_size,
183 chunk_size,
184 total_chunks,
185 received_chunks: 0,
186 temp_dir,
187 completed: false,
188 progress: UploadProgress::new(total_size, total_chunks),
189 created_at: Instant::now(),
190 })
191 }
192
193 pub fn progress(&self) -> f64 {
195 self.progress.percentage
196 }
197
198 pub fn get_progress(&self) -> &UploadProgress {
217 &self.progress
218 }
219
220 #[doc(hidden)]
224 pub fn update_progress(&mut self, chunk_size: usize) {
225 self.progress.update(chunk_size);
226 }
227
228 pub fn is_complete(&self) -> bool {
230 self.completed || self.received_chunks >= self.total_chunks
231 }
232
233 fn chunk_path(&self, chunk_number: usize) -> PathBuf {
238 self.temp_dir
239 .join(format!("{}_{}.chunk", self.session_id, chunk_number))
240 }
241}
242
243pub struct ChunkedUploadManager {
245 sessions: Arc<Mutex<HashMap<String, ChunkedUploadSession>>>,
246 temp_base_dir: PathBuf,
247}
248
249impl ChunkedUploadManager {
250 pub fn new(temp_base_dir: PathBuf) -> Self {
261 Self {
262 sessions: Arc::new(Mutex::new(HashMap::new())),
263 temp_base_dir,
264 }
265 }
266
267 pub fn start_session(
285 &self,
286 session_id: String,
287 filename: String,
288 total_size: usize,
289 chunk_size: usize,
290 ) -> Result<ChunkedUploadSession, ChunkedUploadError> {
291 self.cleanup_expired_sessions();
293
294 let decoded = percent_decode_str(&session_id).decode_utf8_lossy();
299 for candidate in [session_id.as_str(), decoded.as_ref()] {
300 if candidate.is_empty()
301 || candidate.contains('/')
302 || candidate.contains('\\')
303 || candidate.contains('\0')
304 || candidate.contains("..")
305 {
306 return Err(ChunkedUploadError::SessionNotFound(
307 "Invalid session ID".to_string(),
308 ));
309 }
310 }
311 let temp_dir = self.temp_base_dir.join(&session_id);
312 fs::create_dir_all(&temp_dir)?;
313
314 let session = ChunkedUploadSession::new(
315 session_id.clone(),
316 filename,
317 total_size,
318 chunk_size,
319 temp_dir,
320 )?;
321
322 let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
323 sessions.insert(session_id, session.clone());
324
325 Ok(session)
326 }
327
328 pub fn upload_chunk(
343 &self,
344 session_id: &str,
345 chunk_number: usize,
346 data: &[u8],
347 ) -> Result<ChunkedUploadSession, ChunkedUploadError> {
348 self.cleanup_expired_sessions();
350
351 let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
352 let session = sessions
353 .get_mut(session_id)
354 .ok_or_else(|| ChunkedUploadError::SessionNotFound(session_id.to_string()))?;
355
356 if session.completed {
357 return Err(ChunkedUploadError::AlreadyCompleted);
358 }
359
360 if chunk_number >= session.total_chunks {
362 return Err(ChunkedUploadError::InvalidChunk {
363 expected: session.total_chunks - 1,
364 actual: chunk_number,
365 });
366 }
367
368 let chunk_path = session.chunk_path(chunk_number);
370 let mut file = File::create(chunk_path)?;
371 file.write_all(data)?;
372
373 session.received_chunks += 1;
374 session.update_progress(data.len());
375
376 if session.is_complete() {
377 session.completed = true;
378 }
379
380 Ok(session.clone())
381 }
382
383 pub fn assemble_chunks(
395 &self,
396 session_id: &str,
397 output_path: PathBuf,
398 ) -> Result<PathBuf, ChunkedUploadError> {
399 let sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
400 let session = sessions
401 .get(session_id)
402 .ok_or_else(|| ChunkedUploadError::SessionNotFound(session_id.to_string()))?;
403
404 if !session.is_complete() {
405 return Err(ChunkedUploadError::InvalidChunk {
406 expected: session.total_chunks,
407 actual: session.received_chunks,
408 });
409 }
410
411 let mut output_file = File::create(&output_path)?;
413
414 for i in 0..session.total_chunks {
416 let chunk_path = session.chunk_path(i);
417 let chunk_data = fs::read(&chunk_path)?;
418 output_file.write_all(&chunk_data)?;
419 }
420
421 Ok(output_path)
422 }
423
424 pub fn cleanup_session(&self, session_id: &str) -> Result<(), ChunkedUploadError> {
436 let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
437 if let Some(session) = sessions.remove(session_id)
438 && session.temp_dir.exists()
439 {
440 fs::remove_dir_all(session.temp_dir)?;
441 }
442 Ok(())
443 }
444
445 pub fn get_session(&self, session_id: &str) -> Option<ChunkedUploadSession> {
447 let sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
448 sessions.get(session_id).cloned()
449 }
450
451 pub fn list_sessions(&self) -> Vec<ChunkedUploadSession> {
453 let sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
454 sessions.values().cloned().collect()
455 }
456
457 pub fn cleanup_expired_sessions(&self) {
463 let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
464 sessions.retain(|_, session| {
465 let expired = session.created_at.elapsed() >= UPLOAD_SESSION_TIMEOUT;
466 if expired && session.temp_dir.exists() {
467 let _ = fs::remove_dir_all(&session.temp_dir);
468 }
469 !expired
470 });
471 }
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    /// A new session derives its chunk count and starts out empty.
    #[test]
    fn test_session_creation() {
        let session = ChunkedUploadSession::new(
            "test123".to_string(),
            "file.bin".to_string(),
            1000,
            100,
            PathBuf::from("/tmp"),
        )
        .unwrap();

        assert_eq!(session.session_id, "test123");
        assert_eq!(session.filename, "file.bin");
        assert_eq!(session.total_size, 1000);
        assert_eq!(session.chunk_size, 100);
        assert_eq!(session.total_chunks, 10);
        assert_eq!(session.received_chunks, 0);
        assert!(!session.completed);
    }

    /// The percentage follows the number of bytes fed into the tracker.
    #[test]
    fn test_session_progress() {
        let mut session = ChunkedUploadSession::new(
            "test123".to_string(),
            "file.bin".to_string(),
            1000,
            100,
            PathBuf::from("/tmp"),
        )
        .unwrap();

        assert_eq!(session.progress(), 0.0);

        for _ in 0..5 {
            session.update_progress(100);
        }
        assert_eq!(session.progress(), 50.0);

        for _ in 0..5 {
            session.update_progress(100);
        }
        assert_eq!(session.progress(), 100.0);
    }

    /// A fresh manager has no sessions.
    #[test]
    fn test_manager_creation() {
        let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/test_chunks"));
        assert!(manager.list_sessions().is_empty());
    }

    /// Starting a session registers it with the manager.
    #[test]
    fn test_start_session() {
        let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/test_chunks"));
        let session = manager
            .start_session("session1".to_string(), "file.bin".to_string(), 1000, 100)
            .unwrap();

        assert_eq!(session.session_id, "session1");
        assert_eq!(session.total_chunks, 10);
        assert_eq!(manager.list_sessions().len(), 1);

        // Remove the session directory so the test leaves nothing behind.
        manager.cleanup_session("session1").unwrap();
    }

    /// Uploading one chunk bumps the received-chunk counter.
    #[test]
    fn test_upload_chunk() {
        let temp_dir = PathBuf::from("/tmp/test_chunks_upload");
        let manager = ChunkedUploadManager::new(temp_dir.clone());

        manager
            .start_session("session2".to_string(), "file.bin".to_string(), 300, 100)
            .unwrap();

        let chunk_data = vec![0u8; 100];
        let result = manager.upload_chunk("session2", 0, &chunk_data);
        assert!(result.is_ok());

        let session = manager.get_session("session2").unwrap();
        assert_eq!(session.received_chunks, 1);

        manager.cleanup_session("session2").unwrap();
    }

    /// Uploading to an unknown session reports the ID that was looked up.
    #[test]
    fn test_invalid_session() {
        let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/test_chunks"));
        let chunk_data = vec![0u8; 100];
        let result = manager.upload_chunk("nonexistent", 0, &chunk_data);

        assert!(
            matches!(
                &result,
                Err(ChunkedUploadError::SessionNotFound(id)) if id == "nonexistent"
            ),
            "Expected SessionNotFound error"
        );
    }

    /// Assembled output is the chunks' bytes, concatenated in chunk order.
    #[test]
    fn test_chunk_assembly() {
        let temp_dir = PathBuf::from("/tmp/test_chunks_assembly");
        let manager = ChunkedUploadManager::new(temp_dir.clone());

        manager
            .start_session("session3".to_string(), "file.bin".to_string(), 300, 100)
            .unwrap();

        // Each chunk is filled with its own index so ordering is observable.
        let mut expected = Vec::with_capacity(300);
        for i in 0..3 {
            let chunk_data = vec![i as u8; 100];
            manager.upload_chunk("session3", i, &chunk_data).unwrap();
            expected.extend_from_slice(&chunk_data);
        }

        let output_path = temp_dir.join("assembled.bin");
        let result = manager.assemble_chunks("session3", output_path.clone());
        assert!(result.is_ok());

        assert!(output_path.exists());
        let content = fs::read(&output_path).unwrap();
        assert_eq!(content.len(), 300);
        assert_eq!(content, expected);

        fs::remove_file(output_path).unwrap();
        manager.cleanup_session("session3").unwrap();
    }

    /// A session becomes complete only after its last chunk arrives.
    #[test]
    fn test_session_completion() {
        let temp_dir = PathBuf::from("/tmp/test_chunks_completion");
        let manager = ChunkedUploadManager::new(temp_dir.clone());

        manager
            .start_session("session4".to_string(), "file.bin".to_string(), 200, 100)
            .unwrap();

        let chunk_data = vec![0u8; 100];

        manager.upload_chunk("session4", 0, &chunk_data).unwrap();
        let session = manager.get_session("session4").unwrap();
        assert!(!session.is_complete());

        manager.upload_chunk("session4", 1, &chunk_data).unwrap();
        let session = manager.get_session("session4").unwrap();
        assert!(session.is_complete());

        manager.cleanup_session("session4").unwrap();
    }

    /// Session IDs carrying path traversal, raw or percent-encoded, are
    /// rejected.
    #[rstest::rstest]
    #[case("../../../etc")]
    #[case("foo/bar")]
    #[case("foo\\bar")]
    #[case("null\0byte")]
    #[case("..")]
    #[case("..%2f..%2fetc")]
    #[case("%2e%2e%2f%2e%2e%2f")]
    #[case("..%2fmalicious")]
    fn test_start_session_rejects_traversal_in_session_id(#[case] session_id: &str) {
        let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/test_chunks_security"));

        let result =
            manager.start_session(session_id.to_string(), "file.bin".to_string(), 1000, 100);

        assert!(
            result.is_err(),
            "Expected error for session_id: {}",
            session_id
        );
    }

    /// An ordinary alphanumeric ID with dashes and underscores is accepted.
    #[test]
    fn test_start_session_allows_safe_session_id() {
        let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/test_chunks_safe"));

        let result = manager.start_session(
            "safe-session_123".to_string(),
            "file.bin".to_string(),
            1000,
            100,
        );

        assert!(result.is_ok());
        manager.cleanup_session("safe-session_123").unwrap();
    }

    /// A zero chunk size is rejected with `InvalidChunk { expected: 1, actual: 0 }`.
    #[test]
    fn test_chunked_upload_session_rejects_zero_chunk_size() {
        let result = ChunkedUploadSession::new(
            "test-zero".to_string(),
            "file.bin".to_string(),
            1000,
            0,
            PathBuf::from("/tmp"),
        );

        assert!(
            matches!(
                result,
                Err(ChunkedUploadError::InvalidChunk {
                    expected: 1,
                    actual: 0
                })
            ),
            "Expected InvalidChunk error for zero chunk_size"
        );
    }

    /// The manager propagates the zero-chunk-size rejection.
    #[test]
    fn test_start_session_rejects_zero_chunk_size() {
        let base_dir = PathBuf::from("/tmp/test_chunks_zero");
        let manager = ChunkedUploadManager::new(base_dir.clone());

        let result =
            manager.start_session("session-zero".to_string(), "file.bin".to_string(), 1000, 0);

        assert!(result.is_err());

        // The session directory may have been created before the request was
        // rejected; remove whatever is there, ignoring a missing directory.
        let _ = fs::remove_dir_all(base_dir);
    }
}