Skip to main content

camel_processor/
zip_splitter.rs

1use bytes::Bytes;
2use std::collections::HashSet;
3use std::io::Read;
4use std::path::Path;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
8use tokio::sync::mpsc;
9
10use camel_api::{Body, CamelError, Exchange, Message, StreamingSplitExpression, Value};
11use futures::Stream;
12
/// Default cap on the number of entries emitted from one archive.
const DEFAULT_MAX_ENTRIES: usize = 10_000;
/// Default cap on the summed decompressed size of all file entries (1 GiB).
const DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default cap on the decompressed size of a single file entry (512 MiB).
const DEFAULT_MAX_PER_ENTRY_SIZE: u64 = 512 * 1024 * 1024;
/// Default cap on the size of the compressed archive itself (1 GiB).
const DEFAULT_MAX_COMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default cap on the byte length of an entry path.
const DEFAULT_MAX_PATH_LENGTH: usize = 4096;
/// Default capacity of the channel between the extraction task and the output stream.
const DEFAULT_CHANNEL_CAPACITY: usize = 2;

/// Header holding the final component of the entry path.
pub const CAMEL_ZIP_ENTRY_NAME: &str = "CamelZipEntryName";
/// Header holding the full validated entry path.
pub const CAMEL_ZIP_ENTRY_PATH: &str = "CamelZipEntryPath";
/// Header holding the zero-based index of the entry among the emitted entries.
pub const CAMEL_ZIP_ENTRY_INDEX: &str = "CamelZipEntryIndex";
/// Header holding the decompressed size of the entry in bytes.
pub const CAMEL_ZIP_ENTRY_SIZE: &str = "CamelZipEntrySize";
/// Header holding the compressed size of the entry in bytes.
pub const CAMEL_ZIP_ENTRY_COMPRESSED_SIZE: &str = "CamelZipEntryCompressedSize";
/// Header holding the CRC-32 value recorded for the entry.
pub const CAMEL_ZIP_ENTRY_CRC32: &str = "CamelZipEntryCrc32";
/// Header holding `true` when the entry is a directory.
pub const CAMEL_ZIP_ENTRY_IS_DIRECTORY: &str = "CamelZipEntryIsDirectory";
/// Header holding the `Debug` rendering of the entry's compression method.
pub const CAMEL_ZIP_ENTRY_COMPRESSION: &str = "CamelZipEntryCompression";
28
29#[derive(Debug, Clone)]
30pub enum DuplicatePolicy {
31    AllowWithIndex,
32    Reject,
33}
34
35#[derive(Debug, Clone)]
36pub struct ZipSplitConfig {
37    pub max_entries: usize,
38    pub max_total_decompressed_size: u64,
39    pub max_per_entry_size: u64,
40    pub max_compressed_size: u64,
41    pub max_path_length: usize,
42    pub allow_empty_directories: bool,
43    pub duplicate_names_policy: DuplicatePolicy,
44    pub channel_capacity: usize,
45}
46
47impl Default for ZipSplitConfig {
48    fn default() -> Self {
49        Self {
50            max_entries: DEFAULT_MAX_ENTRIES,
51            max_total_decompressed_size: DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE,
52            max_per_entry_size: DEFAULT_MAX_PER_ENTRY_SIZE,
53            max_compressed_size: DEFAULT_MAX_COMPRESSED_SIZE,
54            max_path_length: DEFAULT_MAX_PATH_LENGTH,
55            allow_empty_directories: false,
56            duplicate_names_policy: DuplicatePolicy::AllowWithIndex,
57            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
58        }
59    }
60}
61
62fn validate_entry_path(path: &str, max_length: usize) -> Result<String, CamelError> {
63    if path.len() > max_length {
64        return Err(CamelError::TypeConversionFailed(format!(
65            "ZIP entry path exceeds max length: {} > {}",
66            path.len(),
67            max_length
68        )));
69    }
70
71    if path.contains('\0') {
72        return Err(CamelError::TypeConversionFailed(
73            "ZIP entry path contains NUL byte".to_string(),
74        ));
75    }
76
77    if Path::new(path).is_absolute() {
78        return Err(CamelError::TypeConversionFailed(format!(
79            "ZIP entry path is absolute: {path}"
80        )));
81    }
82
83    for component in Path::new(path).components() {
84        if let std::path::Component::ParentDir = component {
85            return Err(CamelError::TypeConversionFailed(format!(
86                "ZIP entry path contains '..' traversal: {path}"
87            )));
88        }
89    }
90
91    if path.contains('\\') {
92        return Err(CamelError::TypeConversionFailed(format!(
93            "ZIP entry path contains backslash: {path}"
94        )));
95    }
96
97    if let Some(c) = path.chars().next()
98        && c.is_ascii_alphabetic()
99        && path.chars().nth(1) == Some(':')
100    {
101        return Err(CamelError::TypeConversionFailed(format!(
102            "ZIP entry path contains Windows drive prefix: {path}"
103        )));
104    }
105
106    Ok(path.to_string())
107}
108
109struct ZipEntryData {
110    index: usize,
111    path: String,
112    size: u64,
113    compressed_size: u64,
114    crc32: Option<u32>,
115    is_dir: bool,
116    compression: String,
117    data: Vec<u8>,
118}
119
120/// Split a ZIP archive's bytes into a stream of Exchanges, one per entry.
121///
122/// Takes owned `Bytes` (for `'static` lifetime), a parent `Exchange` whose headers
123/// and properties are cloned into each entry's exchange, and a `ZipSplitConfig`
124/// controlling limits and policy.
125///
126/// This is the core extraction — callers such as `zip_splitter()` or `camel-core`
127/// component code can invoke it directly with already-acquired bytes.
128pub fn split_zip_bytes(
129    parent: Exchange,
130    bytes: Bytes,
131    config: ZipSplitConfig,
132) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
133    Box::pin(async_stream::stream! {
134        if config.channel_capacity == 0 {
135            yield Err(CamelError::Config(
136                "ZipSplitConfig.channel_capacity must be > 0".into(),
137            ));
138            return;
139        }
140
141        if bytes.len() as u64 > config.max_compressed_size {
142            yield Err(CamelError::TypeConversionFailed(format!(
143                "ZIP compressed size {} exceeds max {}",
144                bytes.len(),
145                config.max_compressed_size
146            )));
147            return;
148        }
149
150        let (tx, mut rx) = mpsc::channel::<Result<ZipEntryData, CamelError>>(config.channel_capacity);
151
152        let total_decompressed = Arc::new(AtomicU64::new(0));
153        let entry_count = Arc::new(AtomicUsize::new(0));
154        let seen_names: Arc<std::sync::Mutex<HashSet<String>>> =
155            Arc::new(std::sync::Mutex::new(HashSet::new()));
156
157        let max_entries = config.max_entries;
158        let max_per_entry = config.max_per_entry_size;
159        let max_total = config.max_total_decompressed_size;
160        let max_path_len = config.max_path_length;
161        let allow_dirs = config.allow_empty_directories;
162        let dup_policy = config.duplicate_names_policy.clone();
163
164        tokio::task::spawn_blocking(move || {
165            let reader = std::io::Cursor::new(bytes);
166            let mut archive = match zip::ZipArchive::new(reader) {
167                Ok(a) => a,
168                Err(e) => {
169                    let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
170                        format!("Invalid ZIP archive: {e}"),
171                    )));
172                    return;
173                }
174            };
175
176            for i in 0..archive.len() {
177                let mut entry = match archive.by_index(i) {
178                    Ok(e) => e,
179                    Err(e) => {
180                        let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
181                            format!("Failed to read ZIP entry {i}: {e}"),
182                        )));
183                        return;
184                    }
185                };
186
187                let raw_name = entry.name().to_string();
188                let is_dir = entry.is_dir();
189
190                let validated = match validate_entry_path(&raw_name, max_path_len) {
191                    Ok(p) => p,
192                    Err(e) => {
193                        let _ = tx.blocking_send(Err(e));
194                        return;
195                    }
196                };
197
198                if is_dir {
199                    if allow_dirs {
200                        let count = entry_count.fetch_add(1, Ordering::SeqCst);
201                        if count >= max_entries {
202                            let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
203                                format!("ZIP exceeds max entries: {max_entries}"),
204                            )));
205                            return;
206                        }
207                        if tx.blocking_send(Ok(ZipEntryData {
208                            index: count,
209                            path: validated,
210                            size: 0,
211                            compressed_size: entry.compressed_size(),
212                            crc32: Some(entry.crc32()),
213                            is_dir: true,
214                            compression: format!("{:?}", entry.compression()),
215                            data: Vec::new(),
216                        }))
217                        .is_err()
218                        {
219                            return;
220                        }
221                    }
222                    continue;
223                }
224
225                let compressed_size = entry.compressed_size();
226                let crc32 = entry.crc32();
227
228                let mut data = Vec::new();
229                let mut limited =
230                    std::io::Read::take(&mut entry, max_per_entry.saturating_add(1));
231                if let Err(e) = limited.read_to_end(&mut data) {
232                    let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
233                        format!("Failed to decompress ZIP entry '{raw_name}': {e}"),
234                    )));
235                    return;
236                }
237
238                if data.len() as u64 > max_per_entry {
239                    let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
240                        format!(
241                            "ZIP entry '{raw_name}' size {} exceeds max {}",
242                            data.len(),
243                            max_per_entry
244                        ),
245                    )));
246                    return;
247                }
248
249                let entry_size = data.len() as u64;
250                let prev_total = total_decompressed.load(Ordering::SeqCst);
251                let new_total = prev_total.saturating_add(entry_size);
252                if new_total > max_total {
253                    let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
254                        format!("ZIP total decompressed size exceeds max {max_total}"),
255                    )));
256                    return;
257                }
258                total_decompressed.store(new_total, Ordering::SeqCst);
259
260                let count = entry_count.fetch_add(1, Ordering::SeqCst);
261                if count >= max_entries {
262                    let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
263                        format!("ZIP exceeds max entries: {max_entries}"),
264                    )));
265                    return;
266                }
267
268                match &dup_policy {
269                    DuplicatePolicy::Reject => {
270                        let mut seen = seen_names.lock().unwrap_or_else(|e| e.into_inner());
271                        if seen.contains(&validated) {
272                            let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
273                                format!("Duplicate ZIP entry name: {validated}"),
274                            )));
275                            return;
276                        }
277                        seen.insert(validated.clone());
278                    }
279                    DuplicatePolicy::AllowWithIndex => {}
280                }
281
282                if tx
283                    .blocking_send(Ok(ZipEntryData {
284                        index: count,
285                        path: validated,
286                        size: data.len() as u64,
287                        compressed_size,
288                        crc32: Some(crc32),
289                        is_dir: false,
290                        compression: format!("{:?}", entry.compression()),
291                        data,
292                    }))
293                    .is_err()
294                {
295                    return;
296                }
297            }
298        });
299
300        while let Some(result) = rx.recv().await {
301            match result {
302                Ok(entry) => {
303                    let ZipEntryData {
304                        index,
305                        path,
306                        size,
307                        compressed_size,
308                        crc32,
309                        is_dir,
310                        compression,
311                        data,
312                    } = entry;
313                    let body = if is_dir {
314                        Body::Empty
315                    } else {
316                        Body::Bytes(Bytes::from(data))
317                    };
318                    let msg = Message {
319                        headers: parent.input.headers.clone(),
320                        body,
321                    };
322                    let mut ex = Exchange::new(msg);
323                    // Strip parent-level content headers that are stale for individual ZIP entries
324                    ex.input.headers.remove("Content-Length");
325                    ex.input.headers.remove("Content-Type");
326                    ex.properties = parent.properties.clone();
327                    ex.pattern = parent.pattern;
328                    ex.otel_context = parent.otel_context.clone();
329
330                    let entry_name = Path::new(&path)
331                        .file_name()
332                        .map(|n| n.to_string_lossy().to_string())
333                        .unwrap_or_default();
334
335                    ex.input.headers.insert(
336                        CAMEL_ZIP_ENTRY_NAME.to_string(),
337                        Value::String(entry_name),
338                    );
339                    ex.input.headers.insert(
340                        CAMEL_ZIP_ENTRY_PATH.to_string(),
341                        Value::String(path),
342                    );
343                    ex.input.headers.insert(
344                        CAMEL_ZIP_ENTRY_INDEX.to_string(),
345                        Value::from(index as u64),
346                    );
347                    ex.input.headers
348                        .insert(CAMEL_ZIP_ENTRY_SIZE.to_string(), Value::from(size));
349                    ex.input.headers.insert(
350                        CAMEL_ZIP_ENTRY_COMPRESSED_SIZE.to_string(),
351                        Value::from(compressed_size),
352                    );
353                    if let Some(crc) = crc32 {
354                        ex.input
355                            .headers
356                            .insert(CAMEL_ZIP_ENTRY_CRC32.to_string(), Value::from(crc));
357                    }
358                    ex.input.headers.insert(
359                        CAMEL_ZIP_ENTRY_IS_DIRECTORY.to_string(),
360                        Value::Bool(is_dir),
361                    );
362                    ex.input.headers.insert(
363                        CAMEL_ZIP_ENTRY_COMPRESSION.to_string(),
364                        Value::String(compression),
365                    );
366
367                    yield Ok(ex);
368                }
369                Err(e) => {
370                    yield Err(e);
371                }
372            }
373        }
374    })
375}
376
377pub fn zip_splitter(config: ZipSplitConfig) -> StreamingSplitExpression {
378    Arc::new(move |exchange: Exchange| {
379        let config = config.clone();
380        match exchange.input.body.clone() {
381            Body::Bytes(b) => split_zip_bytes(exchange, b, config),
382            Body::Text(s) => split_zip_bytes(exchange, Bytes::from(s.as_bytes().to_vec()), config),
383            _ => Box::pin(async_stream::stream! {
384                yield Err(CamelError::TypeConversionFailed(
385                    "ZipSplitter requires Body::Bytes or Body::Text".to_string(),
386                ));
387            }),
388        }
389    })
390}
391
#[cfg(test)]
mod tests {
    //! End-to-end tests that build small archives in memory and run them through
    //! `zip_splitter`.

