Skip to main content

reinhardt_http/
chunked_upload.rs

1//! Chunked upload handling for large files
2//!
3//! This module provides functionality for handling large file uploads
4//! by splitting them into manageable chunks, supporting resumable uploads,
5//! and assembling chunks back into complete files.
6
7use percent_encoding::percent_decode_str;
8use std::collections::HashMap;
9use std::fs::{self, File};
10use std::io::{self, Write};
11use std::path::PathBuf;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
/// Default session timeout for chunked uploads (1 hour).
///
/// `ChunkedUploadManager::cleanup_expired_sessions` compares each session's
/// age against this value; a session at least this old is treated as stale
/// and is removed together with its temporary directory.
const UPLOAD_SESSION_TIMEOUT: Duration = Duration::from_secs(3600);
18
/// Errors that can occur during chunked upload
#[derive(Debug, thiserror::Error)]
pub enum ChunkedUploadError {
	/// An underlying filesystem operation failed (converted automatically
	/// from `io::Error` via `?`).
	#[error("IO error: {0}")]
	Io(#[from] io::Error),
	/// No session is registered under the given ID. `start_session` also
	/// returns this variant, carrying the text "Invalid session ID", when
	/// the supplied ID fails validation.
	#[error("Upload session not found: {0}")]
	SessionNotFound(String),
	/// A chunk-related value was rejected: a chunk index outside the
	/// session's range, a zero chunk size, or an attempt to assemble before
	/// every chunk has been received.
	#[error("Invalid chunk: expected {expected}, got {actual}")]
	InvalidChunk { expected: usize, actual: usize },
	/// A chunk arrived out of sequence.
	/// NOTE(review): not constructed by the code in this file — presumably
	/// reserved for callers or future use; confirm.
	#[error("Chunk out of order: expected {expected}, got {actual}")]
	ChunkOutOfOrder { expected: usize, actual: usize },
	/// A chunk was submitted to a session already marked as completed.
	#[error("Upload already completed")]
	AlreadyCompleted,
	/// A checksum did not match.
	/// NOTE(review): not constructed by the code in this file — presumably
	/// reserved for callers or future use; confirm.
	#[error("Checksum mismatch")]
	ChecksumMismatch,
}
35
/// Upload progress information
///
/// A plain-data snapshot of how far an upload has advanced. Values are
/// recomputed every time a chunk is recorded through the private `update`
/// method; the formatting helpers only read the stored fields.
#[derive(Debug, Clone)]
pub struct UploadProgress {
	/// Current number of bytes uploaded
	pub bytes_uploaded: usize,
	/// Total file size in bytes. Replaced by the actual cumulative byte
	/// count once the final chunk has been recorded.
	pub total_bytes: usize,
	/// Progress percentage (0.0 - 100.0)
	pub percentage: f64,
	/// Number of chunks uploaded
	pub chunks_uploaded: usize,
	/// Total number of chunks
	pub total_chunks: usize,
	/// Upload start time
	pub started_at: Instant,
	/// Estimated time remaining in seconds (`None` until it can be computed)
	pub estimated_time_remaining: Option<f64>,
	/// Upload speed in bytes per second
	pub upload_speed: f64,
}

impl UploadProgress {
	/// Create a new upload progress tracker
	///
	/// All counters start at zero and `started_at` is set to the current
	/// instant, so speed and ETA are measured from construction time.
	fn new(total_bytes: usize, total_chunks: usize) -> Self {
		Self {
			bytes_uploaded: 0,
			total_bytes,
			percentage: 0.0,
			chunks_uploaded: 0,
			total_chunks,
			started_at: Instant::now(),
			estimated_time_remaining: None,
			upload_speed: 0.0,
		}
	}

