Skip to main content

camel_processor/data_format/
zip.rs

1use bytes::Bytes;
2use camel_api::body::Body;
3use camel_api::data_format::DataFormat;
4use camel_api::error::CamelError;
5use std::io::Read;
6use std::io::Write;
7use zip::ZipArchive;
8
/// Default upper bound, in bytes, on the decompressed size of an entry (1 GiB).
const DEFAULT_MAX_DECOMPRESSED_SIZE: u64 = 1 << 30;
/// Name of the single archive entry written by `marshal`.
const ENTRY_NAME: &str = "payload";
11
/// Tunable settings for the ZIP data format.
#[derive(Clone, Debug)]
pub struct ZipConfig {
    /// Largest decompressed payload, in bytes, that `unmarshal` will return; anything
    /// larger is rejected with an error.
    pub max_decompressed_size: u64,
    /// Deflate compression level used by `marshal`. `None` leaves the writer's options
    /// untouched; `Some(level)` must lie in `0..=9`.
    pub compression_level: Option<i32>,
    /// Whether `unmarshal` accepts archives with more than one entry. Even when allowed,
    /// only the first entry is extracted.
    pub allow_multi_entry: bool,
}
18
19impl Default for ZipConfig {
20    fn default() -> Self {
21        Self {
22            max_decompressed_size: DEFAULT_MAX_DECOMPRESSED_SIZE,
23            compression_level: None,
24            allow_multi_entry: false,
25        }
26    }
27}
28
29#[derive(Debug, Clone, Default)]
30pub struct ZipDataFormat {
31    config: ZipConfig,
32}
33
34impl ZipDataFormat {
35    pub fn new(config: ZipConfig) -> Self {
36        Self { config }
37    }
38}
39
40impl DataFormat for ZipDataFormat {
41    fn name(&self) -> &str {
42        "zip"
43    }
44
45    fn marshal(&self, body: Body) -> Result<Body, CamelError> {
46        let content: Vec<u8> = match &body {
47            Body::Text(s) => s.as_bytes().to_vec(),
48            Body::Json(v) => serde_json::to_vec(v).map_err(|e| {
49                CamelError::TypeConversionFailed(format!(
50                    "ZipDataFormat::marshal cannot serialize JSON: {e}"
51                ))
52            })?,
53            Body::Bytes(b) => b.to_vec(),
54            Body::Xml(s) => s.as_bytes().to_vec(),
55            Body::Empty => {
56                return Err(CamelError::TypeConversionFailed(
57                    "ZipDataFormat::marshal requires non-empty body".to_string(),
58                ));
59            }
60            Body::Stream(_) => {
61                return Err(CamelError::TypeConversionFailed(
62                    "cannot marshal Body::Stream — add 'stream_cache' or 'convert_body_to' before this step".to_string(),
63                ));
64            }
65        };
66
67        let mut buf = Vec::new();
68        {
69            let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut buf));
70            let mut options = zip::write::SimpleFileOptions::default()
71                .compression_method(zip::CompressionMethod::Deflated);
72            if let Some(level) = self.config.compression_level {
73                if !(0..=9).contains(&level) {
74                    return Err(CamelError::TypeConversionFailed(format!(
75                        "ZipDataFormat::marshal compression_level must be 0-9, got {level}"
76                    )));
77                }
78                options = options.compression_level(Some(level as i64));
79            }
80            writer.start_file(ENTRY_NAME, options).map_err(|e| {
81                CamelError::TypeConversionFailed(format!(
82                    "ZipDataFormat::marshal failed to start entry: {e}"
83                ))
84            })?;
85            writer.write_all(&content).map_err(|e| {
86                CamelError::TypeConversionFailed(format!(
87                    "ZipDataFormat::marshal failed to write entry: {e}"
88                ))
89            })?;
90            writer.finish().map_err(|e| {
91                CamelError::TypeConversionFailed(format!(
92                    "ZipDataFormat::marshal failed to finalize archive: {e}"
93                ))
94            })?;
95        }
96
97        Ok(Body::Bytes(Bytes::from(buf)))
98    }
99
100    fn unmarshal(&self, body: Body) -> Result<Body, CamelError> {
101        let raw: Vec<u8> = match &body {
102            Body::Bytes(b) => b.to_vec(),
103            Body::Text(s) => s.as_bytes().to_vec(),
104            Body::Empty => {
105                return Err(CamelError::TypeConversionFailed(
106                    "ZipDataFormat::unmarshal requires non-empty body".to_string(),
107                ));
108            }
109            Body::Stream(_) => {
110                return Err(CamelError::TypeConversionFailed(
111                    "cannot unmarshal Body::Stream — use UnmarshalService which auto-materializes"
112                        .to_string(),
113                ));
114            }
115            Body::Json(_) | Body::Xml(_) => {
116                return Err(CamelError::TypeConversionFailed(
117                    "ZipDataFormat::unmarshal only supports Body::Bytes and Body::Text (ZIP data)"
118                        .to_string(),
119                ));
120            }
121        };
122
123        let reader = std::io::Cursor::new(&raw);
124        let mut archive = ZipArchive::new(reader).map_err(|e| {
125            CamelError::TypeConversionFailed(format!("ZipDataFormat::unmarshal invalid ZIP: {e}"))
126        })?;
127
128        if archive.is_empty() {
129            return Err(CamelError::TypeConversionFailed(
130                "ZipDataFormat::unmarshal ZIP archive has no entries".to_string(),
131            ));
132        }
133
134        if archive.len() > 1 && !self.config.allow_multi_entry {
135            return Err(CamelError::TypeConversionFailed(format!(
136                "ZipDataFormat::unmarshal ZIP has {} entries but allow_multi_entry is false",
137                archive.len()
138            )));
139        }
140
141        if archive.len() > 1 {
142            tracing::warn!(
143                entries = archive.len(),
144                "ZIP archive has multiple entries, extracting first only"
145            );
146        }
147
148        let mut entry = archive.by_index(0).map_err(|e| {
149            CamelError::TypeConversionFailed(format!(
150                "ZipDataFormat::unmarshal failed to read entry: {e}"
151            ))
152        })?;
153
154        let mut decompressed = Vec::new();
155        let limit = self.config.max_decompressed_size.saturating_add(1);
156        let mut limited = std::io::Read::take(&mut entry, limit);
157        limited.read_to_end(&mut decompressed).map_err(|e| {
158            CamelError::TypeConversionFailed(format!(
159                "ZipDataFormat::unmarshal failed to decompress: {e}"
160            ))
161        })?;
162
163        if decompressed.len() as u64 > self.config.max_decompressed_size {
164            return Err(CamelError::TypeConversionFailed(format!(
165                "ZipDataFormat::unmarshal decompressed size exceeds max {}",
166                self.config.max_decompressed_size
167            )));
168        }
169
170        Ok(Body::Bytes(Bytes::from(decompressed)))
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use serde_json::json;
    use std::io::Cursor;
    use std::io::Read;
    use zip::ZipArchive;

    /// Opens `zip_bytes`, checks that the first entry is named `payload`, and returns its
    /// decompressed content.
    fn extract_single_entry(zip_bytes: &[u8]) -> Vec<u8> {
        let mut archive = ZipArchive::new(Cursor::new(zip_bytes)).unwrap();
        let mut entry = archive.by_index(0).unwrap();
        assert_eq!(entry.name(), "payload");
        let mut buf = Vec::new();
        entry.read_to_end(&mut buf).unwrap();
        buf
    }

    /// Builds a deflated archive with a single `payload` entry holding `content`.
    fn make_zip(content: &[u8]) -> Vec<u8> {
        let mut buf = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(Cursor::new(&mut buf));
            let options = zip::write::SimpleFileOptions::default()
                .compression_method(zip::CompressionMethod::Deflated);
            writer.start_file("payload", options).unwrap();
            writer.write_all(content).unwrap();
            writer.finish().unwrap();
        }
        buf
    }

    /// Builds an archive with one entry per `(name, content)` pair, using default options.
    fn make_multi_entry_zip(entries: &[(&str, &[u8])]) -> Vec<u8> {
        let mut buf = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(Cursor::new(&mut buf));
            let options = zip::write::SimpleFileOptions::default();
            for &(name, content) in entries {
                writer.start_file(name, options).unwrap();
                writer.write_all(content).unwrap();
            }
            writer.finish().unwrap();
        }
        buf
    }

    /// Builds a `Body::Stream` that yields a single `data` chunk.
    fn stream_body() -> Body {
        use camel_api::body::{StreamBody, StreamMetadata};
        use futures::stream;
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let stream = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
        Body::Stream(StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
            metadata: StreamMetadata::default(),
        })
    }

    /// Unwraps a `Body::Bytes`, panicking on any other variant.
    fn expect_bytes(body: Body) -> Bytes {
        match body {
            Body::Bytes(b) => b,
            _ => panic!("expected Body::Bytes"),
        }
    }

    #[test]
    fn test_name() {
        let df = ZipDataFormat::default();
        assert_eq!(df.name(), "zip");
    }

    #[test]
    fn test_marshal_text_to_zip() {
        let df = ZipDataFormat::default();
        let zipped = expect_bytes(df.marshal(Body::Text("hello world".to_string())).unwrap());
        assert_eq!(extract_single_entry(&zipped), b"hello world");
    }

    #[test]
    fn test_marshal_json_to_zip() {
        let df = ZipDataFormat::default();
        let zipped = expect_bytes(df.marshal(Body::Json(json!({"key": "value"}))).unwrap());
        let original = serde_json::to_vec(&json!({"key": "value"})).unwrap();
        assert_eq!(extract_single_entry(&zipped), original);
    }

    #[test]
    fn test_marshal_bytes_to_zip() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from_static(b"raw bytes"));
        let zipped = expect_bytes(df.marshal(body).unwrap());
        assert_eq!(extract_single_entry(&zipped), b"raw bytes");
    }

    #[test]
    fn test_marshal_xml_to_zip() {
        let df = ZipDataFormat::default();
        let body = Body::Xml("<root><item>1</item></root>".to_string());
        let zipped = expect_bytes(df.marshal(body).unwrap());
        assert_eq!(extract_single_entry(&zipped), b"<root><item>1</item></root>");
    }

    #[test]
    fn test_marshal_empty_error() {
        let df = ZipDataFormat::default();
        assert!(df.marshal(Body::Empty).is_err());
    }

    #[test]
    fn test_marshal_stream_error() {
        let df = ZipDataFormat::default();
        assert!(df.marshal(stream_body()).is_err());
    }

    #[test]
    fn test_unmarshal_zip_bytes() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from(make_zip(b"decompressed content")));
        let out = expect_bytes(df.unmarshal(body).unwrap());
        assert_eq!(out.as_ref(), b"decompressed content");
    }

    /// `Body::Text` must be handed to the ZIP parser rather than rejected as an unsupported
    /// body type.
    ///
    /// Real archive bytes are not valid UTF-8, so they cannot be placed in a `String`
    /// without undefined behaviour (`String::from_utf8_unchecked` requires valid UTF-8).
    /// The test therefore feeds plain text and checks that the failure comes from the ZIP
    /// parser.
    #[test]
    fn test_unmarshal_zip_text() {
        let df = ZipDataFormat::default();
        let body = Body::Text("text that is not a zip archive".to_string());
        match df.unmarshal(body) {
            Err(CamelError::TypeConversionFailed(msg)) => {
                assert!(msg.contains("invalid ZIP"), "unexpected error message: {msg}");
            }
            _ => panic!("expected an invalid-ZIP TypeConversionFailed error"),
        }
    }

    #[test]
    fn test_unmarshal_invalid_zip_error() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from_static(b"not a zip file"));
        assert!(df.unmarshal(body).is_err());
    }

    #[test]
    fn test_unmarshal_empty_zip_error() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from(make_multi_entry_zip(&[])));
        assert!(df.unmarshal(body).is_err());
    }

    #[test]
    fn test_unmarshal_json_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Json(json!({"not": "zip"}))).is_err());
    }

    #[test]
    fn test_unmarshal_xml_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Xml("<root/>".to_string())).is_err());
    }

    #[test]
    fn test_unmarshal_multi_entry_error() {
        let zip_data = make_multi_entry_zip(&[
            ("file1.txt", b"one".as_slice()),
            ("file2.txt", b"two".as_slice()),
        ]);
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Bytes(Bytes::from(zip_data))).is_err());
    }

    #[test]
    fn test_unmarshal_multi_entry_allowed() {
        let zip_data = make_multi_entry_zip(&[
            ("file1.txt", b"first".as_slice()),
            ("file2.txt", b"second".as_slice()),
        ]);
        let df = ZipDataFormat::new(ZipConfig {
            allow_multi_entry: true,
            ..Default::default()
        });
        let out = expect_bytes(df.unmarshal(Body::Bytes(Bytes::from(zip_data))).unwrap());
        assert_eq!(out.as_ref(), b"first");
    }

    #[test]
    fn test_roundtrip_text() {
        let df = ZipDataFormat::default();
        let compressed = df
            .marshal(Body::Text("roundtrip text content".to_string()))
            .unwrap();
        let out = expect_bytes(df.unmarshal(compressed).unwrap());
        assert_eq!(out.as_ref(), b"roundtrip text content");
    }

    #[test]
    fn test_roundtrip_json() {
        let df = ZipDataFormat::default();
        let compressed = df.marshal(Body::Json(json!({"round": "trip"}))).unwrap();
        let out = expect_bytes(df.unmarshal(compressed).unwrap());
        let value: serde_json::Value = serde_json::from_slice(&out).unwrap();
        assert_eq!(value, json!({"round": "trip"}));
    }

    #[test]
    fn test_roundtrip_bytes() {
        let df = ZipDataFormat::default();
        let compressed = df
            .marshal(Body::Bytes(Bytes::from_static(b"\x00\x01\x02\xff")))
            .unwrap();
        let out = expect_bytes(df.unmarshal(compressed).unwrap());
        assert_eq!(out.as_ref(), b"\x00\x01\x02\xff");
    }

    #[test]
    fn test_max_decompressed_size_exceeded() {
        let df = ZipDataFormat::new(ZipConfig {
            max_decompressed_size: 10,
            ..Default::default()
        });
        let zip_data = make_zip(b"this content is way longer than 10 bytes");
        assert!(df.unmarshal(Body::Bytes(Bytes::from(zip_data))).is_err());
    }

    /// Content whose size equals the limit exactly is still accepted; only larger content
    /// is rejected.
    #[test]
    fn test_max_decompressed_size_boundary() {
        let df = ZipDataFormat::new(ZipConfig {
            max_decompressed_size: 10,
            ..Default::default()
        });
        let zip_data = make_zip(b"0123456789");
        let out = expect_bytes(df.unmarshal(Body::Bytes(Bytes::from(zip_data))).unwrap());
        assert_eq!(out.as_ref(), b"0123456789");
    }

    #[test]
    fn test_unmarshal_empty_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Empty).is_err());
    }

    #[test]
    fn test_unmarshal_stream_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(stream_body()).is_err());
    }

    #[test]
    fn test_marshal_invalid_compression_level() {
        let df = ZipDataFormat::new(ZipConfig {
            compression_level: Some(42),
            ..Default::default()
        });
        assert!(df.marshal(Body::Text("test".to_string())).is_err());
    }

    #[test]
    fn test_builtin_zip_registered() {
        let df = super::super::builtin_data_format("zip");
        assert!(df.is_some());
        assert_eq!(df.unwrap().name(), "zip");
    }
}