use crate::errors::{ExtractError, ExtractResult};
use async_compression::tokio::bufread::GzipDecoder;
use async_zip::tokio::read::seek::ZipFileReader;
use futures_util::io::{self, BufReader as FuturesBufReader};
use futures_util::StreamExt;
use std::path::{Path, PathBuf};
use tokio::fs::{create_dir_all, OpenOptions};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, BufReader};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

#[cfg(feature = "events")]
use lighty_event::{CoreEvent, Event, EventBus};

/// Maximum decompressed size allowed for a single archive entry (2 GiB).
const MAX_FILE_SIZE: u64 = 2 * 1024 * 1024 * 1024;
/// Buffer size used when copying ZIP entry data to disk (256 KiB).
const BUFFER_SIZE: usize = 256 * 1024;

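/// Asynchronously extracts a ZIP archive into `out_dir`.
///
/// `out_dir` must already exist because it is canonicalized up front. Entry
/// names are sanitized before being joined onto `out_dir`; entries with
/// absolute paths or paths that would escape `out_dir` are rejected, and an
/// entry whose uncompressed size exceeds `MAX_FILE_SIZE` aborts the
/// extraction. When the `events` feature is enabled, progress is reported
/// through the optional `event_bus`.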
pub async fn zip_extract<R>(
    archive: R,
    out_dir: &Path,
    #[cfg(feature = "events")] event_bus: Option<&EventBus>,
) -> ExtractResult<()>
where
    R: AsyncRead + AsyncSeek + AsyncBufRead + Unpin,
{
    let out_dir = out_dir.canonicalize()?;
    let mut reader = ZipFileReader::new(archive.compat()).await?;

    let entries_count = reader.file().entries().len();

    #[cfg(feature = "events")]
    if let Some(bus) = event_bus {
        bus.emit(Event::Core(CoreEvent::ExtractionStarted {
            archive_type: "ZIP".to_string(),
            file_count: entries_count,
            destination: out_dir.to_string_lossy().to_string(),
        }));
    }

    for index in 0..entries_count {
        // Resolve and validate the entry's metadata before touching the filesystem.
        let (_file_name, path, is_dir, uncompressed_size) = {
            let entry = reader
                .file()
                .entries()
                .get(index)
                .ok_or_else(|| ExtractError::ZipEntryNotFound { index })?;

            let file_name = entry.filename().as_str()?;
            let is_dir = entry.dir()?;
            let uncompressed_size = entry.uncompressed_size();

            let sanitized = sanitize_file_path(file_name);

            if sanitized.is_absolute() {
                return Err(ExtractError::AbsolutePath {
                    path: file_name.to_string(),
                });
            }

            let path = out_dir.join(&sanitized);

            // Reject entries that would escape the destination directory.
            if !is_path_within_base(&path, &out_dir)? {
                return Err(ExtractError::PathTraversal {
                    path: file_name.to_string(),
                });
            }

            (file_name.to_string(), path, is_dir, uncompressed_size)
        };

        if is_dir {
            create_dir_all(&path).await?;
        } else {
            if uncompressed_size > MAX_FILE_SIZE {
                return Err(ExtractError::FileTooLarge {
                    size: uncompressed_size,
                    max: MAX_FILE_SIZE,
                });
            }

            if let Some(parent) = path.parent() {
                create_dir_all(parent).await?;
            }

            let entry_reader = reader.reader_with_entry(index).await?;
            let buffered_reader = FuturesBufReader::with_capacity(BUFFER_SIZE, entry_reader);

            let file = OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(&path)
                .await?;

            io::copy(buffered_reader, &mut file.compat_write()).await?;
        }

        #[cfg(feature = "events")]
        if let Some(bus) = event_bus {
            // Report progress every 10 entries and once at the end.
            if (index + 1) % 10 == 0 || (index + 1) == entries_count {
                bus.emit(Event::Core(CoreEvent::ExtractionProgress {
                    files_extracted: index + 1,
                    total_files: entries_count,
                }));
            }
        }
    }

    #[cfg(feature = "events")]
    if let Some(bus) = event_bus {
        bus.emit(Event::Core(CoreEvent::ExtractionCompleted {
            archive_type: "ZIP".to_string(),
            files_extracted: entries_count,
        }));
    }

    Ok(())
}

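/// Asynchronously extracts a gzip-compressed tar archive into `out_dir`.
///
/// `out_dir` must already exist because it is canonicalized up front. Entries
/// with absolute paths, entries that would escape `out_dir`, symlinks, and
/// hard links are skipped; an entry larger than `MAX_FILE_SIZE` aborts the
/// extraction. When the `events` feature is enabled, progress is reported
/// through the optional `event_bus` (the total file count is reported as 0
/// because the archive is read as a stream).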
pub async fn tar_gz_extract<R>(
    archive: R,
    out_dir: &Path,
    #[cfg(feature = "events")] event_bus: Option<&EventBus>,
) -> ExtractResult<()>
where
    R: AsyncRead + Unpin,
{
    let out_dir = out_dir.canonicalize()?;
    let decoder = GzipDecoder::new(BufReader::new(archive));
    let mut ar = tokio_tar::Archive::new(decoder);

    #[cfg(feature = "events")]
    if let Some(bus) = event_bus {
        bus.emit(Event::Core(CoreEvent::ExtractionStarted {
            archive_type: "TAR.GZ".to_string(),
            // The entry count is not known up front for a streamed tar archive.
            file_count: 0,
            destination: out_dir.to_string_lossy().to_string(),
        }));
    }

    let mut entries = ar.entries()?;
    let mut files_extracted = 0usize;

    while let Some(entry) = entries.next().await {
        let mut entry = entry?;
        let path = entry.path()?.to_path_buf();

        // Skip entries with absolute paths.
        if path.is_absolute() {
            continue;
        }

        let dest = out_dir.join(&path);

        // Skip entries that would escape the destination directory.
        if !is_path_within_base(&dest, &out_dir)? {
            continue;
        }

        // Skip symlinks and hard links.
        let entry_type = entry.header().entry_type();
        if entry_type.is_symlink() || entry_type.is_hard_link() {
            continue;
        }

        let size = entry.header().size()?;
        if size > MAX_FILE_SIZE {
            return Err(ExtractError::FileTooLarge {
                size,
                max: MAX_FILE_SIZE,
            });
        }

        // Ensure the parent directory exists; not every tar archive carries
        // explicit directory entries for the paths it contains.
        if let Some(parent) = dest.parent() {
            create_dir_all(parent).await?;
        }

        entry.unpack(&dest).await?;

        files_extracted += 1;

        #[cfg(feature = "events")]
        if let Some(bus) = event_bus {
            if files_extracted % 10 == 0 {
                bus.emit(Event::Core(CoreEvent::ExtractionProgress {
                    files_extracted,
                    total_files: 0,
                }));
            }
        }
    }

    #[cfg(feature = "events")]
    if let Some(bus) = event_bus {
        bus.emit(Event::Core(CoreEvent::ExtractionCompleted {
            archive_type: "TAR.GZ".to_string(),
            files_extracted,
        }));
    }

    Ok(())
}

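/// Turns an archive entry name into a relative `PathBuf`, converting
/// backslashes to forward slashes and sanitizing each path component.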
fn sanitize_file_path(path: &str) -> PathBuf {
    path.replace('\\', "/")
        .split('/')
        .map(sanitize_filename::sanitize)
        .collect()
}

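/// Checks lexically (without touching the filesystem) whether `path` still
/// lies within `base` after `.` and `..` components are resolved.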
fn is_path_within_base(path: &Path, base: &Path) -> ExtractResult<bool> {
    let normalized_path: PathBuf = path
        .components()
        .fold(PathBuf::new(), |mut acc, component| {
            match component {
                std::path::Component::Normal(c) => acc.push(c),
                std::path::Component::ParentDir => {
                    acc.pop();
                },
                std::path::Component::CurDir => {},
                _ => acc.push(component),
            }
            acc
        });

    let normalized_base: PathBuf = base
        .components()
        .fold(PathBuf::new(), |mut acc, component| {
            match component {
                std::path::Component::Normal(c) => acc.push(c),
                std::path::Component::ParentDir => {
                    acc.pop();
                },
                std::path::Component::CurDir => {},
                _ => acc.push(component),
            }
            acc
        });

    Ok(normalized_path.starts_with(&normalized_base))
}
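
// A minimal sketch of unit tests for the path-containment helper; the cases
// below are illustrative and assume `ExtractError` implements `Debug` (needed
// for `unwrap`).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn path_inside_base_is_accepted() {
        let base = Path::new("/data/out");
        let path = base.join("nested/file.txt");
        assert!(is_path_within_base(&path, base).unwrap());
    }

    #[test]
    fn parent_dir_traversal_is_rejected() {
        let base = Path::new("/data/out");
        let path = base.join("../secret.txt");
        assert!(!is_path_within_base(&path, base).unwrap());
    }
}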