1use bytes::Bytes;
2use std::collections::HashSet;
3use std::io::Read;
4use std::path::Path;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
8use tokio::sync::mpsc;
9
10use camel_api::{Body, CamelError, Exchange, Message, StreamingSplitExpression, Value};
11use futures::Stream;
12
/// Default for [`ZipSplitConfig::max_entries`].
const DEFAULT_MAX_ENTRIES: usize = 10_000;
/// Default for [`ZipSplitConfig::max_total_decompressed_size`] (1 GiB).
const DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default for [`ZipSplitConfig::max_per_entry_size`] (512 MiB).
const DEFAULT_MAX_PER_ENTRY_SIZE: u64 = 512 * 1024 * 1024;
/// Default for [`ZipSplitConfig::max_compressed_size`] (1 GiB).
const DEFAULT_MAX_COMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default for [`ZipSplitConfig::max_path_length`], in bytes.
const DEFAULT_MAX_PATH_LENGTH: usize = 4 * 1024;
/// Default for [`ZipSplitConfig::channel_capacity`].
const DEFAULT_CHANNEL_CAPACITY: usize = 2;
19
/// Header holding the final path component of the entry (its file name).
pub const CAMEL_ZIP_ENTRY_NAME: &str = "CamelZipEntryName";

/// Header holding the validated entry path as stored in the archive.
pub const CAMEL_ZIP_ENTRY_PATH: &str = "CamelZipEntryPath";

/// Header holding the zero-based position of the entry among the emitted entries.
pub const CAMEL_ZIP_ENTRY_INDEX: &str = "CamelZipEntryIndex";

/// Header holding the number of decompressed bytes carried by the entry.
pub const CAMEL_ZIP_ENTRY_SIZE: &str = "CamelZipEntrySize";

/// Header holding the compressed size reported by the archive for the entry.
pub const CAMEL_ZIP_ENTRY_COMPRESSED_SIZE: &str = "CamelZipEntryCompressedSize";

/// Header holding the CRC-32 reported by the archive for the entry.
pub const CAMEL_ZIP_ENTRY_CRC32: &str = "CamelZipEntryCrc32";

/// Header telling whether the entry is a directory.
pub const CAMEL_ZIP_ENTRY_IS_DIRECTORY: &str = "CamelZipEntryIsDirectory";

/// Header holding the `Debug` rendering of the entry's compression method.
pub const CAMEL_ZIP_ENTRY_COMPRESSION: &str = "CamelZipEntryCompression";
28
/// How the splitter reacts when two file entries share the same path.
#[derive(Clone, Debug)]
pub enum DuplicatePolicy {
    /// Emit every entry, repeated names included; entries remain
    /// distinguishable through their index header.
    AllowWithIndex,
    /// Fail the split as soon as a file entry repeats an already seen path.
    Reject,
}
34
35#[derive(Debug, Clone)]
36pub struct ZipSplitConfig {
37 pub max_entries: usize,
38 pub max_total_decompressed_size: u64,
39 pub max_per_entry_size: u64,
40 pub max_compressed_size: u64,
41 pub max_path_length: usize,
42 pub allow_empty_directories: bool,
43 pub duplicate_names_policy: DuplicatePolicy,
44 pub channel_capacity: usize,
45}
46
47impl Default for ZipSplitConfig {
48 fn default() -> Self {
49 Self {
50 max_entries: DEFAULT_MAX_ENTRIES,
51 max_total_decompressed_size: DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE,
52 max_per_entry_size: DEFAULT_MAX_PER_ENTRY_SIZE,
53 max_compressed_size: DEFAULT_MAX_COMPRESSED_SIZE,
54 max_path_length: DEFAULT_MAX_PATH_LENGTH,
55 allow_empty_directories: false,
56 duplicate_names_policy: DuplicatePolicy::AllowWithIndex,
57 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
58 }
59 }
60}
61
62fn validate_entry_path(path: &str, max_length: usize) -> Result<String, CamelError> {
63 if path.len() > max_length {
64 return Err(CamelError::TypeConversionFailed(format!(
65 "ZIP entry path exceeds max length: {} > {}",
66 path.len(),
67 max_length
68 )));
69 }
70
71 if path.contains('\0') {
72 return Err(CamelError::TypeConversionFailed(
73 "ZIP entry path contains NUL byte".to_string(),
74 ));
75 }
76
77 if Path::new(path).is_absolute() {
78 return Err(CamelError::TypeConversionFailed(format!(
79 "ZIP entry path is absolute: {path}"
80 )));
81 }
82
83 for component in Path::new(path).components() {
84 if let std::path::Component::ParentDir = component {
85 return Err(CamelError::TypeConversionFailed(format!(
86 "ZIP entry path contains '..' traversal: {path}"
87 )));
88 }
89 }
90
91 if path.contains('\\') {
92 return Err(CamelError::TypeConversionFailed(format!(
93 "ZIP entry path contains backslash: {path}"
94 )));
95 }
96
97 if let Some(c) = path.chars().next()
98 && c.is_ascii_alphabetic()
99 && path.chars().nth(1) == Some(':')
100 {
101 return Err(CamelError::TypeConversionFailed(format!(
102 "ZIP entry path contains Windows drive prefix: {path}"
103 )));
104 }
105
106 Ok(path.to_string())
107}
108
/// One archive entry as handed from the blocking decompression task to the
/// async stream that turns it into an exchange.
struct ZipEntryData {
    // Identity of the entry.
    /// Zero-based position among the emitted entries.
    index: usize,
    /// Validated entry path.
    path: String,
    /// Whether the entry is a directory (in which case `data` is empty).
    is_dir: bool,