	/// Update progress with new chunk
	///
	/// Records one more chunk of `chunk_size` bytes and refreshes the
	/// percentage, speed and ETA fields.
	fn update(&mut self, chunk_size: usize) {
		// Saturate rather than overflow: these are display counters and must
		// never panic, whatever sizes the caller reports.
		self.chunks_uploaded = self.chunks_uploaded.saturating_add(1);
		self.bytes_uploaded = self.bytes_uploaded.saturating_add(chunk_size);

		// On the final chunk, update total_bytes to reflect actual cumulative
		// byte count, which may differ from the declared total_size if the
		// last chunk is smaller than chunk_size.
		if self.chunks_uploaded >= self.total_chunks {
			self.total_bytes = self.bytes_uploaded;
		}

		// Clamp to the documented 0.0 - 100.0 range: oversized or repeated
		// chunks can push bytes_uploaded past total_bytes before the final
		// chunk arrives.
		self.percentage = if self.total_bytes > 0 {
			((self.bytes_uploaded as f64 / self.total_bytes as f64) * 100.0).min(100.0)
		} else {
			0.0
		};

		// Calculate upload speed
		let elapsed = self.started_at.elapsed().as_secs_f64();
		if elapsed > 0.0 {
			self.upload_speed = self.bytes_uploaded as f64 / elapsed;

			// Estimate time remaining; saturating_sub keeps this at zero if
			// more bytes than expected have already arrived.
			let bytes_remaining = self.total_bytes.saturating_sub(self.bytes_uploaded);
			if self.upload_speed > 0.0 {
				self.estimated_time_remaining = Some(bytes_remaining as f64 / self.upload_speed);
			}
		}
	}

	/// Check if upload is complete
	///
	/// Complete means at least `total_chunks` chunks have been recorded.
	pub fn is_complete(&self) -> bool {
		self.chunks_uploaded >= self.total_chunks
	}

	/// Get formatted upload speed (e.g., "1.5 MB/s")
	///
	/// Uses 1024-based units and two decimal places; the largest unit is MB/s.
	pub fn formatted_speed(&self) -> String {
		if self.upload_speed < 1024.0 {
			format!("{:.2} B/s", self.upload_speed)
		} else if self.upload_speed < 1024.0 * 1024.0 {
			format!("{:.2} KB/s", self.upload_speed / 1024.0)
		} else {
			format!("{:.2} MB/s", self.upload_speed / (1024.0 * 1024.0))
		}
	}

