use percent_encoding::percent_decode_str;
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Maximum age of an upload session, measured from its creation time.
///
/// `ChunkedUploadManager::cleanup_expired_sessions` drops every session whose age has
/// reached this value (one hour) and removes its temporary directory.
const UPLOAD_SESSION_TIMEOUT: Duration = Duration::from_secs(3600);
/// Errors produced by the chunked upload types in this module.
///
/// The enum is `#[non_exhaustive]`, so matches written outside the defining crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum ChunkedUploadError {
/// An underlying filesystem or I/O operation failed.
#[error("IO error: {0}")]
Io(#[from] io::Error),
/// No session is registered under the looked-up id (the payload is that id).
///
/// `ChunkedUploadManager::start_session` also returns this variant, with the fixed
/// payload `"Invalid session ID"`, when it rejects a session id.
#[error("Upload session not found: {0}")]
SessionNotFound(String),
/// A chunk-related value was out of range.
///
/// In this file it is used for a zero chunk size, for a chunk index past the last chunk,
/// and for assembling before every chunk has arrived; what `expected` and `actual` hold
/// therefore depends on the call site.
#[error("Invalid chunk: expected {expected}, got {actual}")]
InvalidChunk {
/// The value the call site considers valid (meaning depends on the call site).
expected: usize,
/// The value that was actually supplied or observed.
actual: usize,
},
/// A chunk arrived out of sequence.
///
/// NOTE(review): not constructed anywhere in the code visible here.
#[error("Chunk out of order: expected {expected}, got {actual}")]
ChunkOutOfOrder {
/// Chunk number that was expected next.
expected: usize,
/// Chunk number that was received.
actual: usize,
},
/// A chunk was uploaded to a session that is already marked completed.
#[error("Upload already completed")]
AlreadyCompleted,
/// A checksum did not match.
///
/// NOTE(review): not constructed anywhere in the code visible here.
#[error("Checksum mismatch")]
ChecksumMismatch,
/// The same chunk number was uploaded twice for one session.
#[error("Duplicate chunk: chunk {chunk_number} has already been uploaded")]
DuplicateChunk {
/// The chunk number that had already been recorded.
chunk_number: usize,
},
}
/// Running statistics for a single chunked upload.
#[derive(Debug, Clone)]
pub struct UploadProgress {
    /// Number of payload bytes recorded so far.
    pub bytes_uploaded: usize,
    /// Expected total size in bytes; replaced by `bytes_uploaded` once every chunk arrived.
    pub total_bytes: usize,
    /// Completion percentage in the range `0.0..=100.0`.
    pub percentage: f64,
    /// Number of chunks recorded so far.
    pub chunks_uploaded: usize,
    /// Number of chunks the upload consists of.
    pub total_chunks: usize,
    /// Moment this tracker was created; basis for speed and ETA calculations.
    pub started_at: Instant,
    /// Estimated seconds until completion, or `None` if no estimate exists yet.
    pub estimated_time_remaining: Option<f64>,
    /// Average throughput since `started_at`, in bytes per second.
    pub upload_speed: f64,
}

impl UploadProgress {
    /// Creates a tracker with zeroed counters whose clock starts now.
    fn new(total_bytes: usize, total_chunks: usize) -> Self {
        Self {
            bytes_uploaded: 0,
            total_bytes,
            percentage: 0.0,
            chunks_uploaded: 0,
            total_chunks,
            started_at: Instant::now(),
            estimated_time_remaining: None,
            upload_speed: 0.0,
        }
    }

    /// Records one more chunk of `chunk_size` bytes and refreshes the derived statistics.
    ///
    /// Counters saturate instead of overflowing, and the percentage is capped at `100.0`
    /// so that oversized chunks can never report more than full completion.
    fn update(&mut self, chunk_size: usize) {
        self.chunks_uploaded = self.chunks_uploaded.saturating_add(1);
        self.bytes_uploaded = self.bytes_uploaded.saturating_add(chunk_size);

        // Once the last chunk is in, the real byte count is authoritative; this makes the
        // percentage land on exactly 100 even if the declared size was inaccurate.
        if self.chunks_uploaded >= self.total_chunks {
            self.total_bytes = self.bytes_uploaded;
        }

        self.percentage = if self.total_bytes > 0 {
            let ratio = self.bytes_uploaded as f64 / self.total_bytes as f64;
            (ratio * 100.0).min(100.0)
        } else {
            0.0
        };

        let elapsed = self.started_at.elapsed().as_secs_f64();
        if elapsed > 0.0 {
            self.upload_speed = self.bytes_uploaded as f64 / elapsed;
            let bytes_remaining = self.total_bytes.saturating_sub(self.bytes_uploaded);
            // A zero speed would divide by zero; keep the previous estimate instead.
            if self.upload_speed > 0.0 {
                self.estimated_time_remaining =
                    Some(bytes_remaining as f64 / self.upload_speed);
            }
        }
    }

