use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::collections::HashMap;
use tempfile::NamedTempFile;
use std::io::Write;
use crate::{dfx, create_error_string, UploadParams};
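/// Tunables for the parallel chunk uploader. `target_rate_mibs` is a soft
/// throughput target used to throttle new uploads; the optional callbacks
/// report per-chunk progress and the current aggregate rate.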
#[derive(Debug, Clone)]
pub struct ParallelUploadConfig {
pub max_concurrent: usize,
pub target_rate_mibs: f64,
pub max_retries: usize,
pub retry_delay_ms: u64,
pub progress_callback: Option<fn(u32, usize, &str)>,
pub rate_callback: Option<fn(f64)>,
}
impl Default for ParallelUploadConfig {
fn default() -> Self {
Self {
            max_concurrent: 4,
            target_rate_mibs: 4.0,
            max_retries: 3,
            retry_delay_ms: 1000,
progress_callback: None,
rate_callback: None,
}
}
}
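/// Outcome of a parallel upload run. `PartialFailure` maps each failed
/// chunk id to its last error message so callers can retry selectively.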
#[derive(Debug)]
pub enum ParallelUploadResult {
Success,
PartialFailure {
successful_chunks: Vec<u32>,
failed_chunks: HashMap<u32, String>
},
Failed(String),
}
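/// One chunk queued for upload: its id, payload, and payload length in bytes.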
#[derive(Debug, Clone)]
pub struct ChunkInfo {
pub chunk_id: u32,
pub data: Vec<u8>,
pub size: usize,
}
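/// Mutex-guarded bookkeeping shared between the dispatch loop and workers:
/// bytes uploaded so far, elapsed time, and the number of in-flight uploads.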
#[derive(Debug)]
struct UploadTracker {
bytes_uploaded: usize,
start_time: Instant,
active_uploads: usize,
completed_chunks: Vec<u32>,
}
impl UploadTracker {
fn new() -> Self {
Self {
bytes_uploaded: 0,
start_time: Instant::now(),
active_uploads: 0,
completed_chunks: Vec::new(),
}
}
fn current_rate_mibs(&self) -> f64 {
let elapsed = self.start_time.elapsed().as_secs_f64();
if elapsed > 0.0 {
(self.bytes_uploaded as f64) / (1024.0 * 1024.0) / elapsed
} else {
0.0
}
}
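    /// Allow a new upload when under the concurrency cap and either nothing
    /// is in flight or the measured rate is still below the target.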
fn should_start_upload(&self, config: &ParallelUploadConfig) -> bool {
if self.active_uploads >= config.max_concurrent {
return false;
}
let current_rate = self.current_rate_mibs();
current_rate < config.target_rate_mibs || self.active_uploads == 0
}
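    /// Poll slowly (100 ms) when over the target rate, quickly (10 ms) otherwise.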
fn calculate_delay(&self, config: &ParallelUploadConfig) -> Duration {
let current_rate = self.current_rate_mibs();
if current_rate > config.target_rate_mibs {
Duration::from_millis(100)
} else {
Duration::from_millis(10)
}
}
}
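/// Encode a chunk as a Candid argument string, rendering each byte as a
/// `\XX` hex escape, e.g. `chunk_with_id_to_candid_args(0, &[0x00, 0x01])`
/// yields `(0 : nat32, blob "\00\01")`.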
pub fn chunk_with_id_to_candid_args(chunk_id: u32, data: &[u8]) -> String {
let data_blob: String = data.iter().map(|&byte| format!("\\{:02X}", byte)).collect();
format!("({} : nat32, blob \"{}\")", chunk_id, data_blob)
}
/// Build a fixed four-byte test payload as a Candid argument string.
pub fn create_test_format(chunk_id: u32) -> String {
    format!(r#"({}, blob "\01\02\03\04")"#, chunk_id)
}
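/// Upload one chunk, retrying up to `config.max_retries` attempts with a
/// fixed `retry_delay_ms` pause between attempts.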
fn upload_chunk_with_retry(
params: &UploadParams<'_>,
chunk: &ChunkInfo,
config: &ParallelUploadConfig,
tracker: Arc<Mutex<UploadTracker>>,
) -> Result<(), String> {
let mut attempts = 0;
loop {
attempts += 1;
let result = upload_chunk_with_id_sync(params, chunk, config);
match result {
Ok(()) => {
                {
                    // Record throughput; the reaping loop in
                    // upload_chunks_parallel releases the concurrency slot.
                    let mut tracker = tracker.lock().unwrap();
                    tracker.bytes_uploaded += chunk.size;
                    tracker.completed_chunks.push(chunk.chunk_id);
                }
return Ok(());
}
Err(e) => {
if attempts >= config.max_retries {
return Err(format!(
"Chunk {} failed after {} attempts. Last error: {}",
chunk.chunk_id, attempts, e
));
}
if let Some(callback) = config.progress_callback {
callback(
chunk.chunk_id,
chunk.size,
&format!("⚠ Attempt {}/{} failed, retrying...", attempts, config.max_retries)
);
}
thread::sleep(Duration::from_millis(config.retry_delay_ms));
}
}
}
}
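/// Perform a single blocking upload: write the chunk's Candid arguments to a
/// temp file and invoke `dfx canister call` with `--argument-file`.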
fn upload_chunk_with_id_sync(
params: &UploadParams<'_>,
chunk: &ChunkInfo,
config: &ParallelUploadConfig,
) -> Result<(), String> {
let candid_args = chunk_with_id_to_candid_args(chunk.chunk_id, &chunk.data);
let mut temp_file = NamedTempFile::new()
.map_err(|e| create_error_string(&format!("Failed to create temporary file: {}", e)))?;
temp_file
.as_file_mut()
.write_all(candid_args.as_bytes())
.map_err(|e| create_error_string(&format!("Failed to write data to temporary file: {}", e)))?;
temp_file
.as_file_mut()
.flush()
.map_err(|e| create_error_string(&format!("Failed to flush temporary file: {}", e)))?;
let temp_path = temp_file.path().to_str()
.ok_or_else(|| create_error_string("temp_file path could not be converted to &str"))?;
let output = dfx(
"canister",
"call",
&vec![
params.canister_name,
params.canister_method,
"--argument-file",
temp_path,
],
params.network,
)?;
if output.status.success() {
if let Some(callback) = config.progress_callback {
callback(chunk.chunk_id, chunk.data.len(), "✓ Uploaded");
}
Ok(())
} else {
let error_message = String::from_utf8_lossy(&output.stderr).to_string();
Err(create_error_string(&format!("Chunk {} failed: {}", chunk.chunk_id, error_message)))
}
}
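/// Upload all chunks from a pool of worker threads, throttled toward
/// `config.target_rate_mibs`, and report the aggregate outcome.
///
/// A minimal calling sketch (assuming the caller already has an
/// `UploadParams` and raw `chunks: Vec<Vec<u8>>` from the crate root):
///
/// ```ignore
/// let config = ParallelUploadConfig::default();
/// match upload_chunks_parallel(&params, chunks_to_chunk_info(&chunks), &config) {
///     ParallelUploadResult::Success => println!("all chunks uploaded"),
///     ParallelUploadResult::PartialFailure { failed_chunks, .. } => {
///         eprintln!("{} chunks failed; retry with their ids", failed_chunks.len());
///     }
///     ParallelUploadResult::Failed(e) => eprintln!("upload failed: {}", e),
/// }
/// ```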
pub fn upload_chunks_parallel(
params: &UploadParams<'_>,
chunks: Vec<ChunkInfo>,
config: &ParallelUploadConfig,
) -> ParallelUploadResult {
if chunks.is_empty() {
return ParallelUploadResult::Failed("No chunks to upload".to_string());
}
    let total_chunks_expected = chunks.len();
let tracker = Arc::new(Mutex::new(UploadTracker::new()));
let mut handles = Vec::new();
let mut successful_chunks = Vec::new();
let mut failed_chunks = HashMap::new();
println!("Starting parallel upload of {} chunks", chunks.len());
println!("Target rate: {:.1} MiB/s, Max concurrent: {}",
config.target_rate_mibs, config.max_concurrent);
let chunks_remaining = Arc::new(Mutex::new(chunks));
loop {
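        // Each pass: start at most one new upload if the throttle allows,
        // reap any finished worker threads, then sleep a rate-dependent delay.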
let should_start = {
let tracker = tracker.lock().unwrap();
tracker.should_start_upload(config)
};
if should_start {
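            // `Vec::pop` takes from the back, so chunks are dispatched in
            // reverse id order; ids travel with the data, so upload order
            // does not affect correctness.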
let next_chunk = {
let mut chunks_lock = chunks_remaining.lock().unwrap();
chunks_lock.pop()
};
if let Some(chunk) = next_chunk {
{
let mut tracker = tracker.lock().unwrap();
tracker.active_uploads += 1;
}
let chunk_clone = chunk.clone();
let config_clone = config.clone();
let tracker_clone = Arc::clone(&tracker);
let canister_name = params.canister_name.to_string();
let canister_method = params.canister_method.to_string();
let name = params.name.to_string();
let network = params.network.map(|s| s.to_string());
let handle = thread::spawn(move || {
let thread_params = UploadParams {
name: &name,
canister_name: &canister_name,
canister_method: &canister_method,
network: network.as_deref(),
};
upload_chunk_with_retry(&thread_params, &chunk_clone, &config_clone, tracker_clone)
});
handles.push((chunk.chunk_id, handle));
}
}
let mut completed_handles = Vec::new();
for (i, (chunk_id, handle)) in handles.iter().enumerate() {
if handle.is_finished() {
completed_handles.push((i, *chunk_id));
}
}
for (index, chunk_id) in completed_handles.into_iter().rev() {
let (_, handle) = handles.remove(index);
            {
                // The reaping loop owns the decrement so that a panicked
                // worker thread still releases its concurrency slot.
                let mut tracker = tracker.lock().unwrap();
                tracker.active_uploads -= 1;
            }
match handle.join() {
Ok(Ok(())) => {
successful_chunks.push(chunk_id);
}
Ok(Err(e)) => {
failed_chunks.insert(chunk_id, e);
}
Err(_) => {
failed_chunks.insert(chunk_id, "Thread panic".to_string());
}
}
}
let delay = {
let tracker = tracker.lock().unwrap();
if let Some(rate_callback) = config.rate_callback {
rate_callback(tracker.current_rate_mibs());
}
tracker.calculate_delay(config)
};
thread::sleep(delay);
let (chunks_empty, no_active) = {
let chunks_lock = chunks_remaining.lock().unwrap();
let tracker_lock = tracker.lock().unwrap();
(chunks_lock.is_empty(), tracker_lock.active_uploads == 0)
};
let total_completed = successful_chunks.len() + failed_chunks.len();
        if total_completed >= total_chunks_expected {
break;
}
if chunks_empty && no_active && handles.is_empty() {
break;
}
}
{
let tracker = tracker.lock().unwrap();
let final_rate = tracker.current_rate_mibs();
let total_mb = tracker.bytes_uploaded as f64 / (1024.0 * 1024.0);
println!("Upload completed. Final rate: {:.2} MiB/s, Total: {:.2} MiB",
final_rate, total_mb);
}
    if failed_chunks.is_empty() {
        println!("✅ All {} chunks uploaded successfully!", successful_chunks.len());
        ParallelUploadResult::Success
    } else if successful_chunks.is_empty() {
        println!("❌ All chunks failed");
        ParallelUploadResult::Failed("All chunks failed".to_string())
    } else {
        println!(
            "❌ Upload completed with {} successes and {} failures",
            successful_chunks.len(),
            failed_chunks.len()
        );
        ParallelUploadResult::PartialFailure {
            successful_chunks,
            failed_chunks,
        }
    }
}
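/// Wrap raw chunk payloads in `ChunkInfo`, assigning sequential ids from 0.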
pub fn chunks_to_chunk_info(chunks: &[Vec<u8>]) -> Vec<ChunkInfo> {
chunks
.iter()
.enumerate()
.map(|(i, data)| ChunkInfo {
chunk_id: i as u32,
data: data.clone(),
size: data.len(),
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_candid_args_format() {
let test_data = vec![0x00, 0x01, 0x02];
let result = chunk_with_id_to_candid_args(0, &test_data);
let expected = r#"(0 : nat32, blob "\00\01\02")"#;
assert_eq!(result, expected);
println!("Generated: {}", result);
println!("Expected: {}", expected);
}
#[test]
fn test_single_byte() {
let test_data = vec![0xFF];
let result = chunk_with_id_to_candid_args(5, &test_data);
let expected = r#"(5 : nat32, blob "\FF")"#;
assert_eq!(result, expected);
}
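    // A regression sketch for `create_test_format`, assuming the intended
    // output is the chunk id plus a fixed four-byte blob with a closing paren.
    #[test]
    fn test_create_test_format_is_well_formed() {
        let result = create_test_format(7);
        assert_eq!(result, r#"(7, blob "\01\02\03\04")"#);
    }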
#[test]
fn test_chunk_info_sequential_ids() {
let chunks = vec![
vec![1, 2, 3],
vec![4, 5, 6],
vec![7, 8, 9],
vec![10, 11, 12],
];
let chunk_infos = chunks_to_chunk_info(&chunks);
assert_eq!(chunk_infos.len(), 4);
assert_eq!(chunk_infos[0].chunk_id, 0);
assert_eq!(chunk_infos[1].chunk_id, 1);
assert_eq!(chunk_infos[2].chunk_id, 2);
assert_eq!(chunk_infos[3].chunk_id, 3);
assert_eq!(chunk_infos[0].data, vec![1, 2, 3]);
assert_eq!(chunk_infos[3].data, vec![10, 11, 12]);
}
#[test]
fn test_resume_logic_skips_correct_chunks() {
        let chunks = vec![
            vec![1, 2, 3],
            vec![4, 5, 6],
            vec![7, 8, 9],
            vec![10, 11, 12],
            vec![13, 14, 15],
        ];
let chunk_infos = chunks_to_chunk_info(&chunks);
let chunk_offset = 2;
let chunks_to_upload: Vec<_> = chunk_infos
.into_iter()
.skip(chunk_offset)
.collect();
assert_eq!(chunks_to_upload.len(), 3);
assert_eq!(chunks_to_upload[0].chunk_id, 2);
assert_eq!(chunks_to_upload[1].chunk_id, 3);
assert_eq!(chunks_to_upload[2].chunk_id, 4);
assert_eq!(chunks_to_upload[0].data, vec![7, 8, 9]);
assert_eq!(chunks_to_upload[2].data, vec![13, 14, 15]);
}
#[test]
fn test_retry_chunks_filter() {
        let chunks = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8], vec![9, 10]];
let chunk_infos = chunks_to_chunk_info(&chunks);
let retry_ids = vec![1u32, 3u32];
let chunks_to_upload: Vec<_> = chunk_infos
.into_iter()
.filter(|chunk| retry_ids.contains(&chunk.chunk_id))
.collect();
assert_eq!(chunks_to_upload.len(), 2);
assert_eq!(chunks_to_upload[0].chunk_id, 1);
assert_eq!(chunks_to_upload[1].chunk_id, 3);
assert_eq!(chunks_to_upload[0].data, vec![3, 4]);
assert_eq!(chunks_to_upload[1].data, vec![7, 8]);
}
#[test]
fn test_no_double_offset_bug() {
        let chunks = vec![vec![0], vec![1], vec![2], vec![3], vec![4]];
let chunk_offset = 2;
let chunk_infos = chunks_to_chunk_info(&chunks);
let chunks_to_upload: Vec<_> = chunk_infos
.into_iter()
.skip(chunk_offset)
.collect();
assert_eq!(chunks_to_upload[0].chunk_id, 2);
assert_eq!(chunks_to_upload[0].data, vec![2]);
assert_eq!(chunks_to_upload.len(), 3);
assert_eq!(chunks_to_upload[2].chunk_id, 4);
}
#[test]
fn test_edge_case_resume_from_last_chunk() {
let chunks = vec![vec![1], vec![2], vec![3]];
let chunk_infos = chunks_to_chunk_info(&chunks);
let chunks_to_upload: Vec<_> = chunk_infos
.into_iter()
.skip(2)
.collect();
assert_eq!(chunks_to_upload.len(), 1);
assert_eq!(chunks_to_upload[0].chunk_id, 2);
assert_eq!(chunks_to_upload[0].data, vec![3]);
}
#[test]
fn test_edge_case_resume_beyond_chunks() {
let chunks = vec![vec![1], vec![2]];
let chunk_infos = chunks_to_chunk_info(&chunks);
let chunks_to_upload: Vec<_> = chunk_infos
.into_iter()
            .skip(5)
            .collect();
assert_eq!(chunks_to_upload.len(), 0);
}
}