	/// Get formatted estimated time remaining (e.g., "2m 30s")
	///
	/// Returns "Unknown" while no estimate is available. Fractions of a
	/// second are truncated, and the minutes part is omitted when it is zero.
	pub fn formatted_eta(&self) -> String {
		match self.estimated_time_remaining {
			Some(seconds) => {
				let mins = (seconds / 60.0) as u64;
				let secs = (seconds % 60.0) as u64;
				if mins > 0 {
					format!("{}m {}s", mins, secs)
				} else {
					format!("{}s", secs)
				}
			}
			None => "Unknown".to_string(),
		}
	}
}
135
136/// Metadata for a chunked upload session
137#[derive(Debug, Clone)]
138pub struct ChunkedUploadSession {
139	/// Unique session ID
140	pub session_id: String,
141	/// Original filename
142	pub filename: String,
143	/// Total file size in bytes
144	pub total_size: usize,
145	/// Chunk size in bytes
146	pub chunk_size: usize,
147	/// Total number of chunks
148	pub total_chunks: usize,
149	/// Number of chunks received so far
150	pub received_chunks: usize,
151	/// Temporary directory for chunks
152	pub temp_dir: PathBuf,
153	/// Whether the upload is complete
154	pub completed: bool,
155	/// Upload progress tracker
156	progress: UploadProgress,
157	/// When the session was created, used for timeout-based cleanup
158	created_at: Instant,
159}
160
161impl ChunkedUploadSession {
162	/// Create a new upload session
163	///
164	/// Returns `ChunkedUploadError::InvalidChunk` if `chunk_size` is zero.
165	pub fn new(
166		session_id: String,
167		filename: String,
168		total_size: usize,
169		chunk_size: usize,
170		temp_dir: PathBuf,
171	) -> Result<Self, ChunkedUploadError> {
172		if chunk_size == 0 {
173			return Err(ChunkedUploadError::InvalidChunk {
174				expected: 1,
175				actual: 0,
176			});
177		}
178		let total_chunks = total_size.div_ceil(chunk_size);
179		Ok(Self {
180			session_id,
181			filename,
182			total_size,
183			chunk_size,
184			total_chunks,
185			received_chunks: 0,
186			temp_dir,
187			completed: false,
188			progress: UploadProgress::new(total_size, total_chunks),
189			created_at: Instant::now(),
190		})
191	}
192
193	/// Get progress percentage
194	pub fn progress(&self) -> f64 {
195		self.progress.percentage
196	}
197
198	/// Get detailed upload progress
199	///
200	/// # Examples
201	///
202	/// ```
203	/// use reinhardt_http::chunked_upload::ChunkedUploadSession;
204	/// use std::path::PathBuf;
205	///
206	/// let session = ChunkedUploadSession::new(
207	///     "session1".to_string(),
208	///     "file.bin".to_string(),
209	///     1000,
210	///     100,
211	///     PathBuf::from("/tmp")
212	/// ).unwrap();
213	/// let progress = session.get_progress();
214	/// assert_eq!(progress.percentage, 0.0);
215	/// ```
216	pub fn get_progress(&self) -> &UploadProgress {
217		&self.progress
218	}
219
220	/// Update progress with a new chunk
221	///
222	/// This is primarily used internally but exposed for testing purposes.
223	#[doc(hidden)]
224	pub fn update_progress(&mut self, chunk_size: usize) {
225		self.progress.update(chunk_size);
226	}
227
228	/// Check if upload is complete
229	pub fn is_complete(&self) -> bool {
230		self.completed || self.received_chunks >= self.total_chunks
231	}
232
233	/// Get the path for a specific chunk
234	///
235	/// Uses the pre-validated session_id (validated during session creation)
236	/// combined with the numeric chunk_number, both safe for path construction.
237	fn chunk_path(&self, chunk_number: usize) -> PathBuf {
238		self.temp_dir
239			.join(format!("{}_{}.chunk", self.session_id, chunk_number))
240	}
241}
242
243/// Manager for chunked uploads
244pub struct ChunkedUploadManager {
245	sessions: Arc<Mutex<HashMap<String, ChunkedUploadSession>>>,
246	temp_base_dir: PathBuf,
247}
248
249impl ChunkedUploadManager {
250	/// Create a new chunked upload manager
251	///
252	/// # Examples
253	///
254	/// ```
255	/// use reinhardt_http::chunked_upload::ChunkedUploadManager;
256	/// use std::path::PathBuf;
257	///
258	/// let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/chunked_uploads"));
259	/// ```
260	pub fn new(temp_base_dir: PathBuf) -> Self {
261		Self {
262			sessions: Arc::new(Mutex::new(HashMap::new())),
263			temp_base_dir,
264		}
265	}
266
267	/// Start a new upload session
268	///
269	/// # Examples
270	///
271	/// ```
272	/// use reinhardt_http::chunked_upload::ChunkedUploadManager;
273	/// use std::path::PathBuf;
274	///
275	/// let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/chunked_uploads"));
276	/// let session = manager.start_session(
277	///     "session123".to_string(),
278	///     "large_file.bin".to_string(),
279	///     10_000_000, // 10MB
280	///     1_000_000,  // 1MB chunks
281	/// ).unwrap();
282	/// assert_eq!(session.total_chunks, 10);
283	/// ```
284	pub fn start_session(
285		&self,
286		session_id: String,
287		filename: String,
288		total_size: usize,
289		chunk_size: usize,
290	) -> Result<ChunkedUploadSession, ChunkedUploadError> {
291		// Lazily clean up expired sessions to prevent memory exhaustion
292		self.cleanup_expired_sessions();
293
294		// Validate session_id to prevent path traversal attacks.
295		// Session IDs are used to construct directory and file paths.
296		// Check both raw and URL-decoded forms to prevent bypass via
297		// percent-encoded traversal sequences like %2e%2e%2f.
298		let decoded = percent_decode_str(&session_id).decode_utf8_lossy();
299		for candidate in [session_id.as_str(), decoded.as_ref()] {
300			if candidate.is_empty()
301				|| candidate.contains('/')
302				|| candidate.contains('\\')
303				|| candidate.contains('\0')
304				|| candidate.contains("..")
305			{
306				return Err(ChunkedUploadError::SessionNotFound(
307					"Invalid session ID".to_string(),
308				));
309			}
310		}
311		let temp_dir = self.temp_base_dir.join(&session_id);
312		fs::create_dir_all(&temp_dir)?;
313
314		let session = ChunkedUploadSession::new(
315			session_id.clone(),
316			filename,
317			total_size,
318			chunk_size,
319			temp_dir,
320		)?;
321
322		let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
323		sessions.insert(session_id, session.clone());
324
325		Ok(session)
326	}
327
328	/// Upload a chunk
329	///
330	/// # Examples
331	///
332	/// ```no_run
333	/// use reinhardt_http::chunked_upload::ChunkedUploadManager;
334	/// use std::path::PathBuf;
335	///
336	/// let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/chunked_uploads"));
337	/// manager.start_session("session123".to_string(), "file.bin".to_string(), 1000, 100).unwrap();
338	///
339	/// let chunk_data = vec![0u8; 100];
340	/// manager.upload_chunk("session123", 0, &chunk_data).unwrap();
341	/// ```
342	pub fn upload_chunk(
343		&self,
344		session_id: &str,
345		chunk_number: usize,
346		data: &[u8],
347	) -> Result<ChunkedUploadSession, ChunkedUploadError> {
348		// Lazily clean up expired sessions to prevent memory exhaustion
349		self.cleanup_expired_sessions();
350
351		let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
352		let session = sessions
353			.get_mut(session_id)
354			.ok_or_else(|| ChunkedUploadError::SessionNotFound(session_id.to_string()))?;
355
356		if session.completed {
357			return Err(ChunkedUploadError::AlreadyCompleted);
358		}
359
360		// Validate chunk number
361		if chunk_number >= session.total_chunks {
362			return Err(ChunkedUploadError::InvalidChunk {
363				expected: session.total_chunks - 1,
364				actual: chunk_number,
365			});
366		}
367
368		// Write chunk to disk
369		let chunk_path = session.chunk_path(chunk_number);
370		let mut file = File::create(chunk_path)?;
371		file.write_all(data)?;
372
373		session.received_chunks += 1;
374		session.update_progress(data.len());
375
376		if session.is_complete() {
377			session.completed = true;
378		}
379
380		Ok(session.clone())
381	}
382
383	/// Assemble all chunks into final file
384	///
385	/// # Examples
386	///
387	/// ```no_run
388	/// use reinhardt_http::chunked_upload::ChunkedUploadManager;
389	/// use std::path::PathBuf;
390	///
391	/// let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/chunked_uploads"));
392	/// let output_path = manager.assemble_chunks("session123", PathBuf::from("/tmp/final_file.bin")).unwrap();
393	/// ```
394	pub fn assemble_chunks(
395		&self,
396		session_id: &str,
397		output_path: PathBuf,
398	) -> Result<PathBuf, ChunkedUploadError> {
399		let sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
400		let session = sessions
401			.get(session_id)
402			.ok_or_else(|| ChunkedUploadError::SessionNotFound(session_id.to_string()))?;
403
404		if !session.is_complete() {
405			return Err(ChunkedUploadError::InvalidChunk {
406				expected: session.total_chunks,
407				actual: session.received_chunks,
408			});
409		}
410
411		// Create output file
412		let mut output_file = File::create(&output_path)?;
413
414		// Assemble chunks in order
415		for i in 0..session.total_chunks {
416			let chunk_path = session.chunk_path(i);
417			let chunk_data = fs::read(&chunk_path)?;
418			output_file.write_all(&chunk_data)?;
419		}
420
421		Ok(output_path)
422	}
423
424	/// Clean up a session (delete temporary files)
425	///
426	/// # Examples
427	///
428	/// ```no_run
429	/// use reinhardt_http::chunked_upload::ChunkedUploadManager;
430	/// use std::path::PathBuf;
431	///
432	/// let manager = ChunkedUploadManager::new(PathBuf::from("/tmp/chunked_uploads"));
433	/// manager.cleanup_session("session123").unwrap();
434	/// ```
435	pub fn cleanup_session(&self, session_id: &str) -> Result<(), ChunkedUploadError> {
436		let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
437		if let Some(session) = sessions.remove(session_id)
438			&& session.temp_dir.exists()
439		{
440			fs::remove_dir_all(session.temp_dir)?;
441		}
442		Ok(())
443	}
444
445	/// Get session information
446	pub fn get_session(&self, session_id: &str) -> Option<ChunkedUploadSession> {
447		let sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
448		sessions.get(session_id).cloned()
449	}
450
451	/// List all active sessions
452	pub fn list_sessions(&self) -> Vec<ChunkedUploadSession> {
453		let sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
454		sessions.values().cloned().collect()
455	}
456
457	/// Remove sessions that have exceeded the timeout.
458	///
459	/// Expired sessions have their temporary files cleaned up automatically.
460	/// This is called lazily on `start_session` and `upload_chunk` to prevent
461	/// unbounded memory growth from abandoned uploads.
462	pub fn cleanup_expired_sessions(&self) {
463		let mut sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner());
464		sessions.retain(|_, session| {
465			let expired = session.created_at.elapsed() >= UPLOAD_SESSION_TIMEOUT;
466			if expired && session.temp_dir.exists() {
467				let _ = fs::remove_dir_all(&session.temp_dir);
468			}
469			!expired
470		});
471	}
472}
473
#[cfg(test)]
mod tests {
	use super::*;