    use super::*;
    use futures::StreamExt;
    use std::io::Write;

    /// Builds an in-memory, Deflate-compressed ZIP with one file per `(name, content)` pair.
    fn make_zip_with_files(files: Vec<(&str, &[u8])>) -> Vec<u8> {
        let mut buf = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut buf));
            let options = zip::write::SimpleFileOptions::default()
                .compression_method(zip::CompressionMethod::Deflated);
            for (name, content) in &files {
                writer.start_file(*name, options).unwrap();
                writer.write_all(content).unwrap();
            }
            writer.finish().unwrap();
        }
        buf
    }

    /// Builds an in-memory ZIP from `(name, is_dir)` pairs; files contain `b"content"`.
    fn make_zip_with_dirs(entries: Vec<(&str, bool)>) -> Vec<u8> {
        let mut buf = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut buf));
            let options = zip::write::SimpleFileOptions::default();
            for (name, is_dir) in &entries {
                if *is_dir {
                    writer.add_directory(*name, options).unwrap();
                } else {
                    writer.start_file(name, options).unwrap();
                    writer.write_all(b"content").unwrap();
                }
            }
            writer.finish().unwrap();
        }
        buf
    }

    /// Runs `zip_data` through `zip_splitter(config)` and collects everything it yields.
    async fn collect_entries(
        config: ZipSplitConfig,
        zip_data: Vec<u8>,
    ) -> Vec<Result<Exchange, CamelError>> {
        let expr = zip_splitter(config);
        let exchange = Exchange::new(Message {
            headers: Default::default(),
            body: Body::Bytes(Bytes::from(zip_data)),
        });
        let stream = expr(exchange);
        stream.collect().await
    }

    /// Returns the message of the first `TypeConversionFailed` error in `results`.
    ///
    /// Panics when there is none, so a test cannot pass on an unrelated failure.
    fn first_error_message(results: &[Result<Exchange, CamelError>]) -> String {
        results
            .iter()
            .find_map(|r| match r {
                Err(CamelError::TypeConversionFailed(msg)) => Some(msg.clone()),
                _ => None,
            })
            .expect("expected a TypeConversionFailed error in the results")
    }

    #[tokio::test]
    async fn test_zip_split_single_file() {
        let zip_data = make_zip_with_files(vec![("hello.txt", b"hello world")]);
        let results = collect_entries(ZipSplitConfig::default(), zip_data).await;
        assert_eq!(results.len(), 1);
        let ex = results[0].as_ref().unwrap();
        match &ex.input.body {
            Body::Bytes(b) => assert_eq!(b.as_ref(), b"hello world"),
            _ => panic!("expected Body::Bytes"),
        }
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_NAME),
            Some(&Value::String("hello.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_multiple_files() {
        let zip_data = make_zip_with_files(vec![("a.txt", b"aaa"), ("b.txt", b"bbb")]);
        let results = collect_entries(ZipSplitConfig::default(), zip_data).await;
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn test_zip_split_with_directories() {
        let zip_data = make_zip_with_dirs(vec![("subdir/", true), ("subdir/file.txt", false)]);
        let config = ZipSplitConfig {
            allow_empty_directories: true,
            ..Default::default()
        };
        let results = collect_entries(config, zip_data).await;
        assert_eq!(results.len(), 2);
        let dir_ex = results[0].as_ref().unwrap();
        assert!(dir_ex.input.body.is_empty());
        assert_eq!(
            dir_ex.input.headers.get(CAMEL_ZIP_ENTRY_IS_DIRECTORY),
            Some(&Value::Bool(true))
        );
    }

    #[tokio::test]
    async fn test_zip_split_preserves_paths() {
        let zip_data = make_zip_with_files(vec![("deep/nested/path/file.txt", b"deep")]);
        let results = collect_entries(ZipSplitConfig::default(), zip_data).await;
        assert_eq!(results.len(), 1);
        let ex = results[0].as_ref().unwrap();
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_PATH),
            Some(&Value::String("deep/nested/path/file.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_max_entries_exceeded() {
        let names: Vec<String> = (0..5).map(|i| format!("f{i}.txt")).collect();
        let files: Vec<(&str, &[u8])> = names
            .iter()
            .map(|name| (name.as_str(), &b"x"[..]))
            .collect();
        let zip_data = make_zip_with_files(files);
        let config = ZipSplitConfig {
            max_entries: 3,
            ..Default::default()
        };
        let results = collect_entries(config, zip_data).await;
        // The first three entries are emitted, the fourth trips the limit.
        assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 3);
        assert!(first_error_message(&results).contains("exceeds max entries"));
    }

    #[tokio::test]
    async fn test_zip_split_path_traversal_rejected() {
        let zip_data = make_zip_with_files(vec![("../etc/passwd", b"oops")]);
        let results = collect_entries(ZipSplitConfig::default(), zip_data).await;
        assert!(first_error_message(&results).contains("traversal"));
    }

    #[tokio::test]
    async fn test_zip_split_headers_set() {
        let zip_data = make_zip_with_files(vec![("test.txt", b"content")]);
        let results = collect_entries(ZipSplitConfig::default(), zip_data).await;
        let ex = results[0].as_ref().unwrap();
        assert!(ex.input.headers.contains_key(CAMEL_ZIP_ENTRY_NAME));
        assert!(ex.input.headers.contains_key(CAMEL_ZIP_ENTRY_PATH));
        assert!(ex.input.headers.contains_key(CAMEL_ZIP_ENTRY_INDEX));
        assert!(ex.input.headers.contains_key(CAMEL_ZIP_ENTRY_SIZE));
        assert!(
            ex.input
                .headers
                .contains_key(CAMEL_ZIP_ENTRY_COMPRESSED_SIZE)
        );
        assert!(ex.input.headers.contains_key(CAMEL_ZIP_ENTRY_CRC32));
        assert!(ex.input.headers.contains_key(CAMEL_ZIP_ENTRY_IS_DIRECTORY));
        assert!(ex.input.headers.contains_key(CAMEL_ZIP_ENTRY_COMPRESSION));
    }

    #[tokio::test]
    async fn test_zip_split_empty_zip() {
        let zip_data = make_zip_with_files(Vec::new());
        let results = collect_entries(ZipSplitConfig::default(), zip_data).await;
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_zip_split_duplicate_names_reject() {
        // NOTE: zip crate v2 prevents creating archives with duplicate entry names,
        // so this test validates that the Reject policy works correctly with unique names.
        let files: Vec<(&str, &[u8])> = vec![("a.txt", b"first"), ("b.txt", b"second")];
        let zip_data = make_zip_with_files(files);
        let config = ZipSplitConfig {
            duplicate_names_policy: DuplicatePolicy::Reject,
            ..Default::default()
        };
        let results = collect_entries(config, zip_data).await;
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn test_zip_split_max_per_entry_size_exceeded() {
        let big = b"x".repeat(200);
        let zip_data = make_zip_with_files(vec![("big.txt", big.as_slice())]);
        let config = ZipSplitConfig {
            max_per_entry_size: 100,
            ..Default::default()
        };
        let results = collect_entries(config, zip_data).await;
        let message = first_error_message(&results);
        assert!(message.contains("big.txt"));
        assert!(message.contains("exceeds max 100"));
    }

    #[tokio::test]
    async fn test_zip_split_max_total_decompressed_size_exceeded() {
        let zip_data =
            make_zip_with_files(vec![("a.txt", b"aaaaaaaaaa"), ("b.txt", b"bbbbbbbbbb")]);
        let config = ZipSplitConfig {
            max_total_decompressed_size: 15,
            ..Default::default()
        };
        let results = collect_entries(config, zip_data).await;
        // The first 10-byte entry fits; the second pushes the total to 20 > 15.
        assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 1);
        assert!(first_error_message(&results).contains("total decompressed size exceeds max"));
    }

    #[tokio::test]
    async fn test_zip_split_corrupt_zip() {
        let results = collect_entries(ZipSplitConfig::default(), b"not a zip file".to_vec()).await;
        assert!(first_error_message(&results).contains("Invalid ZIP archive"));
    }

    #[tokio::test]
    async fn test_zip_split_backslash_rejected() {
        let zip_data = make_zip_with_files(vec![("sub\\file.txt", b"oops")]);
        let results = collect_entries(ZipSplitConfig::default(), zip_data).await;
        assert!(first_error_message(&results).contains("backslash"));
    }

    #[tokio::test]
    async fn test_zip_split_zero_channel_capacity_rejected() {
        let zip_data = make_zip_with_files(vec![("a.txt", b"aaa")]);
        let config = ZipSplitConfig {
            channel_capacity: 0,
            ..Default::default()
        };
        let results = collect_entries(config, zip_data).await;
        assert_eq!(results.len(), 1);
        assert!(matches!(results.first(), Some(Err(CamelError::Config(_)))));
    }
}