Skip to main content

camel_component_file/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use futures::StreamExt;
8use regex::Regex;
9use tokio::fs;
10use tokio::time;
11use tokio_util::io::ReaderStream;
12use tower::Service;
13use tracing::{debug, warn};
14
15use camel_api::{
16    BoxProcessor, CamelError, Exchange, Message, body::Body, body::StreamBody, body::StreamMetadata,
17};
18use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
19use camel_endpoint::parse_uri;
20
21// ---------------------------------------------------------------------------
22// FileExistStrategy
23// ---------------------------------------------------------------------------
24
/// What the file producer does when the target file already exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileExistStrategy {
    /// Replace the existing file with the new content.
    Override,
    /// Add the new content to the end of the existing file.
    Append,
    /// Refuse to write and report an error.
    Fail,
}

impl FileExistStrategy {
    /// Parses the value of the `fileExist` URI option.
    ///
    /// Matching ignores ASCII case, so `append`, `Append` and `APPEND` all
    /// select [`FileExistStrategy::Append`]. Any value that names neither
    /// `Append` nor `Fail` falls back to [`FileExistStrategy::Override`].
    fn from_str(s: &str) -> Self {
        // Case-insensitive on purpose: a differently-cased "append"/"fail" must
        // not silently degrade to Override, which would overwrite data.
        if s.eq_ignore_ascii_case("append") {
            FileExistStrategy::Append
        } else if s.eq_ignore_ascii_case("fail") {
            FileExistStrategy::Fail
        } else {
            FileExistStrategy::Override
        }
    }
}
41
42// ---------------------------------------------------------------------------
43// FileConfig
44// ---------------------------------------------------------------------------
45
46/// Configuration for file component endpoints.
47///
48/// # Memory Limits and Stream Materialization
49///
50/// The file component uses a **100MB default memory limit** for stream materialization
51/// (configurable via `max_body_size`). This limit applies when the file producer needs to
52/// materialize a stream body into bytes for writing to disk.
53///
54/// ## Why This Limit?
55///
56/// This limit is designed for **batch file processing scenarios with trusted input**:
57/// - Processing moderate-sized files in batch jobs
58/// - Enterprise integration patterns with controlled file sizes
59/// - Scenarios where files are generated by trusted sources
60///
61/// The 100MB default balances memory safety with practical file processing needs in
62/// typical integration scenarios.
63///
64/// ## Overriding the Limit
65///
66/// Adjust the limit via the `maxBodySize` URI parameter (in bytes):
67///
68/// ```text
69/// file:/data/output?maxBodySize=524288000  // 500MB
70/// file:/data/output?maxBodySize=1048576    // 1MB
71/// ```
72///
73/// ## Behavior When Exceeded
74///
75/// When a file exceeds the configured `max_body_size`:
76/// - The producer returns a `CamelError::ProcessorError`
77/// - The exchange fails without writing partial data
78/// - No memory is exhausted - the check happens before full materialization
79///
80/// ## Large Files and Lazy Evaluation
81///
82/// **Important**: The file consumer uses **lazy stream evaluation**, allowing it to
83/// handle files of **any size** (including multi-gigabyte files) without loading them
84/// entirely into memory. Files are streamed chunk-by-chunk as the exchange flows
85/// through the route.
86///
87/// The memory limit only applies when:
88/// - A downstream processor requires the full body (e.g., `.into_bytes()`)
89/// - The producer needs to write the body to a file
90///
91/// For large file scenarios, consider:
92/// - Keeping the body as a stream throughout the route
93/// - Using streaming processors that don't require full materialization
94/// - Adjusting `max_body_size` only when you control the input sources
95#[derive(Debug, Clone)]
96pub struct FileConfig {
97    pub directory: String,
98    pub delay: Duration,
99    pub initial_delay: Duration,
100    pub noop: bool,
101    pub delete: bool,
102    pub move_to: Option<String>,
103    pub file_name: Option<String>,
104    pub include: Option<String>,
105    pub exclude: Option<String>,
106    pub recursive: bool,
107    pub file_exist: FileExistStrategy,
108    pub temp_prefix: Option<String>,
109    pub auto_create: bool,
110    // Timeout fields for preventing hanging on slow filesystems
111    pub read_timeout: Duration,
112    pub write_timeout: Duration,
113    // Memory limit for body materialization (in bytes)
114    pub max_body_size: usize,
115}
116
117impl FileConfig {
118    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
119        let parts = parse_uri(uri)?;
120        if parts.scheme != "file" {
121            return Err(CamelError::InvalidUri(format!(
122                "expected scheme 'file', got '{}'",
123                parts.scheme
124            )));
125        }
126
127        let delay = parts
128            .params
129            .get("delay")
130            .and_then(|v| v.parse::<u64>().ok())
131            .unwrap_or(500);
132
133        let initial_delay = parts
134            .params
135            .get("initialDelay")
136            .and_then(|v| v.parse::<u64>().ok())
137            .unwrap_or(1000);
138
139        let noop = parts
140            .params
141            .get("noop")
142            .map(|v| v == "true")
143            .unwrap_or(false);
144
145        let delete = parts
146            .params
147            .get("delete")
148            .map(|v| v == "true")
149            .unwrap_or(false);
150
151        let move_to = if noop || delete {
152            None
153        } else {
154            Some(
155                parts
156                    .params
157                    .get("move")
158                    .cloned()
159                    .unwrap_or_else(|| ".camel".to_string()),
160            )
161        };
162
163        let file_name = parts.params.get("fileName").cloned();
164        let include = parts.params.get("include").cloned();
165        let exclude = parts.params.get("exclude").cloned();
166
167        let recursive = parts
168            .params
169            .get("recursive")
170            .map(|v| v == "true")
171            .unwrap_or(false);
172
173        let file_exist = parts
174            .params
175            .get("fileExist")
176            .map(|v| FileExistStrategy::from_str(v))
177            .unwrap_or(FileExistStrategy::Override);
178
179        let temp_prefix = parts.params.get("tempPrefix").cloned();
180
181        let auto_create = parts
182            .params
183            .get("autoCreate")
184            .map(|v| v != "false")
185            .unwrap_or(true);
186
187        let read_timeout = parts
188            .params
189            .get("readTimeout")
190            .and_then(|v| v.parse::<u64>().ok())
191            .map(Duration::from_millis)
192            .unwrap_or(Duration::from_secs(30));
193
194        let write_timeout = parts
195            .params
196            .get("writeTimeout")
197            .and_then(|v| v.parse::<u64>().ok())
198            .map(Duration::from_millis)
199            .unwrap_or(Duration::from_secs(30));
200
201        let max_body_size = parts
202            .params
203            .get("maxBodySize")
204            .and_then(|v| v.parse::<usize>().ok())
205            .unwrap_or(100 * 1024 * 1024); // Default: 100MB
206
207        Ok(Self {
208            directory: parts.path,
209            delay: Duration::from_millis(delay),
210            initial_delay: Duration::from_millis(initial_delay),
211            noop,
212            delete,
213            move_to,
214            file_name,
215            include,
216            exclude,
217            recursive,
218            file_exist,
219            temp_prefix,
220            auto_create,
221            read_timeout,
222            write_timeout,
223            max_body_size,
224        })
225    }
226}
227
228// ---------------------------------------------------------------------------
229// FileComponent
230// ---------------------------------------------------------------------------
231
/// Component registered under the `file` scheme; it carries no state of its own.
pub struct FileComponent;