    /// Returns `true` once at least `total_chunks` chunks have been recorded.
    pub fn is_complete(&self) -> bool {
        self.chunks_uploaded >= self.total_chunks
    }

    /// Formats `upload_speed` with two decimals as `B/s`, `KB/s` or `MB/s` (1024-based).
    pub fn formatted_speed(&self) -> String {
        const KIB: f64 = 1024.0;
        const MIB: f64 = 1024.0 * 1024.0;

        if self.upload_speed < KIB {
            format!("{:.2} B/s", self.upload_speed)
        } else if self.upload_speed < MIB {
            format!("{:.2} KB/s", self.upload_speed / KIB)
        } else {
            format!("{:.2} MB/s", self.upload_speed / MIB)
        }
    }

    /// Formats the ETA as `"<m>m <s>s"`, `"<s>s"` when under a minute, or `"Unknown"`
    /// when no estimate is available. Fractions of a second are truncated.
    pub fn formatted_eta(&self) -> String {
        match self.estimated_time_remaining {
            Some(seconds) => {
                let mins = (seconds / 60.0) as u64;
                let secs = (seconds % 60.0) as u64;
                if mins > 0 {
                    format!("{}m {}s", mins, secs)
                } else {
                    format!("{}s", secs)
                }
            }
            None => "Unknown".to_string(),
        }
    }
}
/// State of one chunked upload: its identity, geometry, received chunks and progress.
#[derive(Debug, Clone)]
pub struct ChunkedUploadSession {
    /// Identifier of the session; also used as the prefix of chunk file names.
    pub session_id: String,
    /// File name supplied when the session was created.
    pub filename: String,
    /// Declared total size of the upload in bytes.
    pub total_size: usize,
    /// Declared size of one chunk in bytes (never zero).
    pub chunk_size: usize,
    /// Number of chunks: `total_size` divided by `chunk_size`, rounded up.
    pub total_chunks: usize,
    /// Count of chunks received so far.
    pub received_chunks: usize,
    /// Chunk numbers that have already been stored.
    received_chunk_numbers: HashSet<usize>,
    /// Directory in which the chunk files of this session are placed.
    pub temp_dir: PathBuf,
    /// Whether the session has been flagged as finished.
    pub completed: bool,
    /// Byte- and time-based progress statistics.
    progress: UploadProgress,
    /// Creation instant of the session.
    created_at: Instant,
}

impl ChunkedUploadSession {
    /// Builds a fresh session with no chunks received.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkedUploadError::InvalidChunk`] (`expected: 1`, `actual: 0`) when
    /// `chunk_size` is zero, because the chunk count could not be computed.
    pub fn new(
        session_id: String,
        filename: String,
        total_size: usize,
        chunk_size: usize,
        temp_dir: PathBuf,
    ) -> Result<Self, ChunkedUploadError> {
        let total_chunks = match chunk_size {
            0 => {
                return Err(ChunkedUploadError::InvalidChunk {
                    expected: 1,
                    actual: 0,
                });
            }
            size => total_size.div_ceil(size),
        };
        let progress = UploadProgress::new(total_size, total_chunks);
        let created_at = Instant::now();

        Ok(Self {
            session_id,
            filename,
            total_size,
            chunk_size,
            total_chunks,
            received_chunks: 0,
            received_chunk_numbers: HashSet::new(),
            temp_dir,
            completed: false,
            progress,
            created_at,
        })
    }

    /// Current completion percentage as tracked by the progress statistics.
    pub fn progress(&self) -> f64 {
        self.get_progress().percentage
    }

    /// Borrows the full progress statistics of this session.
    pub fn get_progress(&self) -> &UploadProgress {
        &self.progress
    }

    /// Feeds one chunk of `chunk_size` bytes into the progress statistics.
    #[doc(hidden)]
    pub fn update_progress(&mut self, chunk_size: usize) {
        let tracker = &mut self.progress;
        tracker.update(chunk_size);
    }