	/// Scratch directory for a test, rooted at the platform temp directory
	/// instead of a hard-coded `/tmp` so the tests also run where `/tmp`
	/// does not exist.
	fn scratch_dir(name: &str) -> PathBuf {
		std::env::temp_dir().join(name)
	}

	// Session construction derives the chunk count and starts empty.
	#[test]
	fn test_session_creation() {
		let session = ChunkedUploadSession::new(
			"test123".to_string(),
			"file.bin".to_string(),
			1000,
			100,
			PathBuf::from("/tmp"),
		)
		.unwrap();

		assert_eq!(session.session_id, "test123");
		assert_eq!(session.filename, "file.bin");
		assert_eq!(session.total_size, 1000);
		assert_eq!(session.chunk_size, 100);
		assert_eq!(session.total_chunks, 10);
		assert_eq!(session.received_chunks, 0);
		assert!(!session.completed);
	}

	// Percentage follows the number of bytes recorded.
	#[test]
	fn test_session_progress() {
		let mut session = ChunkedUploadSession::new(
			"test123".to_string(),
			"file.bin".to_string(),
			1000,
			100,
			PathBuf::from("/tmp"),
		)
		.unwrap();

		assert_eq!(session.progress(), 0.0);

		// Update progress by simulating chunk uploads
		for _ in 0..5 {
			session.update_progress(100);
		}
		assert_eq!(session.progress(), 50.0);

		for _ in 0..5 {
			session.update_progress(100);
		}
		assert_eq!(session.progress(), 100.0);
	}

