1use crate::*;
2
3impl<'a, F> ChunkNaming<'a> for F where F: Fn(&'a str, usize) -> String + Send + Sync {}
4
5impl<'a> ChunkStrategy<'a> {
6 pub fn new<F>(
7 start_chunk_index: usize,
8 upload_dir: &'a str,
9 file_id: &'a str,
10 file_name: &'a str,
11 total_chunks: usize,
12 file_name_func: F,
13 ) -> NewChunkStrategyResult<'a>
14 where
15 F: ChunkNaming<'a> + 'static,
16 {
17 if start_chunk_index >= total_chunks {
18 return Err(ChunkStrategyError::IndexOutOfBounds(
19 start_chunk_index,
20 total_chunks,
21 ));
22 }
23 Ok(Self {
24 upload_dir,
25 start_chunk_index,
26 file_id,
27 file_name,
28 total_chunks,
29 file_name_func: Box::new(file_name_func),
30 })
31 }
32
33 fn get_chunk_json_path(&self, file_id: &'a str, chunk_index: usize) -> String {
34 (self.file_name_func)(file_id, chunk_index)
35 }
36
37 fn get_chunk_path(&self, file_id: &'a str, chunk_index: usize) -> String {
38 Path::new(&self.upload_dir)
39 .join(self.get_chunk_json_path(file_id, chunk_index))
40 .to_string_lossy()
41 .into_owned()
42 }
43
44 async fn save_chunk(&self, chunk_path: &str, chunk_data: &[u8]) -> ChunkStrategyResult {
45 async_write_to_file(chunk_path, chunk_data)
46 .await
47 .map_err(|e| {
48 ChunkStrategyError::WriteChunk(format!(
49 "Failed to write chunk to {}: {}",
50 chunk_path, e
51 ))
52 })?;
53 Ok(())
54 }
55}
56
57impl<'a> HandleStrategy<'a> for ChunkStrategy<'a> {
58 async fn save_chunk(&self, chunk_data: &'a [u8], chunk_index: usize) -> ChunkStrategyResult {
59 if !Path::new(&self.upload_dir).exists() {
60 fs::create_dir_all(&self.upload_dir)
61 .map_err(|e| ChunkStrategyError::CreateDirectory(e.to_string()))?;
62 }
63 let chunk_path: String = self.get_chunk_path(self.file_id, chunk_index);
64 self.save_chunk(&chunk_path, &chunk_data).await?;
65 let chunks_status: RefMut<'_, String, RwLock<Vec<bool>>> = UPLOADING_FILES
66 .entry(self.file_id.to_owned())
67 .or_insert_with(|| RwLock::new(vec![false; self.total_chunks]));
68 let mut chunks_status: RwLockWriteGuard<'_, Vec<bool>> = chunks_status.write().await;
69 if chunks_status.len() != self.total_chunks {
70 *chunks_status = vec![false; self.total_chunks];
71 }
72 if chunk_index >= chunks_status.len() {
73 return Err(ChunkStrategyError::IndexOutOfBounds(
74 chunk_index,
75 self.total_chunks,
76 ));
77 }
78 chunks_status[chunk_index] = true;
79 Ok(())
80 }
81
82 async fn merge_chunks(&self) -> ChunkStrategyResult {
83 let chunks_status: RefMut<'_, String, RwLock<Vec<bool>>> = UPLOADING_FILES
84 .entry(self.file_id.to_owned())
85 .or_insert_with(|| RwLock::new(vec![false; self.total_chunks]));
86 let mut chunks_status: RwLockWriteGuard<'_, Vec<bool>> = chunks_status.write().await;
87 let all_chunks_uploaded: bool = chunks_status.iter().all(|&status| status);
88 if !all_chunks_uploaded {
89 return Err(ChunkStrategyError::Merge);
90 }
91 chunks_status.clear();
92 drop(chunks_status);
93 let final_path: String = Path::new(&self.upload_dir)
94 .join(self.file_name)
95 .to_string_lossy()
96 .into_owned();
97 let output_file: File = OpenOptions::new()
98 .create(true)
99 .write(true)
100 .open(&final_path)
101 .map_err(|e| ChunkStrategyError::CreateOutputFile(e.to_string()))?;
102 let mut writer: BufWriter<File> = BufWriter::new(output_file);
103 for i in self.start_chunk_index..self.total_chunks {
104 let chunk_path: String = self.get_chunk_path(self.file_id, i);
105 let chunk_data: Vec<u8> = async_read_from_file(&chunk_path).await.map_err(|e| {
106 ChunkStrategyError::ReadChunk(format!(
107 "Failed to read chunk from {}: {}",
108 chunk_path, e
109 ))
110 })?;
111 writer
112 .write_all(&chunk_data)
113 .map_err(|e| ChunkStrategyError::WriteOutput(e.to_string()))?;
114 let _ = fs::remove_file(&chunk_path);
115 }
116 Ok(())
117 }
118}