impl FileComponent {
    /// Creates the component.
    pub fn new() -> Self {
        FileComponent
    }
}

impl Default for FileComponent {
    /// Same as [`FileComponent::new`].
    fn default() -> Self {
        FileComponent
    }
}
245
246impl Component for FileComponent {
247    fn scheme(&self) -> &str {
248        "file"
249    }
250
251    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
252        let config = FileConfig::from_uri(uri)?;
253        Ok(Box::new(FileEndpoint {
254            uri: uri.to_string(),
255            config,
256        }))
257    }
258}
259
260// ---------------------------------------------------------------------------
261// FileEndpoint
262// ---------------------------------------------------------------------------
263
264struct FileEndpoint {
265    uri: String,
266    config: FileConfig,
267}
268
269impl Endpoint for FileEndpoint {
270    fn uri(&self) -> &str {
271        &self.uri
272    }
273
274    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
275        Ok(Box::new(FileConsumer {
276            config: self.config.clone(),
277        }))
278    }
279
280    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
281        Ok(BoxProcessor::new(FileProducer {
282            config: self.config.clone(),
283        }))
284    }
285}
286
287// ---------------------------------------------------------------------------
288// FileConsumer
289// ---------------------------------------------------------------------------
290
291struct FileConsumer {
292    config: FileConfig,
293}
294
295#[async_trait]
296impl Consumer for FileConsumer {
297    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
298        let config = self.config.clone();
299
300        let include_re = config
301            .include
302            .as_ref()
303            .map(|p| Regex::new(p))
304            .transpose()
305            .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
306        let exclude_re = config
307            .exclude
308            .as_ref()
309            .map(|p| Regex::new(p))
310            .transpose()
311            .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
312
313        if !config.initial_delay.is_zero() {
314            tokio::select! {
315                _ = time::sleep(config.initial_delay) => {}
316                _ = context.cancelled() => {
317                    debug!(directory = config.directory, "File consumer cancelled during initial delay");
318                    return Ok(());
319                }
320            }
321        }
322
323        let mut interval = time::interval(config.delay);
324
325        loop {
326            tokio::select! {
327                _ = context.cancelled() => {
328                    debug!(directory = config.directory, "File consumer received cancellation, stopping");
329                    break;
330                }
331                _ = interval.tick() => {
332                    if let Err(e) = poll_directory(
333                        &config,
334                        &context,
335                        &include_re,
336                        &exclude_re,
337                    ).await {
338                        warn!(directory = config.directory, error = %e, "Error polling directory");
339                    }
340                }
341            }
342        }
343
344        Ok(())
345    }
346
347    async fn stop(&mut self) -> Result<(), CamelError> {
348        Ok(())
349    }
350}
351
352async fn poll_directory(
353    config: &FileConfig,
354    context: &ConsumerContext,
355    include_re: &Option<Regex>,
356    exclude_re: &Option<Regex>,
357) -> Result<(), CamelError> {
358    let base_path = std::path::Path::new(&config.directory);
359
360    let files = list_files(base_path, config.recursive).await?;
361
362    for file_path in files {
363        let file_name = file_path
364            .file_name()
365            .and_then(|n| n.to_str())
366            .unwrap_or_default()
367            .to_string();
368
369        if let Some(ref target_name) = config.file_name
370            && file_name != *target_name
371        {
372            continue;
373        }
374
375        if let Some(re) = include_re
376            && !re.is_match(&file_name)
377        {
378            continue;
379        }
380
381        if let Some(re) = exclude_re
382            && re.is_match(&file_name)
383        {
384            continue;
385        }
386
387        if let Some(ref move_dir) = config.move_to
388            && file_path.starts_with(base_path.join(move_dir))
389        {
390            continue;
391        }
392
393        let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
394            let f = fs::File::open(&file_path).await?;
395            let m = f.metadata().await?;
396            Ok::<_, std::io::Error>((f, m))
397        })
398        .await
399        {
400            Ok(Ok((f, m))) => (f, Some(m)),
401            Ok(Err(e)) => {
402                warn!(
403                    file = %file_path.display(),
404                    error = %e,
405                    "Failed to open file"
406                );
407                continue;
408            }
409            Err(_) => {
410                warn!(
411                    file = %file_path.display(),
412                    timeout_ms = config.read_timeout.as_millis(),
413                    "Timeout opening file"
414                );
415                continue;
416            }
417        };
418
419        let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
420        let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
421
422        let last_modified = metadata
423            .as_ref()
424            .and_then(|m| m.modified().ok())
425            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
426            .map(|d| d.as_millis() as u64)
427            .unwrap_or(0);
428
429        let relative_path = file_path
430            .strip_prefix(base_path)
431            .unwrap_or(&file_path)
432            .to_string_lossy()
433            .to_string();
434
435        let absolute_path = file_path
436            .canonicalize()
437            .unwrap_or_else(|_| file_path.clone())
438            .to_string_lossy()
439            .to_string();
440
441        let body = Body::Stream(StreamBody {
442            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
443            metadata: StreamMetadata {
444                size_hint: Some(file_len),
445                content_type: None,
446                origin: Some(absolute_path.clone()),
447            },
448        });
449
450        let mut exchange = Exchange::new(Message::new(body));
451        exchange
452            .input
453            .set_header("CamelFileName", serde_json::Value::String(relative_path));
454        exchange.input.set_header(
455            "CamelFileNameOnly",
456            serde_json::Value::String(file_name.clone()),
457        );
458        exchange.input.set_header(
459            "CamelFileAbsolutePath",
460            serde_json::Value::String(absolute_path),
461        );
462        exchange.input.set_header(
463            "CamelFileLength",
464            serde_json::Value::Number(file_len.into()),
465        );
466        exchange.input.set_header(
467            "CamelFileLastModified",
468            serde_json::Value::Number(last_modified.into()),
469        );
470
471        debug!(
472            file = %file_path.display(),
473            correlation_id = %exchange.correlation_id(),
474            "Processing file"
475        );
476
477        if context.send(exchange).await.is_err() {
478            break;
479        }
480
481        if config.noop {
482            // Do nothing
483        } else if config.delete {
484            if let Err(e) = fs::remove_file(&file_path).await {
485                warn!(file = %file_path.display(), error = %e, "Failed to delete file");
486            }
487        } else if let Some(ref move_dir) = config.move_to {
488            let target_dir = base_path.join(move_dir);
489            if let Err(e) = fs::create_dir_all(&target_dir).await {
490                warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
491                continue;
492            }
493            let target_path = target_dir.join(&file_name);
494            if let Err(e) = fs::rename(&file_path, &target_path).await {
495                warn!(
496                    from = %file_path.display(),
497                    to = %target_path.display(),
498                    error = %e,
499                    "Failed to move file"
500                );
501            }
502        }
503    }
504
505    Ok(())
506}
507
508async fn list_files(
509    dir: &std::path::Path,
510    recursive: bool,
511) -> Result<Vec<std::path::PathBuf>, CamelError> {
512    let mut files = Vec::new();
513    let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
514
515    while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
516        let path = entry.path();
517        if path.is_file() {
518            files.push(path);
519        } else if path.is_dir() && recursive {
520            let mut sub_files = Box::pin(list_files(&path, true)).await?;
521            files.append(&mut sub_files);
522        }
523    }
524
525    files.sort();
526    Ok(files)
527}
528
529// ---------------------------------------------------------------------------
530// Path validation for security
531// ---------------------------------------------------------------------------
532
533fn validate_path_is_within_base(
534    base_dir: &std::path::Path,
535    target_path: &std::path::Path,
536) -> Result<(), CamelError> {
537    let canonical_base = base_dir.canonicalize().map_err(|e| {
538        CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
539    })?;
540
541    // For non-existent paths, canonicalize the parent and construct the full path
542    let canonical_target = if target_path.exists() {
543        target_path.canonicalize().map_err(|e| {
544            CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
545        })?
546    } else if let Some(parent) = target_path.parent() {
547        // Ensure parent exists (should have been created by auto_create)
548        if !parent.exists() {
549            return Err(CamelError::ProcessorError(format!(
550                "Parent directory '{}' does not exist",
551                parent.display()
552            )));
553        }
554        let canonical_parent = parent.canonicalize().map_err(|e| {
555            CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
556        })?;
557        // Reconstruct the full path with the filename
558        if let Some(filename) = target_path.file_name() {
559            canonical_parent.join(filename)
560        } else {
561            return Err(CamelError::ProcessorError(
562                "Invalid target path: no filename".to_string(),
563            ));
564        }
565    } else {
566        return Err(CamelError::ProcessorError(
567            "Invalid target path: no parent directory".to_string(),
568        ));
569    };
570
571    if !canonical_target.starts_with(&canonical_base) {
572        return Err(CamelError::ProcessorError(format!(
573            "Path '{}' is outside base directory '{}'",
574            canonical_target.display(),
575            canonical_base.display()
576        )));
577    }
578
579    Ok(())
580}
581
582// ---------------------------------------------------------------------------
583// FileProducer
584// ---------------------------------------------------------------------------
585
586#[derive(Clone)]
587struct FileProducer {
588    config: FileConfig,
589}
590
591impl FileProducer {
592    async fn body_to_bytes(body: Body, max_size: usize) -> Result<Vec<u8>, CamelError> {
593        let bytes = body.into_bytes(max_size).await?;
594        Ok(bytes.to_vec())
595    }
596
597    fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
598        if let Some(name) = exchange
599            .input
600            .header("CamelFileName")
601            .and_then(|v| v.as_str())
602        {
603            return Ok(name.to_string());
604        }
605        if let Some(ref name) = config.file_name {
606            return Ok(name.clone());
607        }
608        Err(CamelError::ProcessorError(
609            "No filename specified: set CamelFileName header or fileName option".to_string(),
610        ))
611    }
612}
613
614impl Service<Exchange> for FileProducer {
615    type Response = Exchange;
616    type Error = CamelError;
617    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
618
619    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
620        Poll::Ready(Ok(()))
621    }
622
623    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
624        let config = self.config.clone();
625
626        Box::pin(async move {
627            let file_name = FileProducer::resolve_filename(&exchange, &config)?;
628            let data =
629                FileProducer::body_to_bytes(exchange.input.body.clone(), config.max_body_size)
630                    .await?;
631
632            let dir_path = std::path::Path::new(&config.directory);
633            let target_path = dir_path.join(&file_name);
634
635            if config.auto_create
636                && let Some(parent) = target_path.parent()
637            {
638                tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
639                    .await
640                    .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
641                    .map_err(CamelError::from)?;
642            }
643
644            // SECURITY: Validate path is within base directory
645            validate_path_is_within_base(dir_path, &target_path)?;
646
647            if target_path.exists() {
648                match config.file_exist {
649                    FileExistStrategy::Fail => {
650                        return Err(CamelError::ProcessorError(format!(
651                            "File already exists: {}",
652                            target_path.display()
653                        )));
654                    }
655                    FileExistStrategy::Append => {
656                        use tokio::io::AsyncWriteExt;
657                        let mut file = tokio::time::timeout(
658                            config.write_timeout,
659                            fs::OpenOptions::new().append(true).open(&target_path),
660                        )
661                        .await
662                        .map_err(|_| {
663                            CamelError::ProcessorError("Timeout opening file for append".into())
664                        })?
665                        .map_err(CamelError::from)?;
666
667                        tokio::time::timeout(config.write_timeout, async {
668                            file.write_all(&data).await?;
669                            file.flush().await?;
670                            Ok::<_, std::io::Error>(())
671                        })
672                        .await
673                        .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
674                        .map_err(CamelError::from)?;
675
676                        let abs_path = target_path
677                            .canonicalize()
678                            .unwrap_or_else(|_| target_path.clone())
679                            .to_string_lossy()
680                            .to_string();
681                        exchange.input.set_header(
682                            "CamelFileNameProduced",
683                            serde_json::Value::String(abs_path),
684                        );
685                        return Ok(exchange);
686                    }
687                    FileExistStrategy::Override => {}
688                }
689            }
690
691            if let Some(ref prefix) = config.temp_prefix {
692                let temp_name = format!("{prefix}{file_name}");
693                let temp_path = dir_path.join(&temp_name);
694
695                tokio::time::timeout(config.write_timeout, fs::write(&temp_path, &data))
696                    .await
697                    .map_err(|_| CamelError::ProcessorError("Timeout writing temp file".into()))?
698                    .map_err(CamelError::from)?;
699
700                tokio::time::timeout(config.write_timeout, fs::rename(&temp_path, &target_path))
701                    .await
702                    .map_err(|_| CamelError::ProcessorError("Timeout renaming file".into()))?
703                    .map_err(CamelError::from)?;
704            } else {
705                tokio::time::timeout(config.write_timeout, fs::write(&target_path, &data))
706                    .await
707                    .map_err(|_| CamelError::ProcessorError("Timeout writing file".into()))?
708                    .map_err(CamelError::from)?;
709            }
710
711            let abs_path = target_path
712                .canonicalize()
713                .unwrap_or_else(|_| target_path.clone())
714                .to_string_lossy()
715                .to_string();
716            exchange
717                .input
718                .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
719
720            debug!(
721                file = %target_path.display(),
722                correlation_id = %exchange.correlation_id(),
723                "File written"
724            );
725            Ok(exchange)
726        })
727    }
728}
729
730#[cfg(test)]
731mod tests {
732    use super::*;
733    use std::sync::Arc;
734    use std::time::Duration;
735    use tokio::sync::Mutex;
736    use tokio_util::sync::CancellationToken;
737
738    // NullRouteController for testing
739    struct NullRouteController;
740
741    #[async_trait::async_trait]
742    impl camel_api::RouteController for NullRouteController {
743        async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
744            Ok(())
745        }
746        async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
747            Ok(())
748        }
749        async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
750            Ok(())
751        }
752        async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
753            Ok(())
754        }
755        async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
756            Ok(())
757        }
758        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
759            None
760        }
761        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
762            Ok(())
763        }
764        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
765            Ok(())
766        }
767    }
768
769    fn test_producer_ctx() -> ProducerContext {
770        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
771    }
772
773    #[test]
774    fn test_file_config_defaults() {
775        let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
776        assert_eq!(config.directory, "/tmp/inbox");
777        assert_eq!(config.delay, Duration::from_millis(500));
778        assert_eq!(config.initial_delay, Duration::from_millis(1000));
779        assert!(!config.noop);
780        assert!(!config.delete);
781        assert_eq!(config.move_to, Some(".camel".to_string()));
782        assert!(config.file_name.is_none());
783        assert!(config.include.is_none());
784        assert!(config.exclude.is_none());
785        assert!(!config.recursive);
786        assert_eq!(config.file_exist, FileExistStrategy::Override);
787        assert!(config.temp_prefix.is_none());
788        assert!(config.auto_create);
789        // New timeout defaults
790        assert_eq!(config.read_timeout, Duration::from_secs(30));
791        assert_eq!(config.write_timeout, Duration::from_secs(30));
792    }
793
794    #[test]
795    fn test_file_config_consumer_options() {
796        let config = FileConfig::from_uri(
797            "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
798        ).unwrap();
799        assert_eq!(config.directory, "/data/input");
800        assert_eq!(config.delay, Duration::from_millis(1000));
801        assert_eq!(config.initial_delay, Duration::from_millis(2000));
802        assert!(config.noop);
803        assert!(config.recursive);
804        assert_eq!(config.include, Some(".*\\.csv".to_string()));
805    }
806
807    #[test]
808    fn test_file_config_producer_options() {
809        let config = FileConfig::from_uri(
810            "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
811        )
812        .unwrap();
813        assert_eq!(config.file_exist, FileExistStrategy::Append);
814        assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
815        assert!(!config.auto_create);
816        assert_eq!(config.file_name, Some("out.txt".to_string()));
817    }
818
819    #[test]
820    fn test_file_config_delete_mode() {
821        let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
822        assert!(config.delete);
823        assert!(config.move_to.is_none());
824    }
825
826    #[test]
827    fn test_file_config_noop_mode() {
828        let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
829        assert!(config.noop);
830        assert!(config.move_to.is_none());
831    }
832
833    #[test]
834    fn test_file_config_wrong_scheme() {
835        let result = FileConfig::from_uri("timer:tick");
836        assert!(result.is_err());
837    }
838
839    #[test]
840    fn test_file_component_scheme() {
841        let component = FileComponent::new();
842        assert_eq!(component.scheme(), "file");
843    }
844
845    #[test]
846    fn test_file_component_creates_endpoint() {
847        let component = FileComponent::new();
848        let endpoint = component.create_endpoint("file:/tmp/test");
849        assert!(endpoint.is_ok());
850    }
851
852    // -----------------------------------------------------------------------
853    // Consumer tests
854    // -----------------------------------------------------------------------
855
856    #[tokio::test]
857    async fn test_file_consumer_reads_files() {
858        let dir = tempfile::tempdir().unwrap();
859        let dir_path = dir.path().to_str().unwrap();
860
861        std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
862        std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
863
864        let component = FileComponent::new();
865        let endpoint = component
866            .create_endpoint(&format!(
867                "file:{dir_path}?noop=true&initialDelay=0&delay=100"
868            ))
869            .unwrap();
870        let mut consumer = endpoint.create_consumer().unwrap();
871
872        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
873        let token = CancellationToken::new();
874        let ctx = ConsumerContext::new(tx, token.clone());
875
876        tokio::spawn(async move {
877            consumer.start(ctx).await.unwrap();
878        });
879
880        let mut received = Vec::new();
881        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
882            while let Some(envelope) = rx.recv().await {
883                received.push(envelope.exchange);
884                if received.len() == 2 {
885                    break;
886                }
887            }
888        })
889        .await;
890        token.cancel();
891
892        assert!(timeout.is_ok(), "Should have received 2 exchanges");
893        assert_eq!(received.len(), 2);
894
895        for ex in &received {
896            assert!(ex.input.header("CamelFileName").is_some());
897            assert!(ex.input.header("CamelFileNameOnly").is_some());
898            assert!(ex.input.header("CamelFileAbsolutePath").is_some());
899            assert!(ex.input.header("CamelFileLength").is_some());
900            assert!(ex.input.header("CamelFileLastModified").is_some());
901        }
902    }
903
904    #[tokio::test]
905    async fn test_file_consumer_include_filter() {
906        let dir = tempfile::tempdir().unwrap();
907        let dir_path = dir.path().to_str().unwrap();
908
909        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
910        std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
911
912        let component = FileComponent::new();
913        let endpoint = component
914            .create_endpoint(&format!(
915                "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
916            ))
917            .unwrap();
918        let mut consumer = endpoint.create_consumer().unwrap();
919
920        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
921        let token = CancellationToken::new();
922        let ctx = ConsumerContext::new(tx, token.clone());
923
924        tokio::spawn(async move {
925            consumer.start(ctx).await.unwrap();
926        });
927
928        let mut received = Vec::new();
929        let _ = tokio::time::timeout(Duration::from_millis(500), async {
930            while let Some(envelope) = rx.recv().await {
931                received.push(envelope.exchange);
932                if received.len() == 1 {
933                    break;
934                }
935            }
936        })
937        .await;
938        token.cancel();
939
940        assert_eq!(received.len(), 1);
941        let name = received[0]
942            .input
943            .header("CamelFileNameOnly")
944            .and_then(|v| v.as_str())
945            .unwrap();
946        assert_eq!(name, "data.csv");
947    }
948
949    #[tokio::test]
950    async fn test_file_consumer_delete_mode() {
951        let dir = tempfile::tempdir().unwrap();
952        let dir_path = dir.path().to_str().unwrap();
953
954        std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
955
956        let component = FileComponent::new();
957        let endpoint = component
958            .create_endpoint(&format!(
959                "file:{dir_path}?delete=true&initialDelay=0&delay=100"
960            ))
961            .unwrap();
962        let mut consumer = endpoint.create_consumer().unwrap();
963
964        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
965        let token = CancellationToken::new();
966        let ctx = ConsumerContext::new(tx, token.clone());
967
968        tokio::spawn(async move {
969            consumer.start(ctx).await.unwrap();
970        });
971
972        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
973        token.cancel();
974
975        tokio::time::sleep(Duration::from_millis(100)).await;
976
977        assert!(
978            !dir.path().join("deleteme.txt").exists(),
979            "File should be deleted"
980        );
981    }
982
983    #[tokio::test]
984    async fn test_file_consumer_move_mode() {
985        let dir = tempfile::tempdir().unwrap();
986        let dir_path = dir.path().to_str().unwrap();
987
988        std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
989
990        let component = FileComponent::new();
991        let endpoint = component
992            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
993            .unwrap();
994        let mut consumer = endpoint.create_consumer().unwrap();
995
996        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
997        let token = CancellationToken::new();
998        let ctx = ConsumerContext::new(tx, token.clone());
999
1000        tokio::spawn(async move {
1001            consumer.start(ctx).await.unwrap();
1002        });
1003
1004        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1005        token.cancel();
1006
1007        tokio::time::sleep(Duration::from_millis(100)).await;
1008
1009        assert!(
1010            !dir.path().join("moveme.txt").exists(),
1011            "Original file should be gone"
1012        );
1013        assert!(
1014            dir.path().join(".camel").join("moveme.txt").exists(),
1015            "File should be in .camel/"
1016        );
1017    }
1018
1019    #[tokio::test]
1020    async fn test_file_consumer_respects_cancellation() {
1021        let dir = tempfile::tempdir().unwrap();
1022        let dir_path = dir.path().to_str().unwrap();
1023
1024        let component = FileComponent::new();
1025        let endpoint = component
1026            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
1027            .unwrap();
1028        let mut consumer = endpoint.create_consumer().unwrap();
1029
1030        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1031        let token = CancellationToken::new();
1032        let ctx = ConsumerContext::new(tx, token.clone());
1033
1034        let handle = tokio::spawn(async move {
1035            consumer.start(ctx).await.unwrap();
1036        });
1037
1038        tokio::time::sleep(Duration::from_millis(150)).await;
1039        token.cancel();
1040
1041        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1042        assert!(
1043            result.is_ok(),
1044            "Consumer should have stopped after cancellation"
1045        );
1046    }
1047
1048    // -----------------------------------------------------------------------
1049    // Producer tests
1050    // -----------------------------------------------------------------------
1051
1052    #[tokio::test]
1053    async fn test_file_producer_writes_file() {
1054        use tower::ServiceExt;
1055
1056        let dir = tempfile::tempdir().unwrap();
1057        let dir_path = dir.path().to_str().unwrap();
1058
1059        let component = FileComponent::new();
1060        let endpoint = component
1061            .create_endpoint(&format!("file:{dir_path}"))
1062            .unwrap();
1063        let ctx = test_producer_ctx();
1064        let producer = endpoint.create_producer(&ctx).unwrap();
1065
1066        let mut exchange = Exchange::new(Message::new("file content"));
1067        exchange.input.set_header(
1068            "CamelFileName",
1069            serde_json::Value::String("output.txt".to_string()),
1070        );
1071
1072        let result = producer.oneshot(exchange).await.unwrap();
1073
1074        let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1075        assert_eq!(content, "file content");
1076
1077        assert!(result.input.header("CamelFileNameProduced").is_some());
1078    }
1079
1080    #[tokio::test]
1081    async fn test_file_producer_auto_create_dirs() {
1082        use tower::ServiceExt;
1083
1084        let dir = tempfile::tempdir().unwrap();
1085        let dir_path = dir.path().to_str().unwrap();
1086
1087        let component = FileComponent::new();
1088        let endpoint = component
1089            .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1090            .unwrap();
1091        let ctx = test_producer_ctx();
1092        let producer = endpoint.create_producer(&ctx).unwrap();
1093
1094        let mut exchange = Exchange::new(Message::new("nested"));
1095        exchange.input.set_header(
1096            "CamelFileName",
1097            serde_json::Value::String("file.txt".to_string()),
1098        );
1099
1100        producer.oneshot(exchange).await.unwrap();
1101
1102        assert!(dir.path().join("sub/dir/file.txt").exists());
1103    }
1104
1105    #[tokio::test]
1106    async fn test_file_producer_file_exist_fail() {
1107        use tower::ServiceExt;
1108
1109        let dir = tempfile::tempdir().unwrap();
1110        let dir_path = dir.path().to_str().unwrap();
1111
1112        std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1113
1114        let component = FileComponent::new();
1115        let endpoint = component
1116            .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1117            .unwrap();
1118        let ctx = test_producer_ctx();
1119        let producer = endpoint.create_producer(&ctx).unwrap();
1120
1121        let mut exchange = Exchange::new(Message::new("new"));
1122        exchange.input.set_header(
1123            "CamelFileName",
1124            serde_json::Value::String("existing.txt".to_string()),
1125        );
1126
1127        let result = producer.oneshot(exchange).await;
1128        assert!(
1129            result.is_err(),
1130            "Should fail when file exists with Fail strategy"
1131        );
1132    }
1133
1134    #[tokio::test]
1135    async fn test_file_producer_file_exist_append() {
1136        use tower::ServiceExt;
1137
1138        let dir = tempfile::tempdir().unwrap();
1139        let dir_path = dir.path().to_str().unwrap();
1140
1141        std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1142
1143        let component = FileComponent::new();
1144        let endpoint = component
1145            .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1146            .unwrap();
1147        let ctx = test_producer_ctx();
1148        let producer = endpoint.create_producer(&ctx).unwrap();
1149
1150        let mut exchange = Exchange::new(Message::new("new"));
1151        exchange.input.set_header(
1152            "CamelFileName",
1153            serde_json::Value::String("append.txt".to_string()),
1154        );
1155
1156        producer.oneshot(exchange).await.unwrap();
1157
1158        let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1159        assert_eq!(content, "oldnew");
1160    }
1161
1162    #[tokio::test]
1163    async fn test_file_producer_temp_prefix() {
1164        use tower::ServiceExt;
1165
1166        let dir = tempfile::tempdir().unwrap();
1167        let dir_path = dir.path().to_str().unwrap();
1168
1169        let component = FileComponent::new();
1170        let endpoint = component
1171            .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1172            .unwrap();
1173        let ctx = test_producer_ctx();
1174        let producer = endpoint.create_producer(&ctx).unwrap();
1175
1176        let mut exchange = Exchange::new(Message::new("atomic write"));
1177        exchange.input.set_header(
1178            "CamelFileName",
1179            serde_json::Value::String("final.txt".to_string()),
1180        );
1181
1182        producer.oneshot(exchange).await.unwrap();
1183
1184        assert!(dir.path().join("final.txt").exists());
1185        assert!(!dir.path().join(".tmpfinal.txt").exists());
1186        let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1187        assert_eq!(content, "atomic write");
1188    }
1189
1190    #[tokio::test]
1191    async fn test_file_producer_uses_filename_option() {
1192        use tower::ServiceExt;
1193
1194        let dir = tempfile::tempdir().unwrap();
1195        let dir_path = dir.path().to_str().unwrap();
1196
1197        let component = FileComponent::new();
1198        let endpoint = component
1199            .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1200            .unwrap();
1201        let ctx = test_producer_ctx();
1202        let producer = endpoint.create_producer(&ctx).unwrap();
1203
1204        let exchange = Exchange::new(Message::new("content"));
1205
1206        producer.oneshot(exchange).await.unwrap();
1207        assert!(dir.path().join("fixed.txt").exists());
1208    }
1209
1210    #[tokio::test]
1211    async fn test_file_producer_no_filename_errors() {
1212        use tower::ServiceExt;
1213
1214        let dir = tempfile::tempdir().unwrap();
1215        let dir_path = dir.path().to_str().unwrap();
1216
1217        let component = FileComponent::new();
1218        let endpoint = component
1219            .create_endpoint(&format!("file:{dir_path}"))
1220            .unwrap();
1221        let ctx = test_producer_ctx();
1222        let producer = endpoint.create_producer(&ctx).unwrap();
1223
1224        let exchange = Exchange::new(Message::new("content"));
1225
1226        let result = producer.oneshot(exchange).await;
1227        assert!(result.is_err(), "Should error when no filename is provided");
1228    }
1229
1230    // -----------------------------------------------------------------------
1231    // Security tests - Path traversal protection
1232    // -----------------------------------------------------------------------
1233
1234    #[tokio::test]
1235    async fn test_file_producer_rejects_path_traversal_parent_directory() {
1236        use tower::ServiceExt;
1237
1238        let dir = tempfile::tempdir().unwrap();
1239        let dir_path = dir.path().to_str().unwrap();
1240
1241        // Create a subdirectory
1242        std::fs::create_dir(dir.path().join("subdir")).unwrap();
1243        std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1244
1245        let component = FileComponent::new();
1246        let endpoint = component
1247            .create_endpoint(&format!("file:{dir_path}/subdir"))
1248            .unwrap();
1249        let ctx = test_producer_ctx();
1250        let producer = endpoint.create_producer(&ctx).unwrap();
1251
1252        let mut exchange = Exchange::new(Message::new("malicious"));
1253        exchange.input.set_header(
1254            "CamelFileName",
1255            serde_json::Value::String("../secret.txt".to_string()),
1256        );
1257
1258        let result = producer.oneshot(exchange).await;
1259        assert!(result.is_err(), "Should reject path traversal attempt");
1260
1261        let err = result.unwrap_err();
1262        assert!(
1263            err.to_string().contains("outside"),
1264            "Error should mention path is outside base directory"
1265        );
1266    }
1267
1268    #[tokio::test]
1269    async fn test_file_producer_rejects_absolute_path_outside_base() {
1270        use tower::ServiceExt;
1271
1272        let dir = tempfile::tempdir().unwrap();
1273        let dir_path = dir.path().to_str().unwrap();
1274
1275        let component = FileComponent::new();
1276        let endpoint = component
1277            .create_endpoint(&format!("file:{dir_path}"))
1278            .unwrap();
1279        let ctx = test_producer_ctx();
1280        let producer = endpoint.create_producer(&ctx).unwrap();
1281
1282        let mut exchange = Exchange::new(Message::new("malicious"));
1283        exchange.input.set_header(
1284            "CamelFileName",
1285            serde_json::Value::String("/etc/passwd".to_string()),
1286        );
1287
1288        let result = producer.oneshot(exchange).await;
1289        assert!(result.is_err(), "Should reject absolute path outside base");
1290    }
1291
1292    // -----------------------------------------------------------------------
1293    // Large file streaming tests
1294    // -----------------------------------------------------------------------
1295
1296    #[tokio::test]
1297    #[ignore] // Slow test - run with --ignored flag
1298    async fn test_large_file_streaming_constant_memory() {
1299        use std::io::Write;
1300        use tempfile::NamedTempFile;
1301
1302        // Create a 150MB file (larger than 100MB limit)
1303        let mut temp_file = NamedTempFile::new().unwrap();
1304        let file_size = 150 * 1024 * 1024; // 150MB
1305        let chunk = vec![b'X'; 1024 * 1024]; // 1MB chunk
1306
1307        for _ in 0..150 {
1308            temp_file.write_all(&chunk).unwrap();
1309        }
1310        temp_file.flush().unwrap();
1311
1312        let dir = temp_file.path().parent().unwrap();
1313        let dir_path = dir.to_str().unwrap();
1314        let file_name = temp_file
1315            .path()
1316            .file_name()
1317            .unwrap()
1318            .to_str()
1319            .unwrap()
1320            .to_string();
1321
1322        // Read file as stream (should succeed with lazy evaluation)
1323        let component = FileComponent::new();
1324        let endpoint = component
1325            .create_endpoint(&format!(
1326                "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1327            ))
1328            .unwrap();
1329        let mut consumer = endpoint.create_consumer().unwrap();
1330
1331        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1332        let token = CancellationToken::new();
1333        let ctx = ConsumerContext::new(tx, token.clone());
1334
1335        tokio::spawn(async move {
1336            let _ = consumer.start(ctx).await;
1337        });
1338
1339        let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1340            rx.recv().await.unwrap().exchange
1341        })
1342        .await
1343        .expect("Should receive exchange");
1344        token.cancel();
1345
1346        // Verify body is a stream (not materialized)
1347        assert!(matches!(exchange.input.body, Body::Stream(_)));
1348
1349        // Verify we can read metadata without consuming
1350        if let Body::Stream(ref stream_body) = exchange.input.body {
1351            assert!(stream_body.metadata.size_hint.is_some());
1352            let size = stream_body.metadata.size_hint.unwrap();
1353            assert_eq!(size, file_size as u64);
1354        }
1355
1356        // Materializing should fail (exceeds 100MB limit)
1357        if let Body::Stream(stream_body) = exchange.input.body {
1358            let body = Body::Stream(stream_body);
1359            let result = body.into_bytes(100 * 1024 * 1024).await;
1360            assert!(result.is_err());
1361        }
1362
1363        // But we CAN read chunks one at a time (simulating line-by-line processing)
1364        // This demonstrates lazy evaluation - we don't need to load entire file
1365        let component2 = FileComponent::new();
1366        let endpoint2 = component2
1367            .create_endpoint(&format!(
1368                "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1369            ))
1370            .unwrap();
1371        let mut consumer2 = endpoint2.create_consumer().unwrap();
1372
1373        let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1374        let token2 = CancellationToken::new();
1375        let ctx2 = ConsumerContext::new(tx2, token2.clone());
1376
1377        tokio::spawn(async move {
1378            let _ = consumer2.start(ctx2).await;
1379        });
1380
1381        let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1382            rx2.recv().await.unwrap().exchange
1383        })
1384        .await
1385        .expect("Should receive exchange");
1386        token2.cancel();
1387
1388        if let Body::Stream(stream_body) = exchange2.input.body {
1389            let mut stream_lock = stream_body.stream.lock().await;
1390            let mut stream = stream_lock.take().unwrap();
1391
1392            // Read first chunk (size varies based on ReaderStream's buffer)
1393            if let Some(chunk_result) = stream.next().await {
1394                let chunk = chunk_result.unwrap();
1395                assert!(!chunk.is_empty());
1396                assert!(chunk.len() < file_size);
1397                // Memory usage is constant - we only have this chunk in memory, not 150MB
1398            }
1399        }
1400    }
1401}