1use bytes::Bytes;
2use std::collections::HashSet;
3use std::io::Read;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use tokio::sync::mpsc;
8
9use camel_api::{Body, CamelError, Exchange, Message, StreamingSplitExpression, Value};
10
/// Default for `ZipSplitConfig::max_entries`.
const DEFAULT_MAX_ENTRIES: usize = 10_000;
/// Default for `ZipSplitConfig::max_total_decompressed_size` (1 GiB).
const DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default for `ZipSplitConfig::max_per_entry_size` (512 MiB).
const DEFAULT_MAX_PER_ENTRY_SIZE: u64 = 512 * 1024 * 1024;
/// Default for `ZipSplitConfig::max_compressed_size` (1 GiB).
const DEFAULT_MAX_COMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default for `ZipSplitConfig::max_path_length`, in bytes.
const DEFAULT_MAX_PATH_LENGTH: usize = 4096;
/// Default for `ZipSplitConfig::channel_capacity`.
const DEFAULT_CHANNEL_CAPACITY: usize = 2;
17
// Header keys that `zip_splitter` sets on every exchange it emits.

/// Header holding the final component of the entry path (empty if there is none).
pub const CAMEL_ZIP_ENTRY_NAME: &str = "CamelZipEntryName";
/// Header holding the full, validated entry path as stored in the archive.
pub const CAMEL_ZIP_ENTRY_PATH: &str = "CamelZipEntryPath";
/// Header holding the zero-based position of the entry among the emitted entries.
pub const CAMEL_ZIP_ENTRY_INDEX: &str = "CamelZipEntryIndex";
/// Header holding the number of decompressed bytes in the entry body.
pub const CAMEL_ZIP_ENTRY_SIZE: &str = "CamelZipEntrySize";
/// Header holding the compressed size reported by the archive for the entry.
pub const CAMEL_ZIP_ENTRY_COMPRESSED_SIZE: &str = "CamelZipEntryCompressedSize";
/// Header holding the CRC-32 value reported by the archive for the entry.
pub const CAMEL_ZIP_ENTRY_CRC32: &str = "CamelZipEntryCrc32";
/// Header holding `true` when the entry is a directory, `false` otherwise.
pub const CAMEL_ZIP_ENTRY_IS_DIRECTORY: &str = "CamelZipEntryIsDirectory";
/// Header holding the `Debug` rendering of the entry's compression method.
pub const CAMEL_ZIP_ENTRY_COMPRESSION: &str = "CamelZipEntryCompression";
26
/// What the splitter does when a file entry's path has already been seen in
/// the same archive.
///
/// Directory entries are not subject to this policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DuplicatePolicy {
    /// Emit every entry without checking for repeated paths; entries remain
    /// distinguishable through their `CamelZipEntryIndex` header.
    AllowWithIndex,
    /// Fail the split with an error as soon as a path repeats.
    Reject,
}
32
33#[derive(Debug, Clone)]
34pub struct ZipSplitConfig {
35 pub max_entries: usize,
36 pub max_total_decompressed_size: u64,
37 pub max_per_entry_size: u64,
38 pub max_compressed_size: u64,
39 pub max_path_length: usize,
40 pub allow_empty_directories: bool,
41 pub duplicate_names_policy: DuplicatePolicy,
42 pub channel_capacity: usize,
43}
44
45impl Default for ZipSplitConfig {
46 fn default() -> Self {
47 Self {
48 max_entries: DEFAULT_MAX_ENTRIES,
49 max_total_decompressed_size: DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE,
50 max_per_entry_size: DEFAULT_MAX_PER_ENTRY_SIZE,
51 max_compressed_size: DEFAULT_MAX_COMPRESSED_SIZE,
52 max_path_length: DEFAULT_MAX_PATH_LENGTH,
53 allow_empty_directories: false,
54 duplicate_names_policy: DuplicatePolicy::AllowWithIndex,
55 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
56 }
57 }
58}
59
60fn validate_entry_path(path: &str, max_length: usize) -> Result<String, CamelError> {
61 if path.len() > max_length {
62 return Err(CamelError::TypeConversionFailed(format!(
63 "ZIP entry path exceeds max length: {} > {}",
64 path.len(),
65 max_length
66 )));
67 }
68
69 if path.contains('\0') {
70 return Err(CamelError::TypeConversionFailed(
71 "ZIP entry path contains NUL byte".to_string(),
72 ));
73 }
74
75 if Path::new(path).is_absolute() {
76 return Err(CamelError::TypeConversionFailed(format!(
77 "ZIP entry path is absolute: {path}"
78 )));
79 }
80
81 for component in Path::new(path).components() {
82 if let std::path::Component::ParentDir = component {
83 return Err(CamelError::TypeConversionFailed(format!(
84 "ZIP entry path contains '..' traversal: {path}"
85 )));
86 }
87 }
88
89 if path.contains('\\') {
90 return Err(CamelError::TypeConversionFailed(format!(
91 "ZIP entry path contains backslash: {path}"
92 )));
93 }
94
95 if let Some(c) = path.chars().next()
96 && c.is_ascii_alphabetic()
97 && path.chars().nth(1) == Some(':')
98 {
99 return Err(CamelError::TypeConversionFailed(format!(
100 "ZIP entry path contains Windows drive prefix: {path}"
101 )));
102 }
103
104 Ok(path.to_string())
105}
106
/// One archive entry handed from the blocking decompression task to the
/// output stream of `zip_splitter`.
struct ZipEntryData {
    /// Zero-based position among the entries emitted so far.
    index: usize,
    /// Validated entry path as stored in the archive.
    path: String,
    /// Number of decompressed bytes (0 for directories).
    size: u64,
    /// Compressed size reported by the archive.
    compressed_size: u64,
    /// CRC-32 reported by the archive, when available.
    crc32: Option<u32>,
    /// Whether the entry is a directory.
    is_dir: bool,
    /// `Debug` rendering of the entry's compression method.
    compression: String,
    /// Decompressed contents (empty for directories).
    data: Vec<u8>,
}
117
118pub fn zip_splitter(config: ZipSplitConfig) -> StreamingSplitExpression {
119 Arc::new(move |exchange: Exchange| {
120 let config = config.clone();
121 Box::pin(async_stream::stream! {
122 let raw = match &exchange.input.body {
123 Body::Bytes(b) => b.to_vec(),
124 Body::Text(s) => s.as_bytes().to_vec(),
125 _ => {
126 yield Err(CamelError::TypeConversionFailed(
127 "ZipSplitter requires Body::Bytes or Body::Text".to_string(),
128 ));
129 return;
130 }
131 };
132
133 if raw.len() as u64 > config.max_compressed_size {
134 yield Err(CamelError::TypeConversionFailed(format!(
135 "ZIP compressed size {} exceeds max {}",
136 raw.len(),
137 config.max_compressed_size
138 )));
139 return;
140 }
141
142 let (tx, mut rx) = mpsc::channel::<Result<ZipEntryData, CamelError>>(config.channel_capacity);
143
144 let total_decompressed = Arc::new(AtomicU64::new(0));
145 let entry_count = Arc::new(AtomicUsize::new(0));
146 let seen_names: Arc<std::sync::Mutex<HashSet<String>>> = Arc::new(std::sync::Mutex::new(HashSet::new()));
147
148 let max_entries = config.max_entries;
149 let max_per_entry = config.max_per_entry_size;
150 let max_total = config.max_total_decompressed_size;
151 let max_path_len = config.max_path_length;
152 let allow_dirs = config.allow_empty_directories;
153 let dup_policy = config.duplicate_names_policy.clone();
154
155 tokio::task::spawn_blocking(move || {
156 let reader = std::io::Cursor::new(&raw);
157 let mut archive = match zip::ZipArchive::new(reader) {
158 Ok(a) => a,
159 Err(e) => {
160 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
161 format!("Invalid ZIP archive: {e}"),
162 )));
163 return;
164 }
165 };
166
167 for i in 0..archive.len() {
168 let mut entry = match archive.by_index(i) {
169 Ok(e) => e,
170 Err(e) => {
171 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
172 format!("Failed to read ZIP entry {i}: {e}"),
173 )));
174 return;
175 }
176 };
177
178 let raw_name = entry.name().to_string();
179 let is_dir = entry.is_dir();
180
181 let validated = match validate_entry_path(&raw_name, max_path_len) {
182 Ok(p) => p,
183 Err(e) => {
184 let _ = tx.blocking_send(Err(e));
185 return;
186 }
187 };
188
189 if is_dir {
190 if allow_dirs {
191 let count = entry_count.fetch_add(1, Ordering::SeqCst);
192 if count >= max_entries {
193 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
194 format!("ZIP exceeds max entries: {max_entries}"),
195 )));
196 return;
197 }
198 let _ = tx.blocking_send(Ok(ZipEntryData {
199 index: count,
200 path: validated,
201 size: 0,
202 compressed_size: entry.compressed_size(),
203 crc32: Some(entry.crc32()),
204 is_dir: true,
205 compression: format!("{:?}", entry.compression()),
206 data: Vec::new(),
207 }));
208 }
209 continue;
210 }
211
212 let compressed_size = entry.compressed_size();
213 let crc32 = entry.crc32();
214
215 let mut data = Vec::new();
216 let mut limited = std::io::Read::take(&mut entry, max_per_entry.saturating_add(1));
217 if let Err(e) = limited.read_to_end(&mut data) {
218 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
219 format!("Failed to decompress ZIP entry '{raw_name}': {e}"),
220 )));
221 return;
222 }
223
224 if data.len() as u64 > max_per_entry {
225 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
226 format!("ZIP entry '{raw_name}' size {} exceeds max {}", data.len(), max_per_entry),
227 )));
228 return;
229 }
230
231 let entry_size = data.len() as u64;
232 let prev_total = total_decompressed.load(Ordering::SeqCst);
233 let new_total = prev_total.saturating_add(entry_size);
234 if new_total > max_total {
235 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
236 format!("ZIP total decompressed size exceeds max {max_total}"),
237 )));
238 return;
239 }
240 total_decompressed.store(new_total, Ordering::SeqCst);
241
242 let count = entry_count.fetch_add(1, Ordering::SeqCst);
243 if count >= max_entries {
244 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
245 format!("ZIP exceeds max entries: {max_entries}"),
246 )));
247 return;
248 }
249
250 match &dup_policy {
251 DuplicatePolicy::Reject => {
252 let mut seen = seen_names.lock().unwrap_or_else(|e| e.into_inner());
253 if seen.contains(&validated) {
254 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
255 format!("Duplicate ZIP entry name: {validated}"),
256 )));
257 return;
258 }
259 seen.insert(validated.clone());
260 }
261 DuplicatePolicy::AllowWithIndex => {}
262 }
263
264 let _ = tx.blocking_send(Ok(ZipEntryData {
265 index: count,
266 path: validated,
267 size: data.len() as u64,
268 compressed_size,
269 crc32: Some(crc32),
270 is_dir: false,
271 compression: format!("{:?}", entry.compression()),
272 data,
273 }));
274 }
275 });
276
277 while let Some(result) = rx.recv().await {
278 match result {
279 Ok(entry) => {
280 let ZipEntryData { index, path, size, compressed_size, crc32, is_dir, compression, data } = entry;
281 let body = if is_dir {
282 Body::Empty
283 } else {
284 Body::Bytes(Bytes::from(data))
285 };
286 let msg = Message {
287 headers: exchange.input.headers.clone(),
288 body,
289 };
290 let mut ex = Exchange::new(msg);
291 ex.properties = exchange.properties.clone();
292 ex.pattern = exchange.pattern;
293 ex.otel_context = exchange.otel_context.clone();
294
295 let entry_name = Path::new(&path)
296 .file_name()
297 .map(|n| n.to_string_lossy().to_string())
298 .unwrap_or_default();
299
300 ex.input.headers.insert(
301 CAMEL_ZIP_ENTRY_NAME.to_string(),
302 Value::String(entry_name),
303 );
304 ex.input.headers.insert(
305 CAMEL_ZIP_ENTRY_PATH.to_string(),
306 Value::String(path),
307 );
308 ex.input.headers.insert(
309 CAMEL_ZIP_ENTRY_INDEX.to_string(),
310 Value::from(index as u64),
311 );
312 ex.input.headers.insert(
313 CAMEL_ZIP_ENTRY_SIZE.to_string(),
314 Value::from(size),
315 );
316 ex.input.headers.insert(
317 CAMEL_ZIP_ENTRY_COMPRESSED_SIZE.to_string(),
318 Value::from(compressed_size),
319 );
320 if let Some(crc) = crc32 {
321 ex.input.headers.insert(
322 CAMEL_ZIP_ENTRY_CRC32.to_string(),
323 Value::from(crc),
324 );
325 }
326 ex.input.headers.insert(
327 CAMEL_ZIP_ENTRY_IS_DIRECTORY.to_string(),
328 Value::Bool(is_dir),
329 );
330 ex.input.headers.insert(
331 CAMEL_ZIP_ENTRY_COMPRESSION.to_string(),
332 Value::String(compression),
333 );
334
335 yield Ok(ex);
336 }
337 Err(e) => {
338 yield Err(e);
339 }
340 }
341 }
342 })
343 })
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Write;

    /// Writes every `(name, content)` pair as a file entry into a fresh
    /// in-memory archive, using `options` for each entry.
    fn write_archive<N, C>(files: &[(N, C)], options: zip::write::SimpleFileOptions) -> Vec<u8>
    where
        N: AsRef<str>,
        C: AsRef<[u8]>,
    {
        let mut archive = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut archive));
            for (name, content) in files {
                let name: &str = name.as_ref();
                writer.start_file(name, options).unwrap();
                writer.write_all(content.as_ref()).unwrap();
            }
            writer.finish().unwrap();
        }
        archive
    }

    /// Builds an archive of deflate-compressed files.
    fn make_zip_with_files(files: Vec<(&str, &[u8])>) -> Vec<u8> {
        let deflated = zip::write::SimpleFileOptions::default()
            .compression_method(zip::CompressionMethod::Deflated);
        write_archive(&files, deflated)
    }

    /// Builds an archive from `(name, is_dir)` pairs; files get the fixed
    /// payload `b"content"`.
    fn make_zip_with_dirs(entries: Vec<(&str, bool)>) -> Vec<u8> {
        let options = zip::write::SimpleFileOptions::default();
        let mut archive = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut archive));
            for &(name, is_dir) in &entries {
                if is_dir {
                    writer.add_directory(name, options).unwrap();
                } else {
                    writer.start_file(name, options).unwrap();
                    writer.write_all(b"content").unwrap();
                }
            }
            writer.finish().unwrap();
        }
        archive
    }

    /// Runs the splitter over `zip_data` and gathers everything it yields.
    async fn collect_entries(
        config: ZipSplitConfig,
        zip_data: Vec<u8>,
    ) -> Vec<Result<Exchange, CamelError>> {
        let message = Message {
            headers: Default::default(),
            body: Body::Bytes(Bytes::from(zip_data)),
        };
        let split = zip_splitter(config);
        let stream = split(Exchange::new(message));
        stream.collect().await
    }

    /// True when at least one yielded item is an error.
    fn any_error(results: &[Result<Exchange, CamelError>]) -> bool {
        results.iter().any(|item| item.is_err())
    }

    #[tokio::test]
    async fn test_zip_split_single_file() {
        let archive = make_zip_with_files(vec![("hello.txt", b"hello world")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 1);

        let ex = results[0].as_ref().unwrap();
        let Body::Bytes(payload) = &ex.input.body else {
            panic!("expected Body::Bytes");
        };
        assert_eq!(payload.as_ref(), b"hello world");
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_NAME),
            Some(&Value::String("hello.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_multiple_files() {
        let archive = make_zip_with_files(vec![("a.txt", b"aaa"), ("b.txt", b"bbb")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_zip_split_with_directories() {
        let archive = make_zip_with_dirs(vec![("subdir/", true), ("subdir/file.txt", false)]);
        let config = ZipSplitConfig {
            allow_empty_directories: true,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert_eq!(results.len(), 2);

        // The directory entry comes first and carries no payload.
        let dir_ex = results[0].as_ref().unwrap();
        assert!(dir_ex.input.body.is_empty());
        assert_eq!(
            dir_ex.input.headers.get(CAMEL_ZIP_ENTRY_IS_DIRECTORY),
            Some(&Value::Bool(true))
        );
    }

    #[tokio::test]
    async fn test_zip_split_preserves_paths() {
        let archive = make_zip_with_files(vec![("deep/nested/path/file.txt", b"deep")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 1);

        let ex = results[0].as_ref().unwrap();
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_PATH),
            Some(&Value::String("deep/nested/path/file.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_max_entries_exceeded() {
        let files: Vec<(String, Vec<u8>)> = (0..5)
            .map(|i| (format!("f{i}.txt"), b"x".to_vec()))
            .collect();
        let archive = write_archive(&files, zip::write::SimpleFileOptions::default());
        let config = ZipSplitConfig {
            max_entries: 3,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert!(any_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_path_traversal_rejected() {
        let archive = write_archive(
            &[("../etc/passwd", b"oops")],
            zip::write::SimpleFileOptions::default(),
        );
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert!(any_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_headers_set() {
        let archive = make_zip_with_files(vec![("test.txt", b"content")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        let ex = results[0].as_ref().unwrap();

        let expected_headers = [
            CAMEL_ZIP_ENTRY_NAME,
            CAMEL_ZIP_ENTRY_PATH,
            CAMEL_ZIP_ENTRY_INDEX,
            CAMEL_ZIP_ENTRY_SIZE,
            CAMEL_ZIP_ENTRY_COMPRESSED_SIZE,
            CAMEL_ZIP_ENTRY_IS_DIRECTORY,
            CAMEL_ZIP_ENTRY_COMPRESSION,
        ];
        for key in expected_headers {
            assert!(ex.input.headers.contains_key(key));
        }
    }

    #[tokio::test]
    async fn test_zip_split_empty_zip() {
        let no_files: [(&str, &[u8]); 0] = [];
        let archive = write_archive(&no_files, zip::write::SimpleFileOptions::default());
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert!(results.is_empty());
    }

    // The archive built here has two distinct names, so this only checks that
    // the `Reject` policy lets non-duplicated entries through.
    #[tokio::test]
    async fn test_zip_split_duplicate_names_reject() {
        let files: Vec<(&str, &[u8])> = vec![("a.txt", b"first"), ("b.txt", b"second")];
        let archive = make_zip_with_files(files);
        let config = ZipSplitConfig {
            duplicate_names_policy: DuplicatePolicy::Reject,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|item| item.is_ok()));
    }

    #[tokio::test]
    async fn test_zip_split_max_per_entry_size_exceeded() {
        let payload = b"x".repeat(200);
        let archive = make_zip_with_files(vec![("big.txt", payload.as_slice())]);
        let config = ZipSplitConfig {
            max_per_entry_size: 100,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert!(any_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_max_total_decompressed_size_exceeded() {
        let archive =
            make_zip_with_files(vec![("a.txt", b"aaaaaaaaaa"), ("b.txt", b"bbbbbbbbbb")]);
        let config = ZipSplitConfig {
            max_total_decompressed_size: 15,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert!(any_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_corrupt_zip() {
        let garbage = b"not a zip file".to_vec();
        let results = collect_entries(ZipSplitConfig::default(), garbage).await;
        assert!(any_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_backslash_rejected() {
        let archive = write_archive(
            &[("sub\\file.txt", b"oops")],
            zip::write::SimpleFileOptions::default(),
        );
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert!(any_error(&results));
    }
}