chunkify/chunk/
impl.rs

1use crate::*;
2
3impl<'a, F> ChunkNaming<'a> for F where F: Fn(&'a str, usize) -> String + Send + Sync {}
4
5impl<'a> ChunkStrategy<'a> {
6    pub fn new<F>(
7        start_chunk_index: usize,
8        upload_dir: &'a str,
9        file_id: &'a str,
10        file_name: &'a str,
11        total_chunks: usize,
12        file_name_func: F,
13    ) -> NewChunkStrategyResult<'a>
14    where
15        F: ChunkNaming<'a> + 'static,
16    {
17        if start_chunk_index >= total_chunks {
18            return Err(ChunkStrategyError::IndexOutOfBounds(
19                start_chunk_index,
20                total_chunks,
21            ));
22        }
23        Ok(Self {
24            upload_dir,
25            start_chunk_index,
26            file_id,
27            file_name,
28            total_chunks,
29            file_name_func: Box::new(file_name_func),
30        })
31    }
32
33    fn get_chunk_json_path(&self, file_id: &'a str, chunk_index: usize) -> String {
34        (self.file_name_func)(file_id, chunk_index)
35    }
36
37    fn get_chunk_path(&self, file_id: &'a str, chunk_index: usize) -> String {
38        Path::new(&self.upload_dir)
39            .join(self.get_chunk_json_path(file_id, chunk_index))
40            .to_string_lossy()
41            .into_owned()
42    }
43
44    async fn save_chunk(&self, chunk_path: &str, chunk_data: &[u8]) -> ChunkStrategyResult {
45        async_write_to_file(chunk_path, chunk_data)
46            .await
47            .map_err(|e| {
48                ChunkStrategyError::WriteChunk(format!(
49                    "Failed to write chunk to {}: {}",
50                    chunk_path, e
51                ))
52            })?;
53        Ok(())
54    }
55}
56
57impl<'a> HandleStrategy<'a> for ChunkStrategy<'a> {
58    async fn save_chunk(&self, chunk_data: &'a [u8], chunk_index: usize) -> ChunkStrategyResult {
59        if !Path::new(&self.upload_dir).exists() {
60            fs::create_dir_all(&self.upload_dir)
61                .map_err(|e| ChunkStrategyError::CreateDirectory(e.to_string()))?;
62        }
63        let chunk_path: String = self.get_chunk_path(self.file_id, chunk_index);
64        self.save_chunk(&chunk_path, &chunk_data).await?;
65        let chunks_status: RefMut<'_, String, RwLock<Vec<bool>>> = UPLOADING_FILES
66            .entry(self.file_id.to_owned())
67            .or_insert_with(|| RwLock::new(vec![false; self.total_chunks]));
68        let mut chunks_status: RwLockWriteGuard<'_, Vec<bool>> = chunks_status.write().await;
69        if chunks_status.len() != self.total_chunks {
70            *chunks_status = vec![false; self.total_chunks];
71        }
72        if chunk_index >= chunks_status.len() {
73            return Err(ChunkStrategyError::IndexOutOfBounds(
74                chunk_index,
75                self.total_chunks,
76            ));
77        }
78        chunks_status[chunk_index] = true;
79        Ok(())
80    }
81
82    async fn merge_chunks(&self) -> ChunkStrategyResult {
83        let chunks_status: RefMut<'_, String, RwLock<Vec<bool>>> = UPLOADING_FILES
84            .entry(self.file_id.to_owned())
85            .or_insert_with(|| RwLock::new(vec![false; self.total_chunks]));
86        let mut chunks_status: RwLockWriteGuard<'_, Vec<bool>> = chunks_status.write().await;
87        let all_chunks_uploaded: bool = chunks_status.iter().all(|&status| status);
88        if !all_chunks_uploaded {
89            return Err(ChunkStrategyError::Merge);
90        }
91        chunks_status.clear();
92        drop(chunks_status);
93        let final_path: String = Path::new(&self.upload_dir)
94            .join(self.file_name)
95            .to_string_lossy()
96            .into_owned();
97        let output_file: File = OpenOptions::new()
98            .create(true)
99            .write(true)
100            .open(&final_path)
101            .map_err(|e| ChunkStrategyError::CreateOutputFile(e.to_string()))?;
102        let mut writer: BufWriter<File> = BufWriter::new(output_file);
103        for i in self.start_chunk_index..self.total_chunks {
104            let chunk_path: String = self.get_chunk_path(self.file_id, i);
105            let chunk_data: Vec<u8> = async_read_from_file(&chunk_path).await.map_err(|e| {
106                ChunkStrategyError::ReadChunk(format!(
107                    "Failed to read chunk from {}: {}",
108                    chunk_path, e
109                ))
110            })?;
111            writer
112                .write_all(&chunk_data)
113                .map_err(|e| ChunkStrategyError::WriteOutput(e.to_string()))?;
114            let _ = fs::remove_file(&chunk_path);
115        }
116        Ok(())
117    }
118}