    /// Tells whether the session is finished: either flagged as completed, or the number
    /// of received chunks has reached the total.
    pub fn is_complete(&self) -> bool {
        if self.completed {
            return true;
        }
        self.received_chunks >= self.total_chunks
    }

    /// Location of the file holding chunk `chunk_number` inside the session directory.
    fn chunk_path(&self, chunk_number: usize) -> PathBuf {
        let file_name = format!("{}_{}.chunk", self.session_id, chunk_number);
        self.temp_dir.join(file_name)
    }
}
/// Registry of chunked upload sessions backed by per-session directories on disk.
///
/// All session state lives behind a mutex; a poisoned lock is recovered rather than
/// propagated, so one panicking caller does not disable the manager.
pub struct ChunkedUploadManager {
    /// Known sessions keyed by session id.
    sessions: Arc<Mutex<HashMap<String, ChunkedUploadSession>>>,
    /// Directory under which every session gets its own sub-directory.
    temp_base_dir: PathBuf,
}

impl ChunkedUploadManager {
    /// Creates a manager with no sessions that stores chunk files below `temp_base_dir`.
    ///
    /// The directory itself is not created until the first session is started.
    pub fn new(temp_base_dir: PathBuf) -> Self {
        Self {
            sessions: Arc::new(Mutex::new(HashMap::new())),
            temp_base_dir,
        }
    }