	#[test]
	fn test_manager_creation() {
		let manager = ChunkedUploadManager::new(scratch_dir("test_chunks"));
		assert_eq!(manager.list_sessions().len(), 0);
	}

	#[test]
	fn test_start_session() {
		let manager = ChunkedUploadManager::new(scratch_dir("test_chunks"));
		let session = manager
			.start_session("session1".to_string(), "file.bin".to_string(), 1000, 100)
			.unwrap();

		assert_eq!(session.session_id, "session1");
		assert_eq!(session.total_chunks, 10);
		assert_eq!(manager.list_sessions().len(), 1);

		// Do not leave the session directory behind.
		manager.cleanup_session("session1").unwrap();
	}

	#[test]
	fn test_upload_chunk() {
		let temp_dir = scratch_dir("test_chunks_upload");
		let manager = ChunkedUploadManager::new(temp_dir.clone());

		manager
			.start_session("session2".to_string(), "file.bin".to_string(), 300, 100)
			.unwrap();

		let chunk_data = vec![0u8; 100];
		let result = manager.upload_chunk("session2", 0, &chunk_data);
		assert!(result.is_ok());

		let session = manager.get_session("session2").unwrap();
		assert_eq!(session.received_chunks, 1);

		manager.cleanup_session("session2").unwrap();
		let _ = fs::remove_dir_all(&temp_dir);
	}

	#[test]
	fn test_invalid_session() {
		let manager = ChunkedUploadManager::new(scratch_dir("test_chunks"));
		let chunk_data = vec![0u8; 100];
		let result = manager.upload_chunk("nonexistent", 0, &chunk_data);

		assert!(result.is_err());
		if let Err(ChunkedUploadError::SessionNotFound(id)) = result {
			assert_eq!(id, "nonexistent");
		} else {
			panic!("Expected SessionNotFound error");
		}
	}