    // Metadata reported by the archive.
    /// Compressed size of the entry.
    compressed_size: u64,
    /// CRC-32 of the entry, when known.
    crc32: Option<u32>,
    /// `Debug` rendering of the compression method.
    compression: String,

    // Payload.
    /// Number of decompressed bytes in `data`.
    size: u64,
    /// Decompressed content.
    data: Vec<u8>,
}
119
120pub fn split_zip_bytes(
129 parent: Exchange,
130 bytes: Bytes,
131 config: ZipSplitConfig,
132) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
133 Box::pin(async_stream::stream! {
134 if config.channel_capacity == 0 {
135 yield Err(CamelError::Config(
136 "ZipSplitConfig.channel_capacity must be > 0".into(),
137 ));
138 return;
139 }
140
141 if bytes.len() as u64 > config.max_compressed_size {
142 yield Err(CamelError::TypeConversionFailed(format!(
143 "ZIP compressed size {} exceeds max {}",
144 bytes.len(),
145 config.max_compressed_size
146 )));
147 return;
148 }
149
150 let (tx, mut rx) = mpsc::channel::<Result<ZipEntryData, CamelError>>(config.channel_capacity);
151
152 let total_decompressed = Arc::new(AtomicU64::new(0));
153 let entry_count = Arc::new(AtomicUsize::new(0));
154 let seen_names: Arc<std::sync::Mutex<HashSet<String>>> =
155 Arc::new(std::sync::Mutex::new(HashSet::new()));
156
157 let max_entries = config.max_entries;
158 let max_per_entry = config.max_per_entry_size;
159 let max_total = config.max_total_decompressed_size;
160 let max_path_len = config.max_path_length;
161 let allow_dirs = config.allow_empty_directories;
162 let dup_policy = config.duplicate_names_policy.clone();
163
164 tokio::task::spawn_blocking(move || {
165 let reader = std::io::Cursor::new(bytes);
166 let mut archive = match zip::ZipArchive::new(reader) {
167 Ok(a) => a,
168 Err(e) => {
169 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
170 format!("Invalid ZIP archive: {e}"),
171 )));
172 return;
173 }
174 };
175
176 for i in 0..archive.len() {
177 let mut entry = match archive.by_index(i) {
178 Ok(e) => e,
179 Err(e) => {
180 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
181 format!("Failed to read ZIP entry {i}: {e}"),
182 )));
183 return;
184 }
185 };
186
187 let raw_name = entry.name().to_string();
188 let is_dir = entry.is_dir();
189
190 let validated = match validate_entry_path(&raw_name, max_path_len) {
191 Ok(p) => p,
192 Err(e) => {
193 let _ = tx.blocking_send(Err(e));
194 return;
195 }
196 };
197
198 if is_dir {
199 if allow_dirs {
200 let count = entry_count.fetch_add(1, Ordering::SeqCst);
201 if count >= max_entries {
202 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
203 format!("ZIP exceeds max entries: {max_entries}"),
204 )));
205 return;
206 }
207 if tx.blocking_send(Ok(ZipEntryData {
208 index: count,
209 path: validated,
210 size: 0,
211 compressed_size: entry.compressed_size(),
212 crc32: Some(entry.crc32()),
213 is_dir: true,
214 compression: format!("{:?}", entry.compression()),
215 data: Vec::new(),
216 }))
217 .is_err()
218 {
219 return;
220 }
221 }
222 continue;
223 }
224
225 let compressed_size = entry.compressed_size();
226 let crc32 = entry.crc32();
227
228 let mut data = Vec::new();
229 let mut limited =
230 std::io::Read::take(&mut entry, max_per_entry.saturating_add(1));
231 if let Err(e) = limited.read_to_end(&mut data) {
232 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
233 format!("Failed to decompress ZIP entry '{raw_name}': {e}"),
234 )));
235 return;
236 }
237
238 if data.len() as u64 > max_per_entry {
239 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
240 format!(
241 "ZIP entry '{raw_name}' size {} exceeds max {}",
242 data.len(),
243 max_per_entry
244 ),
245 )));
246 return;
247 }
248
249 let entry_size = data.len() as u64;
250 let prev_total = total_decompressed.load(Ordering::SeqCst);
251 let new_total = prev_total.saturating_add(entry_size);
252 if new_total > max_total {
253 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
254 format!("ZIP total decompressed size exceeds max {max_total}"),
255 )));
256 return;
257 }
258 total_decompressed.store(new_total, Ordering::SeqCst);
259
260 let count = entry_count.fetch_add(1, Ordering::SeqCst);
261 if count >= max_entries {
262 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
263 format!("ZIP exceeds max entries: {max_entries}"),
264 )));
265 return;
266 }
267
268 match &dup_policy {
269 DuplicatePolicy::Reject => {
270 let mut seen = seen_names.lock().unwrap_or_else(|e| e.into_inner());
271 if seen.contains(&validated) {
272 let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
273 format!("Duplicate ZIP entry name: {validated}"),
274 )));
275 return;
276 }
277 seen.insert(validated.clone());
278 }
279 DuplicatePolicy::AllowWithIndex => {}
280 }
281
282 if tx
283 .blocking_send(Ok(ZipEntryData {
284 index: count,
285 path: validated,
286 size: data.len() as u64,
287 compressed_size,
288 crc32: Some(crc32),
289 is_dir: false,
290 compression: format!("{:?}", entry.compression()),
291 data,
292 }))
293 .is_err()
294 {
295 return;
296 }
297 }
298 });
299
300 while let Some(result) = rx.recv().await {
301 match result {
302 Ok(entry) => {
303 let ZipEntryData {
304 index,
305 path,
306 size,
307 compressed_size,
308 crc32,
309 is_dir,
310 compression,
311 data,
312 } = entry;
313 let body = if is_dir {
314 Body::Empty
315 } else {
316 Body::Bytes(Bytes::from(data))
317 };
318 let msg = Message {
319 headers: parent.input.headers.clone(),
320 body,
321 };
322 let mut ex = Exchange::new(msg);
323 ex.input.headers.remove("Content-Length");
325 ex.input.headers.remove("Content-Type");
326 ex.properties = parent.properties.clone();
327 ex.pattern = parent.pattern;
328 ex.otel_context = parent.otel_context.clone();
329
330 let entry_name = Path::new(&path)
331 .file_name()
332 .map(|n| n.to_string_lossy().to_string())
333 .unwrap_or_default();
334
335 ex.input.headers.insert(
336 CAMEL_ZIP_ENTRY_NAME.to_string(),
337 Value::String(entry_name),
338 );
339 ex.input.headers.insert(
340 CAMEL_ZIP_ENTRY_PATH.to_string(),
341 Value::String(path),
342 );
343 ex.input.headers.insert(
344 CAMEL_ZIP_ENTRY_INDEX.to_string(),
345 Value::from(index as u64),
346 );
347 ex.input.headers
348 .insert(CAMEL_ZIP_ENTRY_SIZE.to_string(), Value::from(size));
349 ex.input.headers.insert(
350 CAMEL_ZIP_ENTRY_COMPRESSED_SIZE.to_string(),
351 Value::from(compressed_size),
352 );
353 if let Some(crc) = crc32 {
354 ex.input
355 .headers
356 .insert(CAMEL_ZIP_ENTRY_CRC32.to_string(), Value::from(crc));
357 }
358 ex.input.headers.insert(
359 CAMEL_ZIP_ENTRY_IS_DIRECTORY.to_string(),
360 Value::Bool(is_dir),
361 );
362 ex.input.headers.insert(
363 CAMEL_ZIP_ENTRY_COMPRESSION.to_string(),
364 Value::String(compression),
365 );
366
367 yield Ok(ex);
368 }
369 Err(e) => {
370 yield Err(e);
371 }
372 }
373 }
374 })
375}
376
377pub fn zip_splitter(config: ZipSplitConfig) -> StreamingSplitExpression {
378 Arc::new(move |exchange: Exchange| {
379 let config = config.clone();
380 match exchange.input.body.clone() {
381 Body::Bytes(b) => split_zip_bytes(exchange, b, config),
382 Body::Text(s) => split_zip_bytes(exchange, Bytes::from(s.as_bytes().to_vec()), config),
383 _ => Box::pin(async_stream::stream! {
384 yield Err(CamelError::TypeConversionFailed(
385 "ZipSplitter requires Body::Bytes or Body::Text".to_string(),
386 ));
387 }),
388 }
389 })
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Write;

    /// ZIP writer backed by an owned in-memory buffer.
    type MemZipWriter = zip::ZipWriter<std::io::Cursor<Vec<u8>>>;

    /// Starts a new, empty in-memory archive.
    fn mem_writer() -> MemZipWriter {
        zip::ZipWriter::new(std::io::Cursor::new(Vec::new()))
    }

    /// Finalizes the archive and returns its bytes.
    fn into_zip_bytes(writer: MemZipWriter) -> Vec<u8> {
        writer.finish().unwrap().into_inner()
    }

    /// Builds an archive of deflated files from `(name, content)` pairs.
    fn make_zip_with_files(files: Vec<(&str, &[u8])>) -> Vec<u8> {
        let deflated = zip::write::SimpleFileOptions::default()
            .compression_method(zip::CompressionMethod::Deflated);
        let mut writer = mem_writer();
        for (name, content) in files {
            writer.start_file(name, deflated).unwrap();
            writer.write_all(content).unwrap();
        }
        into_zip_bytes(writer)
    }

    /// Builds an archive from `(name, is_dir)` pairs; files contain `content`.
    fn make_zip_with_dirs(entries: Vec<(&str, bool)>) -> Vec<u8> {
        let options = zip::write::SimpleFileOptions::default();
        let mut writer = mem_writer();
        for (name, is_dir) in entries {
            if is_dir {
                writer.add_directory(name, options).unwrap();
                continue;
            }
            writer.start_file(name, options).unwrap();
            writer.write_all(b"content").unwrap();
        }
        into_zip_bytes(writer)
    }

    /// Builds an archive holding exactly one file written with default options.
    fn make_zip_with_single_entry(name: &str, content: &[u8]) -> Vec<u8> {
        let mut writer = mem_writer();
        writer
            .start_file(name, zip::write::SimpleFileOptions::default())
            .unwrap();
        writer.write_all(content).unwrap();
        into_zip_bytes(writer)
    }

    /// Runs the splitter over `zip_data` and gathers everything it yields.
    async fn collect_entries(
        config: ZipSplitConfig,
        zip_data: Vec<u8>,
    ) -> Vec<Result<Exchange, CamelError>> {
        let split = zip_splitter(config);
        let input = Message {
            headers: Default::default(),
            body: Body::Bytes(Bytes::from(zip_data)),
        };
        split(Exchange::new(input)).collect().await
    }

    /// Asserts that at least one item of `results` is an error.
    fn assert_has_error(results: &[Result<Exchange, CamelError>]) {
        assert!(results.iter().any(Result::is_err));
    }

    #[tokio::test]
    async fn test_zip_split_single_file() {
        let archive = make_zip_with_files(vec![("hello.txt", b"hello world")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 1);

        let ex = results[0].as_ref().unwrap();
        let Body::Bytes(payload) = &ex.input.body else {
            panic!("expected Body::Bytes");
        };
        assert_eq!(payload.as_ref(), b"hello world");
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_NAME),
            Some(&Value::String("hello.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_multiple_files() {
        let archive = make_zip_with_files(vec![("a.txt", b"aaa"), ("b.txt", b"bbb")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_zip_split_with_directories() {
        let archive = make_zip_with_dirs(vec![("subdir/", true), ("subdir/file.txt", false)]);
        let with_dirs = ZipSplitConfig {
            allow_empty_directories: true,
            ..ZipSplitConfig::default()
        };
        let results = collect_entries(with_dirs, archive).await;
        assert_eq!(results.len(), 2);

        let dir_ex = results[0].as_ref().unwrap();
        assert!(dir_ex.input.body.is_empty());
        assert_eq!(
            dir_ex.input.headers.get(CAMEL_ZIP_ENTRY_IS_DIRECTORY),
            Some(&Value::Bool(true))
        );
    }

    #[tokio::test]
    async fn test_zip_split_preserves_paths() {
        let archive = make_zip_with_files(vec![("deep/nested/path/file.txt", b"deep")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 1);

        let ex = results[0].as_ref().unwrap();
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_PATH),
            Some(&Value::String("deep/nested/path/file.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_max_entries_exceeded() {
        // Five one-byte files against a limit of three entries.
        let options = zip::write::SimpleFileOptions::default();
        let mut writer = mem_writer();
        for i in 0..5 {
            writer.start_file(format!("f{i}.txt"), options).unwrap();
            writer.write_all(b"x").unwrap();
        }
        let archive = into_zip_bytes(writer);

        let limited = ZipSplitConfig {
            max_entries: 3,
            ..ZipSplitConfig::default()
        };
        let results = collect_entries(limited, archive).await;
        assert_has_error(&results);
    }

    #[tokio::test]
    async fn test_zip_split_path_traversal_rejected() {
        let archive = make_zip_with_single_entry("../etc/passwd", b"oops");
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_has_error(&results);
    }

    #[tokio::test]
    async fn test_zip_split_headers_set() {
        let archive = make_zip_with_files(vec![("test.txt", b"content")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        let headers = &results[0].as_ref().unwrap().input.headers;

        assert!(headers.contains_key(CAMEL_ZIP_ENTRY_NAME));
        assert!(headers.contains_key(CAMEL_ZIP_ENTRY_PATH));
        assert!(headers.contains_key(CAMEL_ZIP_ENTRY_INDEX));
        assert!(headers.contains_key(CAMEL_ZIP_ENTRY_SIZE));
        assert!(headers.contains_key(CAMEL_ZIP_ENTRY_COMPRESSED_SIZE));
        assert!(headers.contains_key(CAMEL_ZIP_ENTRY_IS_DIRECTORY));
        assert!(headers.contains_key(CAMEL_ZIP_ENTRY_COMPRESSION));
    }

    #[tokio::test]
    async fn test_zip_split_empty_zip() {
        let archive = into_zip_bytes(mem_writer());
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_zip_split_duplicate_names_reject() {
        // Distinct names must pass untouched under the `Reject` policy.
        let files: Vec<(&str, &[u8])> = vec![("a.txt", b"first"), ("b.txt", b"second")];
        let archive = make_zip_with_files(files);
        let rejecting = ZipSplitConfig {
            duplicate_names_policy: DuplicatePolicy::Reject,
            ..ZipSplitConfig::default()
        };
        let results = collect_entries(rejecting, archive).await;
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(Result::is_ok));
    }

    #[tokio::test]
    async fn test_zip_split_max_per_entry_size_exceeded() {
        let payload = b"x".repeat(200);
        let archive = make_zip_with_files(vec![("big.txt", payload.as_slice())]);
        let limited = ZipSplitConfig {
            max_per_entry_size: 100,
            ..ZipSplitConfig::default()
        };
        let results = collect_entries(limited, archive).await;
        assert_has_error(&results);
    }

    #[tokio::test]
    async fn test_zip_split_max_total_decompressed_size_exceeded() {
        let archive =
            make_zip_with_files(vec![("a.txt", b"aaaaaaaaaa"), ("b.txt", b"bbbbbbbbbb")]);
        let limited = ZipSplitConfig {
            max_total_decompressed_size: 15,
            ..ZipSplitConfig::default()
        };
        let results = collect_entries(limited, archive).await;
        assert_has_error(&results);
    }

    #[tokio::test]
    async fn test_zip_split_corrupt_zip() {
        let garbage = b"not a zip file".to_vec();
        let results = collect_entries(ZipSplitConfig::default(), garbage).await;
        assert_has_error(&results);
    }

    #[tokio::test]
    async fn test_zip_split_backslash_rejected() {
        let archive = make_zip_with_single_entry("sub\\file.txt", b"oops");
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_has_error(&results);
    }
}