Skip to main content

camel_processor/
zip_splitter.rs

1use bytes::Bytes;
2use std::collections::HashSet;
3use std::io::Read;
4use std::path::Path;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use tokio::sync::mpsc;
8
9use camel_api::{Body, CamelError, Exchange, Message, StreamingSplitExpression, Value};
10
/// Default cap on the number of entries emitted from a single archive.
const DEFAULT_MAX_ENTRIES: usize = 10_000;
/// Default cap on the summed decompressed size of all file entries (1 GiB).
const DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default cap on the decompressed size of one entry (512 MiB).
const DEFAULT_MAX_PER_ENTRY_SIZE: u64 = 512 * 1024 * 1024;
/// Default cap on the size of the incoming (compressed) archive body (1 GiB).
const DEFAULT_MAX_COMPRESSED_SIZE: u64 = 1024 * 1024 * 1024;
/// Default cap on the byte length of an entry path.
const DEFAULT_MAX_PATH_LENGTH: usize = 4_096;
/// Default capacity of the channel between the extraction worker and the stream.
const DEFAULT_CHANNEL_CAPACITY: usize = 2;
17
/// Header holding the final path component (file name) of the entry.
pub const CAMEL_ZIP_ENTRY_NAME: &str = "CamelZipEntryName";
/// Header holding the full, validated entry path inside the archive.
pub const CAMEL_ZIP_ENTRY_PATH: &str = "CamelZipEntryPath";
/// Header holding the zero-based position of the entry among the emitted entries.
pub const CAMEL_ZIP_ENTRY_INDEX: &str = "CamelZipEntryIndex";
/// Header holding the decompressed size of the entry in bytes (0 for directories).
pub const CAMEL_ZIP_ENTRY_SIZE: &str = "CamelZipEntrySize";
/// Header holding the compressed size of the entry as recorded in the archive.
pub const CAMEL_ZIP_ENTRY_COMPRESSED_SIZE: &str = "CamelZipEntryCompressedSize";
/// Header holding the CRC-32 recorded for the entry.
pub const CAMEL_ZIP_ENTRY_CRC32: &str = "CamelZipEntryCrc32";
/// Header holding `true` when the entry is a directory.
pub const CAMEL_ZIP_ENTRY_IS_DIRECTORY: &str = "CamelZipEntryIsDirectory";
/// Header holding the `Debug` rendering of the entry's compression method.
pub const CAMEL_ZIP_ENTRY_COMPRESSION: &str = "CamelZipEntryCompression";
26
/// How the splitter treats file entries that share the same path.
///
/// `PartialEq`/`Eq` are derived so configurations can be compared and
/// asserted on in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DuplicatePolicy {
    /// Emit every entry; duplicates are told apart by their index header.
    AllowWithIndex,
    /// Fail the split with an error when a file entry path repeats.
    Reject,
}
32
33#[derive(Debug, Clone)]
34pub struct ZipSplitConfig {
35    pub max_entries: usize,
36    pub max_total_decompressed_size: u64,
37    pub max_per_entry_size: u64,
38    pub max_compressed_size: u64,
39    pub max_path_length: usize,
40    pub allow_empty_directories: bool,
41    pub duplicate_names_policy: DuplicatePolicy,
42    pub channel_capacity: usize,
43}
44
45impl Default for ZipSplitConfig {
46    fn default() -> Self {
47        Self {
48            max_entries: DEFAULT_MAX_ENTRIES,
49            max_total_decompressed_size: DEFAULT_MAX_TOTAL_DECOMPRESSED_SIZE,
50            max_per_entry_size: DEFAULT_MAX_PER_ENTRY_SIZE,
51            max_compressed_size: DEFAULT_MAX_COMPRESSED_SIZE,
52            max_path_length: DEFAULT_MAX_PATH_LENGTH,
53            allow_empty_directories: false,
54            duplicate_names_policy: DuplicatePolicy::AllowWithIndex,
55            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
56        }
57    }
58}
59
60fn validate_entry_path(path: &str, max_length: usize) -> Result<String, CamelError> {
61    if path.len() > max_length {
62        return Err(CamelError::TypeConversionFailed(format!(
63            "ZIP entry path exceeds max length: {} > {}",
64            path.len(),
65            max_length
66        )));
67    }
68
69    if path.contains('\0') {
70        return Err(CamelError::TypeConversionFailed(
71            "ZIP entry path contains NUL byte".to_string(),
72        ));
73    }
74
75    if Path::new(path).is_absolute() {
76        return Err(CamelError::TypeConversionFailed(format!(
77            "ZIP entry path is absolute: {path}"
78        )));
79    }
80
81    for component in Path::new(path).components() {
82        if let std::path::Component::ParentDir = component {
83            return Err(CamelError::TypeConversionFailed(format!(
84                "ZIP entry path contains '..' traversal: {path}"
85            )));
86        }
87    }
88
89    if path.contains('\\') {
90        return Err(CamelError::TypeConversionFailed(format!(
91            "ZIP entry path contains backslash: {path}"
92        )));
93    }
94
95    if let Some(c) = path.chars().next()
96        && c.is_ascii_alphabetic()
97        && path.chars().nth(1) == Some(':')
98    {
99        return Err(CamelError::TypeConversionFailed(format!(
100            "ZIP entry path contains Windows drive prefix: {path}"
101        )));
102    }
103
104    Ok(path.to_string())
105}
106
/// One archive entry as handed from the blocking extraction worker to the
/// async stream, which turns it into an `Exchange`.
struct ZipEntryData {
    /// Zero-based position among the emitted entries.
    index: usize,
    /// Validated entry path inside the archive.
    path: String,
    /// Decompressed size in bytes (0 for directories).
    size: u64,
    /// Compressed size as recorded in the archive.
    compressed_size: u64,
    /// CRC-32 recorded for the entry, when available.
    crc32: Option<u32>,
    /// Whether the entry is a directory.
    is_dir: bool,
    /// `Debug` rendering of the compression method.
    compression: String,
    /// Decompressed content; empty for directories.
    data: Vec<u8>,
}
117
118pub fn zip_splitter(config: ZipSplitConfig) -> StreamingSplitExpression {
119    Arc::new(move |exchange: Exchange| {
120        let config = config.clone();
121        Box::pin(async_stream::stream! {
122            let raw = match &exchange.input.body {
123                Body::Bytes(b) => b.to_vec(),
124                Body::Text(s) => s.as_bytes().to_vec(),
125                _ => {
126                    yield Err(CamelError::TypeConversionFailed(
127                        "ZipSplitter requires Body::Bytes or Body::Text".to_string(),
128                    ));
129                    return;
130                }
131            };
132
133            if raw.len() as u64 > config.max_compressed_size {
134                yield Err(CamelError::TypeConversionFailed(format!(
135                    "ZIP compressed size {} exceeds max {}",
136                    raw.len(),
137                    config.max_compressed_size
138                )));
139                return;
140            }
141
142            let (tx, mut rx) = mpsc::channel::<Result<ZipEntryData, CamelError>>(config.channel_capacity);
143
144            let total_decompressed = Arc::new(AtomicU64::new(0));
145            let entry_count = Arc::new(AtomicUsize::new(0));
146            let seen_names: Arc<std::sync::Mutex<HashSet<String>>> = Arc::new(std::sync::Mutex::new(HashSet::new()));
147
148            let max_entries = config.max_entries;
149            let max_per_entry = config.max_per_entry_size;
150            let max_total = config.max_total_decompressed_size;
151            let max_path_len = config.max_path_length;
152            let allow_dirs = config.allow_empty_directories;
153            let dup_policy = config.duplicate_names_policy.clone();
154
155            tokio::task::spawn_blocking(move || {
156                let reader = std::io::Cursor::new(&raw);
157                let mut archive = match zip::ZipArchive::new(reader) {
158                    Ok(a) => a,
159                    Err(e) => {
160                        let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
161                            format!("Invalid ZIP archive: {e}"),
162                        )));
163                        return;
164                    }
165                };
166
167                for i in 0..archive.len() {
168                    let mut entry = match archive.by_index(i) {
169                        Ok(e) => e,
170                        Err(e) => {
171                            let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
172                                format!("Failed to read ZIP entry {i}: {e}"),
173                            )));
174                            return;
175                        }
176                    };
177
178                    let raw_name = entry.name().to_string();
179                    let is_dir = entry.is_dir();
180
181                    let validated = match validate_entry_path(&raw_name, max_path_len) {
182                        Ok(p) => p,
183                        Err(e) => {
184                            let _ = tx.blocking_send(Err(e));
185                            return;
186                        }
187                    };
188
189                    if is_dir {
190                        if allow_dirs {
191                            let count = entry_count.fetch_add(1, Ordering::SeqCst);
192                            if count >= max_entries {
193                                let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
194                                    format!("ZIP exceeds max entries: {max_entries}"),
195                                )));
196                                return;
197                            }
198                            let _ = tx.blocking_send(Ok(ZipEntryData {
199                                index: count,
200                                path: validated,
201                                size: 0,
202                                compressed_size: entry.compressed_size(),
203                                crc32: Some(entry.crc32()),
204                                is_dir: true,
205                                compression: format!("{:?}", entry.compression()),
206                                data: Vec::new(),
207                            }));
208                        }
209                        continue;
210                    }
211
212                    let compressed_size = entry.compressed_size();
213                    let crc32 = entry.crc32();
214
215                    let mut data = Vec::new();
216                    let mut limited = std::io::Read::take(&mut entry, max_per_entry.saturating_add(1));
217                    if let Err(e) = limited.read_to_end(&mut data) {
218                        let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
219                            format!("Failed to decompress ZIP entry '{raw_name}': {e}"),
220                        )));
221                        return;
222                    }
223
224                    if data.len() as u64 > max_per_entry {
225                        let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
226                            format!("ZIP entry '{raw_name}' size {} exceeds max {}", data.len(), max_per_entry),
227                        )));
228                        return;
229                    }
230
231                    let entry_size = data.len() as u64;
232                    let prev_total = total_decompressed.load(Ordering::SeqCst);
233                    let new_total = prev_total.saturating_add(entry_size);
234                    if new_total > max_total {
235                        let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
236                            format!("ZIP total decompressed size exceeds max {max_total}"),
237                        )));
238                        return;
239                    }
240                    total_decompressed.store(new_total, Ordering::SeqCst);
241
242                    let count = entry_count.fetch_add(1, Ordering::SeqCst);
243                    if count >= max_entries {
244                        let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
245                            format!("ZIP exceeds max entries: {max_entries}"),
246                        )));
247                        return;
248                    }
249
250                    match &dup_policy {
251                        DuplicatePolicy::Reject => {
252                            let mut seen = seen_names.lock().unwrap_or_else(|e| e.into_inner());
253                            if seen.contains(&validated) {
254                                let _ = tx.blocking_send(Err(CamelError::TypeConversionFailed(
255                                    format!("Duplicate ZIP entry name: {validated}"),
256                                )));
257                                return;
258                            }
259                            seen.insert(validated.clone());
260                        }
261                        DuplicatePolicy::AllowWithIndex => {}
262                    }
263
264                    let _ = tx.blocking_send(Ok(ZipEntryData {
265                        index: count,
266                        path: validated,
267                        size: data.len() as u64,
268                        compressed_size,
269                        crc32: Some(crc32),
270                        is_dir: false,
271                        compression: format!("{:?}", entry.compression()),
272                        data,
273                    }));
274                }
275            });
276
277            while let Some(result) = rx.recv().await {
278                match result {
279                    Ok(entry) => {
280                        let ZipEntryData { index, path, size, compressed_size, crc32, is_dir, compression, data } = entry;
281                        let body = if is_dir {
282                            Body::Empty
283                        } else {
284                            Body::Bytes(Bytes::from(data))
285                        };
286                        let msg = Message {
287                            headers: exchange.input.headers.clone(),
288                            body,
289                        };
290                        let mut ex = Exchange::new(msg);
291                        ex.properties = exchange.properties.clone();
292                        ex.pattern = exchange.pattern;
293                        ex.otel_context = exchange.otel_context.clone();
294
295                        let entry_name = Path::new(&path)
296                            .file_name()
297                            .map(|n| n.to_string_lossy().to_string())
298                            .unwrap_or_default();
299
300                        ex.input.headers.insert(
301                            CAMEL_ZIP_ENTRY_NAME.to_string(),
302                            Value::String(entry_name),
303                        );
304                        ex.input.headers.insert(
305                            CAMEL_ZIP_ENTRY_PATH.to_string(),
306                            Value::String(path),
307                        );
308                        ex.input.headers.insert(
309                            CAMEL_ZIP_ENTRY_INDEX.to_string(),
310                            Value::from(index as u64),
311                        );
312                        ex.input.headers.insert(
313                            CAMEL_ZIP_ENTRY_SIZE.to_string(),
314                            Value::from(size),
315                        );
316                        ex.input.headers.insert(
317                            CAMEL_ZIP_ENTRY_COMPRESSED_SIZE.to_string(),
318                            Value::from(compressed_size),
319                        );
320                        if let Some(crc) = crc32 {
321                            ex.input.headers.insert(
322                                CAMEL_ZIP_ENTRY_CRC32.to_string(),
323                                Value::from(crc),
324                            );
325                        }
326                        ex.input.headers.insert(
327                            CAMEL_ZIP_ENTRY_IS_DIRECTORY.to_string(),
328                            Value::Bool(is_dir),
329                        );
330                        ex.input.headers.insert(
331                            CAMEL_ZIP_ENTRY_COMPRESSION.to_string(),
332                            Value::String(compression),
333                        );
334
335                        yield Ok(ex);
336                    }
337                    Err(e) => {
338                        yield Err(e);
339                    }
340                }
341            }
342        })
343    })
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Write;

    /// Item type produced by the splitter stream.
    type SplitResult = Result<Exchange, CamelError>;

    /// Builds an archive holding the given `(name, content)` files, deflated.
    fn make_zip_with_files(files: Vec<(&str, &[u8])>) -> Vec<u8> {
        let mut archive_bytes = Vec::new();
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut archive_bytes));
        let options = zip::write::SimpleFileOptions::default()
            .compression_method(zip::CompressionMethod::Deflated);
        for (name, content) in &files {
            writer.start_file(*name, options).unwrap();
            writer.write_all(content).unwrap();
        }
        writer.finish().unwrap();
        archive_bytes
    }

    /// Builds an archive from `(name, is_dir)` pairs; files contain `b"content"`.
    fn make_zip_with_dirs(entries: Vec<(&str, bool)>) -> Vec<u8> {
        let mut archive_bytes = Vec::new();
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut archive_bytes));
        let options = zip::write::SimpleFileOptions::default();
        for (name, is_dir) in &entries {
            if *is_dir {
                writer.add_directory(*name, options).unwrap();
            } else {
                writer.start_file(name, options).unwrap();
                writer.write_all(b"content").unwrap();
            }
        }
        writer.finish().unwrap();
        archive_bytes
    }

    /// Builds an archive with exactly one file written with default options.
    fn make_zip_with_single_entry(name: &str, content: &[u8]) -> Vec<u8> {
        let mut archive_bytes = Vec::new();
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut archive_bytes));
        let options = zip::write::SimpleFileOptions::default();
        writer.start_file(name, options).unwrap();
        writer.write_all(content).unwrap();
        writer.finish().unwrap();
        archive_bytes
    }

    /// Runs the splitter over `zip_data` and gathers everything it yields.
    async fn collect_entries(config: ZipSplitConfig, zip_data: Vec<u8>) -> Vec<SplitResult> {
        let message = Message {
            headers: Default::default(),
            body: Body::Bytes(Bytes::from(zip_data)),
        };
        let splitter = zip_splitter(config);
        splitter(Exchange::new(message)).collect().await
    }

    /// True when at least one yielded item is an error.
    fn contains_error(results: &[SplitResult]) -> bool {
        results.iter().any(|item| item.is_err())
    }

    #[tokio::test]
    async fn test_zip_split_single_file() {
        let archive = make_zip_with_files(vec![("hello.txt", b"hello world")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 1);

        let ex = results[0].as_ref().unwrap();
        let Body::Bytes(content) = &ex.input.body else {
            panic!("expected Body::Bytes");
        };
        assert_eq!(content.as_ref(), b"hello world");
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_NAME),
            Some(&Value::String("hello.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_multiple_files() {
        let archive = make_zip_with_files(vec![("a.txt", b"aaa"), ("b.txt", b"bbb")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_zip_split_with_directories() {
        let archive = make_zip_with_dirs(vec![("subdir/", true), ("subdir/file.txt", false)]);
        let config = ZipSplitConfig {
            allow_empty_directories: true,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert_eq!(results.len(), 2);

        // The directory comes first and is emitted without content.
        let dir_ex = results[0].as_ref().unwrap();
        assert!(dir_ex.input.body.is_empty());
        assert_eq!(
            dir_ex.input.headers.get(CAMEL_ZIP_ENTRY_IS_DIRECTORY),
            Some(&Value::Bool(true))
        );
    }

    #[tokio::test]
    async fn test_zip_split_preserves_paths() {
        let archive = make_zip_with_files(vec![("deep/nested/path/file.txt", b"deep")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert_eq!(results.len(), 1);

        let ex = results[0].as_ref().unwrap();
        assert_eq!(
            ex.input.headers.get(CAMEL_ZIP_ENTRY_PATH),
            Some(&Value::String("deep/nested/path/file.txt".to_string()))
        );
    }

    #[tokio::test]
    async fn test_zip_split_max_entries_exceeded() {
        // Five one-byte files against a limit of three.
        let mut archive = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut archive));
            let options = zip::write::SimpleFileOptions::default();
            for i in 0..5 {
                let name = format!("f{i}.txt");
                writer.start_file(&name, options).unwrap();
                writer.write_all(b"x").unwrap();
            }
            writer.finish().unwrap();
        }
        let config = ZipSplitConfig {
            max_entries: 3,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert!(contains_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_path_traversal_rejected() {
        let archive = make_zip_with_single_entry("../etc/passwd", b"oops");
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert!(contains_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_headers_set() {
        let archive = make_zip_with_files(vec![("test.txt", b"content")]);
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        let ex = results[0].as_ref().unwrap();

        let expected_headers = [
            CAMEL_ZIP_ENTRY_NAME,
            CAMEL_ZIP_ENTRY_PATH,
            CAMEL_ZIP_ENTRY_INDEX,
            CAMEL_ZIP_ENTRY_SIZE,
            CAMEL_ZIP_ENTRY_COMPRESSED_SIZE,
            CAMEL_ZIP_ENTRY_IS_DIRECTORY,
            CAMEL_ZIP_ENTRY_COMPRESSION,
        ];
        for header in expected_headers {
            assert!(ex.input.headers.contains_key(header));
        }
    }

    #[tokio::test]
    async fn test_zip_split_empty_zip() {
        let mut archive = Vec::new();
        {
            let writer = zip::ZipWriter::new(std::io::Cursor::new(&mut archive));
            writer.finish().unwrap();
        }
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_zip_split_duplicate_names_reject() {
        // NOTE: zip crate v2 prevents creating archives with duplicate entry names,
        // so this test only checks that the Reject policy lets unique names through.
        let files: Vec<(&str, &[u8])> = vec![("a.txt", b"first"), ("b.txt", b"second")];
        let archive = make_zip_with_files(files);
        let config = ZipSplitConfig {
            duplicate_names_policy: DuplicatePolicy::Reject,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert_eq!(results.len(), 2);
        assert!(!contains_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_max_per_entry_size_exceeded() {
        let archive = make_zip_with_files(vec![("big.txt", b"x".repeat(200).as_slice())]);
        let config = ZipSplitConfig {
            max_per_entry_size: 100,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert!(contains_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_max_total_decompressed_size_exceeded() {
        // Two ten-byte files against a total budget of fifteen bytes.
        let archive =
            make_zip_with_files(vec![("a.txt", b"aaaaaaaaaa"), ("b.txt", b"bbbbbbbbbb")]);
        let config = ZipSplitConfig {
            max_total_decompressed_size: 15,
            ..Default::default()
        };
        let results = collect_entries(config, archive).await;
        assert!(contains_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_corrupt_zip() {
        let garbage = b"not a zip file".to_vec();
        let results = collect_entries(ZipSplitConfig::default(), garbage).await;
        assert!(contains_error(&results));
    }

    #[tokio::test]
    async fn test_zip_split_backslash_rejected() {
        let archive = make_zip_with_single_entry("sub\\file.txt", b"oops");
        let results = collect_entries(ZipSplitConfig::default(), archive).await;
        assert!(contains_error(&results));
    }
}