    /// Locks the session map, recovering the guard if the mutex was poisoned.
    fn lock_sessions(&self) -> std::sync::MutexGuard<'_, HashMap<String, ChunkedUploadSession>> {
        self.sessions.lock().unwrap_or_else(|e| e.into_inner())
    }

    /// Registers a new upload session and creates its temporary directory.
    ///
    /// Expired sessions are purged first. If a session with the same id already exists
    /// it is replaced by the new one. A snapshot of the new session is returned.
    ///
    /// # Errors
    ///
    /// * [`ChunkedUploadError::SessionNotFound`] with the payload `"Invalid session ID"`
    ///   when the id (raw or percent-decoded) is empty, is `"."`, or contains `/`, `\`,
    ///   a NUL byte or `..`.
    /// * [`ChunkedUploadError::InvalidChunk`] when `chunk_size` is zero.
    /// * [`ChunkedUploadError::Io`] when the session directory cannot be created.
    pub fn start_session(
        &self,
        session_id: String,
        filename: String,
        total_size: usize,
        chunk_size: usize,
    ) -> Result<ChunkedUploadSession, ChunkedUploadError> {
        self.cleanup_expired_sessions();

        // The id becomes a directory name, so both the raw and the percent-decoded form
        // must be free of anything that could escape or alias the base directory.
        let decoded = percent_decode_str(&session_id).decode_utf8_lossy();
        for candidate in [session_id.as_str(), decoded.as_ref()] {
            let is_unsafe = candidate.is_empty()
                // "." would resolve to the base directory itself, so cleaning up this
                // session would delete the data of every other session.
                || candidate == "."
                || candidate.contains(['/', '\\', '\0'])
                || candidate.contains("..");
            if is_unsafe {
                return Err(ChunkedUploadError::SessionNotFound(
                    "Invalid session ID".to_string(),
                ));
            }
        }

        let temp_dir = self.temp_base_dir.join(&session_id);
        // Validate the parameters before touching the filesystem so that a rejected
        // request does not leave an orphaned directory behind.
        let session = ChunkedUploadSession::new(
            session_id.clone(),
            filename,
            total_size,
            chunk_size,
            temp_dir,
        )?;
        fs::create_dir_all(&session.temp_dir)?;

        let mut sessions = self.lock_sessions();
        sessions.insert(session_id, session.clone());
        Ok(session)
    }

    /// Stores chunk `chunk_number` of a session and returns a snapshot of the session.
    ///
    /// Expired sessions are purged first. Chunks may arrive in any order. The session is
    /// flagged as completed once every chunk number has been received.
    ///
    /// # Errors
    ///
    /// * [`ChunkedUploadError::SessionNotFound`] when the id is unknown (or expired).
    /// * [`ChunkedUploadError::AlreadyCompleted`] when the session is already completed.
    /// * [`ChunkedUploadError::InvalidChunk`] when `chunk_number` is past the last chunk;
    ///   `expected` carries the highest valid chunk number.
    /// * [`ChunkedUploadError::DuplicateChunk`] when the chunk was already stored.
    /// * [`ChunkedUploadError::Io`] when the chunk file cannot be written.
    pub fn upload_chunk(
        &self,
        session_id: &str,
        chunk_number: usize,
        data: &[u8],
    ) -> Result<ChunkedUploadSession, ChunkedUploadError> {
        self.cleanup_expired_sessions();

        let mut sessions = self.lock_sessions();
        let session = sessions
            .get_mut(session_id)
            .ok_or_else(|| ChunkedUploadError::SessionNotFound(session_id.to_string()))?;

        if session.completed {
            return Err(ChunkedUploadError::AlreadyCompleted);
        }
        if chunk_number >= session.total_chunks {
            return Err(ChunkedUploadError::InvalidChunk {
                // Saturating: a zero-sized upload has no chunks, and a plain `- 1` would
                // underflow (panicking in debug builds).
                expected: session.total_chunks.saturating_sub(1),
                actual: chunk_number,
            });
        }
        if session.received_chunk_numbers.contains(&chunk_number) {
            return Err(ChunkedUploadError::DuplicateChunk { chunk_number });
        }

        // Write first, record second: a failed write leaves the chunk unrecorded so the
        // client can simply retry it.
        let mut file = File::create(session.chunk_path(chunk_number))?;
        file.write_all(data)?;

        session.received_chunk_numbers.insert(chunk_number);
        session.received_chunks = session.received_chunk_numbers.len();
        session.update_progress(data.len());
        if session.is_complete() {
            session.completed = true;
        }
        Ok(session.clone())
    }

    /// Concatenates all chunks of a complete session, in chunk order, into `output_path`.
    ///
    /// Chunks are streamed into the output file rather than loaded into memory. The
    /// session and its chunk files are left in place; call
    /// [`cleanup_session`](Self::cleanup_session) to remove them.
    ///
    /// # Errors
    ///
    /// * [`ChunkedUploadError::SessionNotFound`] when the id is unknown.
    /// * [`ChunkedUploadError::InvalidChunk`] when chunks are still missing (`expected`
    ///   is the total chunk count, `actual` the number received).
    /// * [`ChunkedUploadError::Io`] when the output or a chunk file cannot be accessed.
    pub fn assemble_chunks(
        &self,
        session_id: &str,
        output_path: PathBuf,
    ) -> Result<PathBuf, ChunkedUploadError> {
        let sessions = self.lock_sessions();
        let session = sessions
            .get(session_id)
            .ok_or_else(|| ChunkedUploadError::SessionNotFound(session_id.to_string()))?;

        if !session.is_complete() {
            return Err(ChunkedUploadError::InvalidChunk {
                expected: session.total_chunks,
                actual: session.received_chunks,
            });
        }

        let mut output_file = File::create(&output_path)?;
        for chunk_number in 0..session.total_chunks {
            let mut chunk_file = File::open(session.chunk_path(chunk_number))?;
            io::copy(&mut chunk_file, &mut output_file)?;
        }
        Ok(output_path)
    }

    /// Forgets a session and deletes its temporary directory, if it still exists.
    ///
    /// Unknown session ids are not an error.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkedUploadError::Io`] when the directory cannot be removed; the
    /// session has already been dropped from the registry at that point.
    pub fn cleanup_session(&self, session_id: &str) -> Result<(), ChunkedUploadError> {
        let mut sessions = self.lock_sessions();
        if let Some(session) = sessions.remove(session_id)
            && session.temp_dir.exists()
        {
            fs::remove_dir_all(session.temp_dir)?;
        }
        Ok(())
    }

    /// Returns a snapshot of the session with the given id, if it is registered.
    pub fn get_session(&self, session_id: &str) -> Option<ChunkedUploadSession> {
        self.lock_sessions().get(session_id).cloned()
    }

    /// Returns snapshots of all registered sessions, in unspecified order.
    pub fn list_sessions(&self) -> Vec<ChunkedUploadSession> {
        self.lock_sessions().values().cloned().collect()
    }

    /// Drops every session whose age has reached `UPLOAD_SESSION_TIMEOUT`.
    ///
    /// Removal of the temporary directory is best effort: failures are ignored so that
    /// one undeletable directory cannot block expiry of the remaining sessions.
    pub fn cleanup_expired_sessions(&self) {
        let mut sessions = self.lock_sessions();
        sessions.retain(|_, session| {
            let expired = session.created_at.elapsed() >= UPLOAD_SESSION_TIMEOUT;
            if expired && session.temp_dir.exists() {
                let _ = fs::remove_dir_all(&session.temp_dir);
            }
            !expired
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a named scratch directory below the platform's temporary directory.
    ///
    /// Using `std::env::temp_dir()` instead of a hard-coded `/tmp` keeps the tests
    /// runnable on platforms without that path.
    fn scratch_dir(name: &str) -> PathBuf {
        std::env::temp_dir().join(name)
    }

    #[test]
    fn test_session_creation() {
        let session = ChunkedUploadSession::new(
            "test123".to_string(),
            "file.bin".to_string(),
            1000,
            100,
            std::env::temp_dir(),
        )
        .unwrap();
        assert_eq!(session.session_id, "test123");
        assert_eq!(session.filename, "file.bin");
        assert_eq!(session.total_size, 1000);
        assert_eq!(session.chunk_size, 100);
        assert_eq!(session.total_chunks, 10);
        assert_eq!(session.received_chunks, 0);
        assert!(!session.completed);
    }

    #[test]
    fn test_session_progress() {
        let mut session = ChunkedUploadSession::new(
            "test123".to_string(),
            "file.bin".to_string(),
            1000,
            100,
            std::env::temp_dir(),
        )
        .unwrap();
        assert_eq!(session.progress(), 0.0);
        for _ in 0..5 {
            session.update_progress(100);
        }
        assert_eq!(session.progress(), 50.0);
        for _ in 0..5 {
            session.update_progress(100);
        }
        assert_eq!(session.progress(), 100.0);
    }

    #[test]
    fn test_manager_creation() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks"));
        assert_eq!(manager.list_sessions().len(), 0);
    }

    #[test]
    fn test_start_session() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks"));
        let session = manager
            .start_session("session1".to_string(), "file.bin".to_string(), 1000, 100)
            .unwrap();
        assert_eq!(session.session_id, "session1");
        assert_eq!(session.total_chunks, 10);
        assert_eq!(manager.list_sessions().len(), 1);
        // Remove the session directory so the test leaves nothing behind.
        manager.cleanup_session("session1").unwrap();
    }

    #[test]
    fn test_upload_chunk() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_upload"));
        manager
            .start_session("session2".to_string(), "file.bin".to_string(), 300, 100)
            .unwrap();
        let chunk_data = vec![0u8; 100];
        let result = manager.upload_chunk("session2", 0, &chunk_data);
        assert!(result.is_ok());
        let session = manager.get_session("session2").unwrap();
        assert_eq!(session.received_chunks, 1);
        manager.cleanup_session("session2").unwrap();
    }

    #[test]
    fn test_invalid_session() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks"));
        let chunk_data = vec![0u8; 100];
        let result = manager.upload_chunk("nonexistent", 0, &chunk_data);
        assert!(result.is_err());
        if let Err(ChunkedUploadError::SessionNotFound(id)) = result {
            assert_eq!(id, "nonexistent");
        } else {
            panic!("Expected SessionNotFound error");
        }
    }

    #[test]
    fn test_out_of_range_chunk_is_rejected() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_range"));
        manager
            .start_session("range1".to_string(), "file.bin".to_string(), 300, 100)
            .unwrap();
        let chunk_data = vec![0u8; 100];
        let result = manager.upload_chunk("range1", 5, &chunk_data);
        if let Err(ChunkedUploadError::InvalidChunk { expected, actual }) = result {
            assert_eq!(expected, 2);
            assert_eq!(actual, 5);
        } else {
            panic!("Expected InvalidChunk error");
        }
        let session = manager.get_session("range1").unwrap();
        assert_eq!(session.received_chunks, 0);
        manager.cleanup_session("range1").unwrap();
    }

    #[test]
    fn test_chunk_assembly() {
        let temp_dir = scratch_dir("test_chunks_assembly");
        let manager = ChunkedUploadManager::new(temp_dir.clone());
        manager
            .start_session("session3".to_string(), "file.bin".to_string(), 300, 100)
            .unwrap();
        for i in 0..3 {
            let chunk_data = vec![i as u8; 100];
            manager.upload_chunk("session3", i, &chunk_data).unwrap();
        }
        let output_path = temp_dir.join("assembled.bin");
        let result = manager.assemble_chunks("session3", output_path.clone());
        assert!(result.is_ok());
        assert!(output_path.exists());
        let content = fs::read(&output_path).unwrap();
        assert_eq!(content.len(), 300);
        fs::remove_file(output_path).unwrap();
        manager.cleanup_session("session3").unwrap();
    }

    #[test]
    fn test_session_completion() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_completion"));
        manager
            .start_session("session4".to_string(), "file.bin".to_string(), 200, 100)
            .unwrap();
        let chunk_data = vec![0u8; 100];
        manager.upload_chunk("session4", 0, &chunk_data).unwrap();
        let session = manager.get_session("session4").unwrap();
        assert!(!session.is_complete());
        manager.upload_chunk("session4", 1, &chunk_data).unwrap();
        let session = manager.get_session("session4").unwrap();
        assert!(session.is_complete());
        manager.cleanup_session("session4").unwrap();
    }

    #[rstest::rstest]
    #[case("../../../etc")]
    #[case("foo/bar")]
    #[case("foo\\bar")]
    #[case("null\0byte")]
    #[case("..")]
    #[case("..%2f..%2fetc")]
    #[case("%2e%2e%2f%2e%2e%2f")]
    #[case("..%2fmalicious")]
    fn test_start_session_rejects_traversal_in_session_id(#[case] session_id: &str) {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_security"));
        let result =
            manager.start_session(session_id.to_string(), "file.bin".to_string(), 1000, 100);
        assert!(
            result.is_err(),
            "Expected error for session_id: {}",
            session_id
        );
    }

    #[rstest::rstest]
    fn test_start_session_allows_safe_session_id() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_safe"));
        let result = manager.start_session(
            "safe-session_123".to_string(),
            "file.bin".to_string(),
            1000,
            100,
        );
        assert!(result.is_ok());
        manager.cleanup_session("safe-session_123").unwrap();
    }

    #[rstest::rstest]
    fn test_chunked_upload_session_rejects_zero_chunk_size() {
        let result = ChunkedUploadSession::new(
            "test-zero".to_string(),
            "file.bin".to_string(),
            1000,
            0,
            std::env::temp_dir(),
        );
        assert!(result.is_err());
        if let Err(ChunkedUploadError::InvalidChunk { expected, actual }) = result {
            assert_eq!(expected, 1);
            assert_eq!(actual, 0);
        } else {
            panic!("Expected InvalidChunk error for zero chunk_size");
        }
    }

    #[rstest::rstest]
    fn test_start_session_rejects_zero_chunk_size() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_zero"));
        let result =
            manager.start_session("session-zero".to_string(), "file.bin".to_string(), 1000, 0);
        assert!(result.is_err());
        // The rejected session must not be registered.
        assert!(manager.get_session("session-zero").is_none());
    }

    #[rstest::rstest]
    fn test_duplicate_chunk_upload_is_rejected() {
        let manager = ChunkedUploadManager::new(scratch_dir("test_chunks_dedup"));
        manager
            .start_session("dedup1".to_string(), "file.bin".to_string(), 300, 100)
            .unwrap();
        let chunk_data = vec![0u8; 100];
        manager.upload_chunk("dedup1", 0, &chunk_data).unwrap();
        let result = manager.upload_chunk("dedup1", 0, &chunk_data);
        assert!(result.is_err());
        if let Err(ChunkedUploadError::DuplicateChunk { chunk_number }) = result {
            assert_eq!(chunk_number, 0);
        } else {
            panic!("Expected DuplicateChunk error");
        }
        let session = manager.get_session("dedup1").unwrap();
        assert_eq!(session.received_chunks, 1);
        assert!(!session.is_complete());
        manager.cleanup_session("dedup1").unwrap();
    }

    #[rstest::rstest]
    fn test_sequential_chunk_upload_still_works() {
        let temp_dir = scratch_dir("test_chunks_dedup_seq");
        let manager = ChunkedUploadManager::new(temp_dir.clone());
        manager
            .start_session("dedup2".to_string(), "file.bin".to_string(), 300, 100)
            .unwrap();
        for i in 0..3 {
            let chunk_data = vec![i as u8; 100];
            manager.upload_chunk("dedup2", i, &chunk_data).unwrap();
        }
        let session = manager.get_session("dedup2").unwrap();
        assert_eq!(session.received_chunks, 3);
        assert!(session.is_complete());
        let output_path = temp_dir.join("dedup2_assembled.bin");
        let result = manager.assemble_chunks("dedup2", output_path.clone());
        assert!(result.is_ok());
        let content = fs::read(&output_path).unwrap();
        assert_eq!(content.len(), 300);
        assert!(content[0..100].iter().all(|&b| b == 0));
        assert!(content[100..200].iter().all(|&b| b == 1));
        assert!(content[200..300].iter().all(|&b| b == 2));
        fs::remove_file(output_path).unwrap();
        manager.cleanup_session("dedup2").unwrap();
    }
}