	#[test]
	fn test_chunk_assembly() {
		let temp_dir = scratch_dir("test_chunks_assembly");
		let manager = ChunkedUploadManager::new(temp_dir.clone());

		manager
			.start_session("session3".to_string(), "file.bin".to_string(), 300, 100)
			.unwrap();

		// Upload 3 chunks, each filled with its own index
		for i in 0..3 {
			let chunk_data = vec![i as u8; 100];
			manager.upload_chunk("session3", i, &chunk_data).unwrap();
		}

		let output_path = temp_dir.join("assembled.bin");
		let result = manager.assemble_chunks("session3", output_path.clone());
		assert!(result.is_ok());

		assert!(output_path.exists());
		let content = fs::read(&output_path).unwrap();
		assert_eq!(content.len(), 300);
		// Chunks must appear in index order, not merely add up in length.
		for (index, part) in content.chunks(100).enumerate() {
			assert!(part.iter().all(|&byte| byte == index as u8));
		}

		// Cleanup
		fs::remove_file(output_path).unwrap();
		manager.cleanup_session("session3").unwrap();
		let _ = fs::remove_dir_all(&temp_dir);
	}

	#[test]
	fn test_session_completion() {
		let temp_dir = scratch_dir("test_chunks_completion");
		let manager = ChunkedUploadManager::new(temp_dir.clone());

		manager
			.start_session("session4".to_string(), "file.bin".to_string(), 200, 100)
			.unwrap();

		let chunk_data = vec![0u8; 100];

		manager.upload_chunk("session4", 0, &chunk_data).unwrap();
		let session = manager.get_session("session4").unwrap();
		assert!(!session.is_complete());

		manager.upload_chunk("session4", 1, &chunk_data).unwrap();
		let session = manager.get_session("session4").unwrap();
		assert!(session.is_complete());

		manager.cleanup_session("session4").unwrap();
		let _ = fs::remove_dir_all(&temp_dir);
	}

	// =================================================================
	// Path traversal prevention tests (Issue #355)
	// =================================================================

	#[rstest::rstest]
	#[case("../../../etc")]
	#[case("foo/bar")]
	#[case("foo\\bar")]
	#[case("null\0byte")]
	#[case("..")]
	#[case("..%2f..%2fetc")]
	#[case("%2e%2e%2f%2e%2e%2f")]
	#[case("..%2fmalicious")]
	fn test_start_session_rejects_traversal_in_session_id(#[case] session_id: &str) {
		// Arrange
		let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_security"));

		// Act
		let result =
			manager.start_session(session_id.to_string(), "file.bin".to_string(), 1000, 100);

		// Assert
		assert!(
			result.is_err(),
			"Expected error for session_id: {}",
			session_id
		);
	}

	#[rstest::rstest]
	fn test_start_session_allows_safe_session_id() {
		// Arrange
		let base = scratch_dir("test_chunks_safe");
		let manager = ChunkedUploadManager::new(base.clone());

		// Act
		let result = manager.start_session(
			"safe-session_123".to_string(),
			"file.bin".to_string(),
			1000,
			100,
		);

		// Assert
		assert!(result.is_ok());
		manager.cleanup_session("safe-session_123").unwrap();
		let _ = fs::remove_dir_all(&base);
	}

	// =================================================================
	// Division by zero prevention tests (Issue #359)
	// =================================================================

	#[rstest::rstest]
	fn test_chunked_upload_session_rejects_zero_chunk_size() {
		// Arrange
		let session_id = "test-zero".to_string();
		let filename = "file.bin".to_string();
		let total_size = 1000;
		let chunk_size = 0;

		// Act
		let result = ChunkedUploadSession::new(
			session_id,
			filename,
			total_size,
			chunk_size,
			PathBuf::from("/tmp"),
		);

		// Assert
		assert!(result.is_err());
		if let Err(ChunkedUploadError::InvalidChunk { expected, actual }) = result {
			assert_eq!(expected, 1);
			assert_eq!(actual, 0);
		} else {
			panic!("Expected InvalidChunk error for zero chunk_size");
		}
	}

	#[rstest::rstest]
	fn test_start_session_rejects_zero_chunk_size() {
		// Arrange
		let base = scratch_dir("test_chunks_zero");
		let manager = ChunkedUploadManager::new(base.clone());

		// Act
		let result =
			manager.start_session("session-zero".to_string(), "file.bin".to_string(), 1000, 0);

		// Assert
		assert!(result.is_err());

		// The rejected session is never registered, so cleanup_session cannot
		// remove anything that may have been created on disk; sweep the
		// scratch directory directly (best effort).
		let _ = fs::remove_dir_all(&base);
	}
}