1use crate::*;
2
3impl<'a, F> ChunkNaming<'a> for F where F: Fn(&'a str, usize) -> String + Send + Sync {}
5
6impl<'a> ChunkStrategy<'a> {
8 pub fn new<F>(
23 start_chunk_index: usize,
24 upload_dir: &'a str,
25 file_id: &'a str,
26 file_name: &'a str,
27 total_chunks: usize,
28 file_name_func: F,
29 ) -> NewChunkStrategyResult<'a>
30 where
31 F: ChunkNaming<'a> + 'static,
32 {
33 if start_chunk_index >= total_chunks {
34 return Err(ChunkStrategyError::IndexOutOfBounds(
35 start_chunk_index,
36 total_chunks,
37 ));
38 }
39 Ok(Self {
40 upload_dir,
41 start_chunk_index,
42 file_id,
43 file_name,
44 total_chunks,
45 file_name_func: Box::new(file_name_func),
46 })
47 }
48
49 fn get_chunk_json_path(&self, file_id: &'a str, chunk_index: usize) -> String {
60 (self.file_name_func)(file_id, chunk_index)
61 }
62
63 fn get_chunk_path(&self, file_id: &'a str, chunk_index: usize) -> String {
74 Path::new(&self.upload_dir)
75 .join(self.get_chunk_json_path(file_id, chunk_index))
76 .to_string_lossy()
77 .into_owned()
78 }
79
80 async fn save_chunk(&self, chunk_path: &str, chunk_data: &[u8]) -> ChunkStrategyResult {
91 async_write_to_file(chunk_path, chunk_data)
92 .await
93 .map_err(|e| {
94 ChunkStrategyError::WriteChunk(format!(
95 "Failed to write chunk to {}: {}",
96 chunk_path, e
97 ))
98 })?;
99 Ok(())
100 }
101}
102
103impl<'a> HandleStrategy<'a> for ChunkStrategy<'a> {
105 async fn save_chunk(&self, chunk_data: &'a [u8], chunk_index: usize) -> ChunkStrategyResult {
116 if !Path::new(&self.upload_dir).exists() {
117 fs::create_dir_all(&self.upload_dir)
118 .map_err(|e| ChunkStrategyError::CreateDirectory(e.to_string()))?;
119 }
120 let chunk_path: String = self.get_chunk_path(self.file_id, chunk_index);
121 self.save_chunk(&chunk_path, &chunk_data).await?;
122 let chunks_status: RefMut<'_, String, RwLock<Vec<bool>>> = UPLOADING_FILES
123 .entry(self.file_id.to_owned())
124 .or_insert_with(|| RwLock::new(vec![false; self.total_chunks]));
125 let mut chunks_status: RwLockWriteGuard<'_, Vec<bool>> = chunks_status.write().await;
126 if chunks_status.len() != self.total_chunks {
127 *chunks_status = vec![false; self.total_chunks];
128 }
129 if chunk_index >= chunks_status.len() {
130 return Err(ChunkStrategyError::IndexOutOfBounds(
131 chunk_index,
132 self.total_chunks,
133 ));
134 }
135 chunks_status[chunk_index] = true;
136 Ok(())
137 }
138
139 async fn merge_chunks(&self) -> ChunkStrategyResult {
145 let chunks_status: RefMut<'_, String, RwLock<Vec<bool>>> = UPLOADING_FILES
146 .entry(self.file_id.to_owned())
147 .or_insert_with(|| RwLock::new(vec![false; self.total_chunks]));
148 let mut chunks_status: RwLockWriteGuard<'_, Vec<bool>> = chunks_status.write().await;
149 let all_chunks_uploaded: bool = chunks_status.iter().all(|&status| status);
150 if !all_chunks_uploaded {
151 return Err(ChunkStrategyError::Merge);
152 }
153 chunks_status.clear();
154 drop(chunks_status);
155 let final_path: String = Path::new(&self.upload_dir)
156 .join(self.file_name)
157 .to_string_lossy()
158 .into_owned();
159 let output_file: File = OpenOptions::new()
160 .create(true)
161 .write(true)
162 .open(&final_path)
163 .map_err(|e| ChunkStrategyError::CreateOutputFile(e.to_string()))?;
164 let mut writer: BufWriter<File> = BufWriter::new(output_file);
165 for i in self.start_chunk_index..self.total_chunks {
166 let chunk_path: String = self.get_chunk_path(self.file_id, i);
167 let chunk_data: Vec<u8> = async_read_from_file(&chunk_path).await.map_err(|e| {
168 ChunkStrategyError::ReadChunk(format!(
169 "Failed to read chunk from {}: {}",
170 chunk_path, e
171 ))
172 })?;
173 writer
174 .write_all(&chunk_data)
175 .map_err(|e| ChunkStrategyError::WriteOutput(e.to_string()))?;
176 let _ = fs::remove_file(&chunk_path);
177 }
178 Ok(())
179 }
180}