camel_processor/data_format/
zip.rs1use bytes::Bytes;
2use camel_api::body::Body;
3use camel_api::data_format::DataFormat;
4use camel_api::error::CamelError;
5use std::io::Read;
6use std::io::Write;
7use zip::ZipArchive;
8
/// Default cap on the decompressed size of an entry: 1 GiB (2^30 bytes).
const DEFAULT_MAX_DECOMPRESSED_SIZE: u64 = 1 << 30;

/// Name given to the single archive entry written when marshalling.
const ENTRY_NAME: &str = "payload";
11
/// Tunable settings for the ZIP data format.
///
/// The type is plain data, so it derives equality in addition to `Debug` and
/// `Clone`; this lets callers and tests compare configurations directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ZipConfig {
    /// Largest number of decompressed bytes that unmarshalling will accept.
    /// An entry that inflates beyond this size is rejected with an error.
    pub max_decompressed_size: u64,
    /// Optional Deflate compression level used when marshalling. Values
    /// outside `0..=9` are rejected at marshal time; `None` leaves the level
    /// unset so the ZIP library's default applies.
    pub compression_level: Option<i32>,
    /// Whether unmarshalling accepts archives with more than one entry.
    /// When `false` such archives are rejected; when `true` only the first
    /// entry is extracted.
    pub allow_multi_entry: bool,
}
18
19impl Default for ZipConfig {
20 fn default() -> Self {
21 Self {
22 max_decompressed_size: DEFAULT_MAX_DECOMPRESSED_SIZE,
23 compression_level: None,
24 allow_multi_entry: false,
25 }
26 }
27}
28
29#[derive(Debug, Clone, Default)]
30pub struct ZipDataFormat {
31 config: ZipConfig,
32}
33
34impl ZipDataFormat {
35 pub fn new(config: ZipConfig) -> Self {
36 Self { config }
37 }
38}
39
40impl DataFormat for ZipDataFormat {
41 fn name(&self) -> &str {
42 "zip"
43 }
44
45 fn marshal(&self, body: Body) -> Result<Body, CamelError> {
46 let content: Vec<u8> = match &body {
47 Body::Text(s) => s.as_bytes().to_vec(),
48 Body::Json(v) => serde_json::to_vec(v).map_err(|e| {
49 CamelError::TypeConversionFailed(format!(
50 "ZipDataFormat::marshal cannot serialize JSON: {e}"
51 ))
52 })?,
53 Body::Bytes(b) => b.to_vec(),
54 Body::Xml(s) => s.as_bytes().to_vec(),
55 Body::Empty => {
56 return Err(CamelError::TypeConversionFailed(
57 "ZipDataFormat::marshal requires non-empty body".to_string(),
58 ));
59 }
60 Body::Stream(_) => {
61 return Err(CamelError::TypeConversionFailed(
62 "cannot marshal Body::Stream — add 'stream_cache' or 'convert_body_to' before this step".to_string(),
63 ));
64 }
65 };
66
67 let mut buf = Vec::new();
68 {
69 let mut writer = zip::ZipWriter::new(std::io::Cursor::new(&mut buf));
70 let mut options = zip::write::SimpleFileOptions::default()
71 .compression_method(zip::CompressionMethod::Deflated);
72 if let Some(level) = self.config.compression_level {
73 if !(0..=9).contains(&level) {
74 return Err(CamelError::TypeConversionFailed(format!(
75 "ZipDataFormat::marshal compression_level must be 0-9, got {level}"
76 )));
77 }
78 options = options.compression_level(Some(level as i64));
79 }
80 writer.start_file(ENTRY_NAME, options).map_err(|e| {
81 CamelError::TypeConversionFailed(format!(
82 "ZipDataFormat::marshal failed to start entry: {e}"
83 ))
84 })?;
85 writer.write_all(&content).map_err(|e| {
86 CamelError::TypeConversionFailed(format!(
87 "ZipDataFormat::marshal failed to write entry: {e}"
88 ))
89 })?;
90 writer.finish().map_err(|e| {
91 CamelError::TypeConversionFailed(format!(
92 "ZipDataFormat::marshal failed to finalize archive: {e}"
93 ))
94 })?;
95 }
96
97 Ok(Body::Bytes(Bytes::from(buf)))
98 }
99
100 fn unmarshal(&self, body: Body) -> Result<Body, CamelError> {
101 let raw: Vec<u8> = match &body {
102 Body::Bytes(b) => b.to_vec(),
103 Body::Text(s) => s.as_bytes().to_vec(),
104 Body::Empty => {
105 return Err(CamelError::TypeConversionFailed(
106 "ZipDataFormat::unmarshal requires non-empty body".to_string(),
107 ));
108 }
109 Body::Stream(_) => {
110 return Err(CamelError::TypeConversionFailed(
111 "cannot unmarshal Body::Stream — use UnmarshalService which auto-materializes"
112 .to_string(),
113 ));
114 }
115 Body::Json(_) | Body::Xml(_) => {
116 return Err(CamelError::TypeConversionFailed(
117 "ZipDataFormat::unmarshal only supports Body::Bytes and Body::Text (ZIP data)"
118 .to_string(),
119 ));
120 }
121 };
122
123 let reader = std::io::Cursor::new(&raw);
124 let mut archive = ZipArchive::new(reader).map_err(|e| {
125 CamelError::TypeConversionFailed(format!("ZipDataFormat::unmarshal invalid ZIP: {e}"))
126 })?;
127
128 if archive.is_empty() {
129 return Err(CamelError::TypeConversionFailed(
130 "ZipDataFormat::unmarshal ZIP archive has no entries".to_string(),
131 ));
132 }
133
134 if archive.len() > 1 && !self.config.allow_multi_entry {
135 return Err(CamelError::TypeConversionFailed(format!(
136 "ZipDataFormat::unmarshal ZIP has {} entries but allow_multi_entry is false",
137 archive.len()
138 )));
139 }
140
141 if archive.len() > 1 {
142 tracing::warn!(
143 entries = archive.len(),
144 "ZIP archive has multiple entries, extracting first only"
145 );
146 }
147
148 let mut entry = archive.by_index(0).map_err(|e| {
149 CamelError::TypeConversionFailed(format!(
150 "ZipDataFormat::unmarshal failed to read entry: {e}"
151 ))
152 })?;
153
154 let mut decompressed = Vec::new();
155 let limit = self.config.max_decompressed_size.saturating_add(1);
156 let mut limited = std::io::Read::take(&mut entry, limit);
157 limited.read_to_end(&mut decompressed).map_err(|e| {
158 CamelError::TypeConversionFailed(format!(
159 "ZipDataFormat::unmarshal failed to decompress: {e}"
160 ))
161 })?;
162
163 if decompressed.len() as u64 > self.config.max_decompressed_size {
164 return Err(CamelError::TypeConversionFailed(format!(
165 "ZipDataFormat::unmarshal decompressed size exceeds max {}",
166 self.config.max_decompressed_size
167 )));
168 }
169
170 Ok(Body::Bytes(Bytes::from(decompressed)))
171 }
172}
173
/// Unit tests for `ZipDataFormat`: marshalling each body variant,
/// unmarshalling valid and invalid archives, round trips, and the
/// configuration guards (size limit, compression level, multi-entry).
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use serde_json::json;
    use std::io::Cursor;
    use std::io::Read;
    use zip::ZipArchive;

    /// Opens `zip_bytes`, checks that the first entry is named `payload`,
    /// and returns that entry's decompressed content.
    fn extract_single_entry(zip_bytes: &[u8]) -> Vec<u8> {
        let mut archive = ZipArchive::new(Cursor::new(zip_bytes)).unwrap();
        let mut entry = archive.by_index(0).unwrap();
        assert_eq!(entry.name(), "payload");
        let mut buf = Vec::new();
        entry.read_to_end(&mut buf).unwrap();
        buf
    }

    /// Unwraps a `Body::Bytes`, panicking on any other variant.
    fn expect_bytes(body: Body) -> Bytes {
        match body {
            Body::Bytes(b) => b,
            _ => panic!("expected Body::Bytes"),
        }
    }

    /// Builds an archive with one Deflate-compressed entry named `payload`.
    fn make_zip(content: &[u8]) -> Vec<u8> {
        let mut buf = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(Cursor::new(&mut buf));
            let options = zip::write::SimpleFileOptions::default()
                .compression_method(zip::CompressionMethod::Deflated);
            writer.start_file("payload", options).unwrap();
            writer.write_all(content).unwrap();
            writer.finish().unwrap();
        }
        buf
    }

    /// Builds an archive from `(name, content)` pairs using default file
    /// options; an empty slice yields an archive with no entries.
    fn make_zip_with_entries(entries: &[(&str, &str)]) -> Vec<u8> {
        let mut buf = Vec::new();
        {
            let mut writer = zip::ZipWriter::new(Cursor::new(&mut buf));
            let options = zip::write::SimpleFileOptions::default();
            for (name, content) in entries {
                writer.start_file(*name, options).unwrap();
                writer.write_all(content.as_bytes()).unwrap();
            }
            writer.finish().unwrap();
        }
        buf
    }

    /// Builds a `Body::Stream` wrapping a single in-memory chunk.
    fn stream_body() -> Body {
        use camel_api::body::{StreamBody, StreamMetadata};
        use futures::stream;
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let stream = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
        Body::Stream(StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
            metadata: StreamMetadata::default(),
        })
    }

    #[test]
    fn test_name() {
        let df = ZipDataFormat::default();
        assert_eq!(df.name(), "zip");
    }

    #[test]
    fn test_marshal_text_to_zip() {
        let df = ZipDataFormat::default();
        let zipped = expect_bytes(df.marshal(Body::Text("hello world".to_string())).unwrap());
        assert_eq!(extract_single_entry(&zipped), b"hello world");
    }

    #[test]
    fn test_marshal_json_to_zip() {
        let df = ZipDataFormat::default();
        let zipped = expect_bytes(df.marshal(Body::Json(json!({"key": "value"}))).unwrap());
        let expected = serde_json::to_vec(&json!({"key": "value"})).unwrap();
        assert_eq!(extract_single_entry(&zipped), expected);
    }

    #[test]
    fn test_marshal_bytes_to_zip() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from_static(b"raw bytes"));
        let zipped = expect_bytes(df.marshal(body).unwrap());
        assert_eq!(extract_single_entry(&zipped), b"raw bytes");
    }

    #[test]
    fn test_marshal_xml_to_zip() {
        let df = ZipDataFormat::default();
        let body = Body::Xml("<root><item>1</item></root>".to_string());
        let zipped = expect_bytes(df.marshal(body).unwrap());
        assert_eq!(extract_single_entry(&zipped), b"<root><item>1</item></root>");
    }

    #[test]
    fn test_marshal_empty_error() {
        let df = ZipDataFormat::default();
        assert!(df.marshal(Body::Empty).is_err());
    }

    #[test]
    fn test_marshal_stream_error() {
        let df = ZipDataFormat::default();
        assert!(df.marshal(stream_body()).is_err());
    }

    #[test]
    fn test_unmarshal_zip_bytes() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from(make_zip(b"decompressed content")));
        let out = expect_bytes(df.unmarshal(body).unwrap());
        assert_eq!(out.as_ref(), b"decompressed content");
    }

    /// A ZIP archive is binary data and in general not valid UTF-8, so it
    /// cannot be placed in a `String` without violating the type's invariant
    /// (the previous `from_utf8_unchecked` version was undefined behaviour).
    /// This test instead checks that a `Body::Text` input is handed to the
    /// ZIP parser rather than rejected as an unsupported body variant.
    #[test]
    fn test_unmarshal_text_routed_to_zip_parser() {
        let df = ZipDataFormat::default();
        let body = Body::Text("not a zip file".to_string());
        match df.unmarshal(body) {
            Err(CamelError::TypeConversionFailed(msg)) => {
                assert!(msg.contains("invalid ZIP"), "unexpected message: {msg}");
            }
            _ => panic!("expected TypeConversionFailed for non-ZIP text"),
        }
    }

    #[test]
    fn test_unmarshal_invalid_zip_error() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from_static(b"not a zip file"));
        assert!(df.unmarshal(body).is_err());
    }

    #[test]
    fn test_unmarshal_empty_zip_error() {
        let df = ZipDataFormat::default();
        let body = Body::Bytes(Bytes::from(make_zip_with_entries(&[])));
        assert!(df.unmarshal(body).is_err());
    }

    #[test]
    fn test_unmarshal_json_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Json(json!({"not": "zip"}))).is_err());
    }

    #[test]
    fn test_unmarshal_xml_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Xml("<root/>".to_string())).is_err());
    }

    #[test]
    fn test_unmarshal_multi_entry_error() {
        let archive = make_zip_with_entries(&[("file1.txt", "one"), ("file2.txt", "two")]);
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Bytes(Bytes::from(archive))).is_err());
    }

    #[test]
    fn test_unmarshal_multi_entry_allowed() {
        let archive = make_zip_with_entries(&[("file1.txt", "first"), ("file2.txt", "second")]);
        let df = ZipDataFormat::new(ZipConfig {
            allow_multi_entry: true,
            ..Default::default()
        });
        let out = expect_bytes(df.unmarshal(Body::Bytes(Bytes::from(archive))).unwrap());
        assert_eq!(out.as_ref(), b"first");
    }

    #[test]
    fn test_roundtrip_text() {
        let df = ZipDataFormat::default();
        let compressed = df
            .marshal(Body::Text("roundtrip text content".to_string()))
            .unwrap();
        let out = expect_bytes(df.unmarshal(compressed).unwrap());
        assert_eq!(out.as_ref(), b"roundtrip text content");
    }

    #[test]
    fn test_roundtrip_json() {
        let df = ZipDataFormat::default();
        let compressed = df.marshal(Body::Json(json!({"round": "trip"}))).unwrap();
        let out = expect_bytes(df.unmarshal(compressed).unwrap());
        let value: serde_json::Value = serde_json::from_slice(&out).unwrap();
        assert_eq!(value, json!({"round": "trip"}));
    }

    #[test]
    fn test_roundtrip_bytes() {
        let df = ZipDataFormat::default();
        let original = Body::Bytes(Bytes::from_static(b"\x00\x01\x02\xff"));
        let compressed = df.marshal(original).unwrap();
        let out = expect_bytes(df.unmarshal(compressed).unwrap());
        assert_eq!(out.as_ref(), b"\x00\x01\x02\xff");
    }

    #[test]
    fn test_max_decompressed_size_exceeded() {
        let df = ZipDataFormat::new(ZipConfig {
            max_decompressed_size: 10,
            ..Default::default()
        });
        let archive = make_zip(b"this content is way longer than 10 bytes");
        assert!(df.unmarshal(Body::Bytes(Bytes::from(archive))).is_err());
    }

    /// Content whose size equals the limit exactly must still be accepted.
    #[test]
    fn test_max_decompressed_size_exact_limit_ok() {
        let df = ZipDataFormat::new(ZipConfig {
            max_decompressed_size: 10,
            ..Default::default()
        });
        let archive = make_zip(b"0123456789");
        let out = expect_bytes(df.unmarshal(Body::Bytes(Bytes::from(archive))).unwrap());
        assert_eq!(out.as_ref(), b"0123456789");
    }

    #[test]
    fn test_unmarshal_empty_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(Body::Empty).is_err());
    }

    #[test]
    fn test_unmarshal_stream_error() {
        let df = ZipDataFormat::default();
        assert!(df.unmarshal(stream_body()).is_err());
    }

    #[test]
    fn test_marshal_invalid_compression_level() {
        let df = ZipDataFormat::new(ZipConfig {
            compression_level: Some(42),
            ..Default::default()
        });
        assert!(df.marshal(Body::Text("test".to_string())).is_err());
    }

    /// A level inside the accepted `0..=9` range must produce a readable
    /// archive.
    #[test]
    fn test_marshal_valid_compression_level() {
        let df = ZipDataFormat::new(ZipConfig {
            compression_level: Some(9),
            ..Default::default()
        });
        let zipped = expect_bytes(df.marshal(Body::Text("test".to_string())).unwrap());
        assert_eq!(extract_single_entry(&zipped), b"test");
    }

    #[test]
    fn test_builtin_zip_registered() {
        let df = super::super::builtin_data_format("zip");
        assert!(df.is_some());
        assert_eq!(df.unwrap().name(), "zip");
    }
}