Skip to main content

camel_component_file/
lib.rs

1pub mod bundle;
2
3pub use bundle::FileBundle;
4
5use std::collections::HashSet;
6use std::future::Future;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use futures::StreamExt;
15use regex::Regex;
16use tokio::fs;
17use tokio::fs::OpenOptions;
18use tokio::io;
19use tokio::io::AsyncWriteExt;
20use tokio::time;
21use tokio_util::io::ReaderStream;
22use tower::Service;
23use tracing::{debug, warn};
24
25use camel_component_api::{
26    Body, BoxProcessor, CamelError, Exchange, Message, StreamBody, StreamMetadata,
27};
28use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
29use camel_component_api::{UriConfig, parse_uri};
30use camel_language_api::Language;
31use camel_language_simple::SimpleLanguage;
32
33// ---------------------------------------------------------------------------
34// TempFileGuard — RAII cleanup for temp files (panic-safe)
35// ---------------------------------------------------------------------------
36
/// Panic-safe cleanup handle for a temporary file.
///
/// The guard owns the path of a temp file and deletes that file when it is
/// dropped, unless [`TempFileGuard::disarm`] was called first. Because `Drop`
/// also runs while unwinding and on every early `return`, a temp file cannot be
/// left behind if writing it fails or panics (e.g. inside `io::copy`).
struct TempFileGuard {
    path: PathBuf,
    disarm: bool,
}

impl TempFileGuard {
    /// Creates an armed guard: `path` will be removed when the guard drops.
    fn new(path: PathBuf) -> Self {
        TempFileGuard {
            disarm: false,
            path,
        }
    }

    /// Keeps the file on drop; call once the temp file was renamed into place.
    fn disarm(&mut self) {
        self.disarm = true;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if self.disarm {
            return;
        }
        // Best effort only: the file may never have been created or may already
        // be gone, so the result is intentionally ignored.
        let _ = std::fs::remove_file(&self.path);
    }
}
68
69// ---------------------------------------------------------------------------
70// FileExistStrategy
71// ---------------------------------------------------------------------------
72
/// Strategy for handling an existing target file when the producer writes.
///
/// Parsed from the `fileExist` URI parameter; see the [`FromStr`] impl for the
/// accepted spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FileExistStrategy {
    /// Replace the existing file (default). The producer writes a temp file and
    /// renames it over the target.
    #[default]
    Override,
    /// Append to the existing file, creating it if it does not exist.
    Append,
    /// Fail if the file already exists.
    Fail,
}

impl FromStr for FileExistStrategy {
    type Err = String;

    /// Parses a strategy name, ignoring ASCII case (`fail`, `Fail`, `FAIL`, ...).
    ///
    /// Never returns `Err`. Any unrecognised value falls back to
    /// [`FileExistStrategy::Override`], as in earlier versions, so previously
    /// accepted URIs keep parsing. Beware that this includes Apache Camel
    /// strategies this component does not implement (e.g. `Ignore`): such a
    /// value results in the existing file being overwritten.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let strategy = if s.eq_ignore_ascii_case("append") {
            FileExistStrategy::Append
        } else if s.eq_ignore_ascii_case("fail") {
            FileExistStrategy::Fail
        } else {
            // "override" in any case, plus the lenient fallback for unknown input.
            FileExistStrategy::Override
        };
        Ok(strategy)
    }
}
97
98// ---------------------------------------------------------------------------
99// FileGlobalConfig
100// ---------------------------------------------------------------------------
101
102/// Global configuration for File component.
103/// Supports serde deserialization with defaults and builder methods.
104/// These are the fallback defaults when URI params are not set.
105#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
106#[serde(default)]
107pub struct FileGlobalConfig {
108    pub delay_ms: u64,
109    pub initial_delay_ms: u64,
110    pub read_timeout_ms: u64,
111    pub write_timeout_ms: u64,
112}
113
114impl Default for FileGlobalConfig {
115    fn default() -> Self {
116        Self {
117            delay_ms: 500,
118            initial_delay_ms: 1_000,
119            read_timeout_ms: 30_000,
120            write_timeout_ms: 30_000,
121        }
122    }
123}
124
125impl FileGlobalConfig {
126    pub fn new() -> Self {
127        Self::default()
128    }
129    pub fn with_delay_ms(mut self, v: u64) -> Self {
130        self.delay_ms = v;
131        self
132    }
133    pub fn with_initial_delay_ms(mut self, v: u64) -> Self {
134        self.initial_delay_ms = v;
135        self
136    }
137    pub fn with_read_timeout_ms(mut self, v: u64) -> Self {
138        self.read_timeout_ms = v;
139        self
140    }
141    pub fn with_write_timeout_ms(mut self, v: u64) -> Self {
142        self.write_timeout_ms = v;
143        self
144    }
145}
146
147// ---------------------------------------------------------------------------
148// FileConfig
149// ---------------------------------------------------------------------------
150
/// Configuration for file component endpoints, parsed from a `file:` URI.
///
/// The same struct serves both directions. The consumer polls `directory` for
/// files to send; the producer writes files into it.
///
/// # Streaming
///
/// Both the file consumer and producer use **native streaming** with no RAM
/// materialization:
///
/// - The **consumer** creates a `Body::Stream` backed by `tokio::fs::File` via
///   `ReaderStream`. Files of any size are handled without loading them into memory.
///
/// - The **producer** writes via `tokio::io::copy` directly to a `tokio::fs::File`
///   using `Body::into_async_read()`. Writes for the `Override` strategy are
///   **atomic**: data is written to a temporary file first and renamed only on
///   success, preventing partial files on failure.
///
/// # Write strategies (`fileExist` URI parameter)
///
/// | Value | Behavior |
/// |-------|----------|
/// | `Override` (default) | Atomic write via temp file + rename |
/// | `Append` | Appends to the file, creating it if missing; non-atomic by nature |
/// | `Fail` | Returns an error if the file already exists; otherwise writes like `Override` |
///
/// # Timing parameters
///
/// `delay`, `initialDelay`, `readTimeout` and `writeTimeout` are given in
/// milliseconds in the URI. Each has a raw `*_ms` companion field and a public
/// [`Duration`] field. The `Duration` fields are presumably filled from the
/// companions by the `UriConfig` derive (TODO confirm); the code in this file
/// only reads the `Duration` fields.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "file"]
#[uri_config(skip_impl, crate = "camel_component_api")]
pub struct FileConfig {
    /// Directory to poll (consumer) or to write into (producer).
    pub directory: String,

    /// Raw `delay` URI value in milliseconds (companion field for `delay`).
    // Not read directly by the visible code, hence the `dead_code` allowance.
    #[allow(dead_code)]
    #[uri_param(name = "delay", default = "500")]
    delay_ms: u64,

    /// Time between directory polls (consumer). It may be replaced by the
    /// component-wide value via `apply_global_defaults`.
    pub delay: Duration,

    /// Raw `initialDelay` URI value in milliseconds (companion field for `initial_delay`).
    // Not read directly by the visible code, hence the `dead_code` allowance.
    #[allow(dead_code)]
    #[uri_param(name = "initialDelay", default = "1000")]
    initial_delay_ms: u64,

    /// Wait before the first poll (consumer); skipped when zero.
    pub initial_delay: Duration,

    /// If true, leave consumed files in place (no delete or move). The consumer
    /// then remembers delivered paths in memory and skips them on later polls.
    /// Takes precedence over `delete`.
    #[uri_param(default = "false")]
    pub noop: bool,

    /// If true (and `noop` is false), delete each file after it has been sent.
    #[uri_param(default = "false")]
    pub delete: bool,

    /// Directory, joined onto `directory`, that consumed files are moved into.
    /// `validate` sets it to `None` when `noop` or `delete` is true, and to
    /// `".camel"` when it is unset and neither flag is true. Files already inside
    /// this directory are ignored while polling.
    #[uri_param(name = "move")]
    move_to: Option<String>,

    /// Producer: target file name, used when the exchange has no string
    /// `CamelFileName` header. It may contain `${...}` Simple-language
    /// expressions. Consumer: if set, only files whose name equals this value
    /// are picked up.
    #[uri_param(name = "fileName")]
    pub file_name: Option<String>,

    /// Regex that a file name (not the full path) must match to be consumed.
    /// Matching uses `Regex::is_match`, i.e. the pattern is not anchored.
    #[uri_param]
    pub include: Option<String>,

    /// Regex on the file name; consumer skips files that match it.
    #[uri_param]
    pub exclude: Option<String>,

    /// Whether the consumer also scans sub-directories.
    #[uri_param(default = "false")]
    pub recursive: bool,

    /// Producer strategy when the target file already exists.
    #[uri_param(name = "fileExist", default = "Override")]
    pub file_exist: FileExistStrategy,

    /// Prefix for the temporary file used by atomic writes; `.tmp.` when unset.
    #[uri_param(name = "tempPrefix")]
    pub temp_prefix: Option<String>,

    /// Whether the producer creates missing parent directories of the target file.
    #[uri_param(name = "autoCreate", default = "true")]
    pub auto_create: bool,

    /// Raw `readTimeout` URI value in milliseconds (companion field for `read_timeout`).
    // Not read directly by the visible code, hence the `dead_code` allowance.
    #[allow(dead_code)]
    #[uri_param(name = "readTimeout", default = "30000")]
    read_timeout_ms: u64,

    /// Consumer timeout for opening a file and reading its metadata.
    pub read_timeout: Duration,

    /// Raw `writeTimeout` URI value in milliseconds (companion field for `write_timeout`).
    // Not read directly by the visible code, hence the `dead_code` allowance.
    #[allow(dead_code)]
    #[uri_param(name = "writeTimeout", default = "30000")]
    write_timeout_ms: u64,

    /// Producer timeout, applied separately to each timed filesystem step
    /// (directory creation, open/create, copy, rename).
    pub write_timeout: Duration,
}
253
254impl UriConfig for FileConfig {
255    fn scheme() -> &'static str {
256        "file"
257    }
258
259    fn from_uri(uri: &str) -> Result<Self, CamelError> {
260        let parts = parse_uri(uri)?;
261        Self::from_components(parts)
262    }
263
264    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
265        Self::parse_uri_components(parts)?.validate()
266    }
267
268    fn validate(self) -> Result<Self, CamelError> {
269        // Apply conditional logic for move_to:
270        // - If noop or delete is true, move_to should be None
271        // - Otherwise, if move_to is None, default to ".camel"
272        let move_to = if self.noop || self.delete {
273            None
274        } else {
275            Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
276        };
277
278        Ok(Self { move_to, ..self })
279    }
280}
281
282impl FileConfig {
283    /// Apply global config defaults. Since FileConfig uses a proc macro that bakes in
284    /// defaults, we compare Duration values against the known macro defaults to detect
285    /// "not explicitly set by user". Only overrides when current value == macro default.
286    ///
287    /// **Note**: If a user explicitly sets a URI param to its default value (e.g.,
288    /// `?delay=500`), it is indistinguishable from "not set" and will be overridden
289    /// by global config. This is a known limitation of the Duration comparison approach.
290    pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
291        if self.delay == Duration::from_millis(500) {
292            self.delay = Duration::from_millis(global.delay_ms);
293        }
294        if self.initial_delay == Duration::from_millis(1_000) {
295            self.initial_delay = Duration::from_millis(global.initial_delay_ms);
296        }
297        if self.read_timeout == Duration::from_millis(30_000) {
298            self.read_timeout = Duration::from_millis(global.read_timeout_ms);
299        }
300        if self.write_timeout == Duration::from_millis(30_000) {
301            self.write_timeout = Duration::from_millis(global.write_timeout_ms);
302        }
303    }
304}
305
306// ---------------------------------------------------------------------------
307// FileComponent
308// ---------------------------------------------------------------------------
309
310pub struct FileComponent {
311    config: Option<FileGlobalConfig>,
312}
313
314impl FileComponent {
315    pub fn new() -> Self {
316        Self { config: None }
317    }
318
319    pub fn with_config(config: FileGlobalConfig) -> Self {
320        Self {
321            config: Some(config),
322        }
323    }
324
325    pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
326        Self { config }
327    }
328}
329
330impl Default for FileComponent {
331    fn default() -> Self {
332        Self::new()
333    }
334}
335
336impl Component for FileComponent {
337    fn scheme(&self) -> &str {
338        "file"
339    }
340
341    fn create_endpoint(
342        &self,
343        uri: &str,
344        _ctx: &dyn camel_component_api::ComponentContext,
345    ) -> Result<Box<dyn Endpoint>, CamelError> {
346        let mut config = FileConfig::from_uri(uri)?;
347        if let Some(ref global_config) = self.config {
348            config.apply_global_defaults(global_config);
349        }
350        Ok(Box::new(FileEndpoint {
351            uri: uri.to_string(),
352            config,
353        }))
354    }
355}
356
357// ---------------------------------------------------------------------------
358// FileEndpoint
359// ---------------------------------------------------------------------------
360
361struct FileEndpoint {
362    uri: String,
363    config: FileConfig,
364}
365
366impl Endpoint for FileEndpoint {
367    fn uri(&self) -> &str {
368        &self.uri
369    }
370
371    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
372        Ok(Box::new(FileConsumer {
373            config: self.config.clone(),
374            seen: HashSet::new(),
375        }))
376    }
377
378    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
379        Ok(BoxProcessor::new(FileProducer {
380            config: self.config.clone(),
381        }))
382    }
383}
384
385// ---------------------------------------------------------------------------
386// FileConsumer
387// ---------------------------------------------------------------------------
388
389struct FileConsumer {
390    config: FileConfig,
391    seen: HashSet<PathBuf>,
392}
393
394#[async_trait]
395impl Consumer for FileConsumer {
396    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
397        let config = self.config.clone();
398
399        let include_re = config
400            .include
401            .as_ref()
402            .map(|p| Regex::new(p))
403            .transpose()
404            .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
405        let exclude_re = config
406            .exclude
407            .as_ref()
408            .map(|p| Regex::new(p))
409            .transpose()
410            .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
411
412        if !config.initial_delay.is_zero() {
413            tokio::select! {
414                _ = time::sleep(config.initial_delay) => {}
415                _ = context.cancelled() => {
416                    debug!(directory = config.directory, "File consumer cancelled during initial delay");
417                    return Ok(());
418                }
419            }
420        }
421
422        let mut interval = time::interval(config.delay);
423
424        loop {
425            tokio::select! {
426                _ = context.cancelled() => {
427                    debug!(directory = config.directory, "File consumer received cancellation, stopping");
428                    break;
429                }
430                _ = interval.tick() => {
431                    if let Err(e) = poll_directory(
432                        &config,
433                        &context,
434                        &include_re,
435                        &exclude_re,
436                        &mut self.seen,
437                    ).await {
438                        warn!(directory = config.directory, error = %e, "Error polling directory");
439                    }
440                }
441            }
442        }
443
444        Ok(())
445    }
446
447    async fn stop(&mut self) -> Result<(), CamelError> {
448        Ok(())
449    }
450}
451
452async fn poll_directory(
453    config: &FileConfig,
454    context: &ConsumerContext,
455    include_re: &Option<Regex>,
456    exclude_re: &Option<Regex>,
457    seen: &mut HashSet<PathBuf>,
458) -> Result<(), CamelError> {
459    let base_path = std::path::Path::new(&config.directory);
460
461    let files = list_files(base_path, config.recursive).await?;
462
463    for file_path in files {
464        let file_name = file_path
465            .file_name()
466            .and_then(|n| n.to_str())
467            .unwrap_or_default()
468            .to_string();
469
470        if let Some(ref target_name) = config.file_name
471            && file_name != *target_name
472        {
473            continue;
474        }
475
476        if let Some(re) = include_re
477            && !re.is_match(&file_name)
478        {
479            continue;
480        }
481
482        if let Some(re) = exclude_re
483            && re.is_match(&file_name)
484        {
485            continue;
486        }
487
488        if let Some(ref move_dir) = config.move_to
489            && file_path.starts_with(base_path.join(move_dir))
490        {
491            continue;
492        }
493
494        // Idempotent consumer: skip already-seen files when noop=true
495        if config.noop && seen.contains(&file_path) {
496            continue;
497        }
498
499        let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
500            let f = fs::File::open(&file_path).await?;
501            let m = f.metadata().await?;
502            Ok::<_, std::io::Error>((f, m))
503        })
504        .await
505        {
506            Ok(Ok((f, m))) => (f, Some(m)),
507            Ok(Err(e)) => {
508                warn!(
509                    file = %file_path.display(),
510                    error = %e,
511                    "Failed to open file"
512                );
513                continue;
514            }
515            Err(_) => {
516                warn!(
517                    file = %file_path.display(),
518                    timeout_ms = config.read_timeout.as_millis(),
519                    "Timeout opening file"
520                );
521                continue;
522            }
523        };
524
525        let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
526        let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
527
528        let last_modified = metadata
529            .as_ref()
530            .and_then(|m| m.modified().ok())
531            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
532            .map(|d| d.as_millis() as u64)
533            .unwrap_or(0);
534
535        let relative_path = file_path
536            .strip_prefix(base_path)
537            .unwrap_or(&file_path)
538            .to_string_lossy()
539            .to_string();
540
541        let absolute_path = file_path
542            .canonicalize()
543            .unwrap_or_else(|_| file_path.clone())
544            .to_string_lossy()
545            .to_string();
546
547        let body = Body::Stream(StreamBody {
548            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
549            metadata: StreamMetadata {
550                size_hint: Some(file_len),
551                content_type: None,
552                origin: Some(absolute_path.clone()),
553            },
554        });
555
556        let mut exchange = Exchange::new(Message::new(body));
557        exchange
558            .input
559            .set_header("CamelFileName", serde_json::Value::String(relative_path));
560        exchange.input.set_header(
561            "CamelFileNameOnly",
562            serde_json::Value::String(file_name.clone()),
563        );
564        exchange.input.set_header(
565            "CamelFileAbsolutePath",
566            serde_json::Value::String(absolute_path),
567        );
568        exchange.input.set_header(
569            "CamelFileLength",
570            serde_json::Value::Number(file_len.into()),
571        );
572        exchange.input.set_header(
573            "CamelFileLastModified",
574            serde_json::Value::Number(last_modified.into()),
575        );
576
577        debug!(
578            file = %file_path.display(),
579            correlation_id = %exchange.correlation_id(),
580            "Processing file"
581        );
582
583        if context.send(exchange).await.is_err() {
584            break;
585        }
586
587        if config.noop {
588            seen.insert(file_path.clone());
589        }
590
591        if config.noop {
592            // Do nothing
593        } else if config.delete {
594            if let Err(e) = fs::remove_file(&file_path).await {
595                warn!(file = %file_path.display(), error = %e, "Failed to delete file");
596            }
597        } else if let Some(ref move_dir) = config.move_to {
598            let target_dir = base_path.join(move_dir);
599            if let Err(e) = fs::create_dir_all(&target_dir).await {
600                warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
601                continue;
602            }
603            let target_path = target_dir.join(&file_name);
604            if let Err(e) = fs::rename(&file_path, &target_path).await {
605                warn!(
606                    from = %file_path.display(),
607                    to = %target_path.display(),
608                    error = %e,
609                    "Failed to move file"
610                );
611            }
612        }
613    }
614
615    Ok(())
616}
617
618async fn list_files(
619    dir: &std::path::Path,
620    recursive: bool,
621) -> Result<Vec<std::path::PathBuf>, CamelError> {
622    let mut files = Vec::new();
623    let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
624
625    while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
626        let path = entry.path();
627        if path.is_file() {
628            files.push(path);
629        } else if path.is_dir() && recursive {
630            let mut sub_files = Box::pin(list_files(&path, true)).await?;
631            files.append(&mut sub_files);
632        }
633    }
634
635    files.sort();
636    Ok(files)
637}
638
639// ---------------------------------------------------------------------------
640// Path validation for security
641// ---------------------------------------------------------------------------
642
643fn validate_path_is_within_base(
644    base_dir: &std::path::Path,
645    target_path: &std::path::Path,
646) -> Result<(), CamelError> {
647    let canonical_base = base_dir.canonicalize().map_err(|e| {
648        CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
649    })?;
650
651    // For non-existent paths, canonicalize the parent and construct the full path
652    let canonical_target = if target_path.exists() {
653        target_path.canonicalize().map_err(|e| {
654            CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
655        })?
656    } else if let Some(parent) = target_path.parent() {
657        // Ensure parent exists (should have been created by auto_create)
658        if !parent.exists() {
659            return Err(CamelError::ProcessorError(format!(
660                "Parent directory '{}' does not exist",
661                parent.display()
662            )));
663        }
664        let canonical_parent = parent.canonicalize().map_err(|e| {
665            CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
666        })?;
667        // Reconstruct the full path with the filename
668        if let Some(filename) = target_path.file_name() {
669            canonical_parent.join(filename)
670        } else {
671            return Err(CamelError::ProcessorError(
672                "Invalid target path: no filename".to_string(),
673            ));
674        }
675    } else {
676        return Err(CamelError::ProcessorError(
677            "Invalid target path: no parent directory".to_string(),
678        ));
679    };
680
681    if !canonical_target.starts_with(&canonical_base) {
682        return Err(CamelError::ProcessorError(format!(
683            "Path '{}' is outside base directory '{}'",
684            canonical_target.display(),
685            canonical_base.display()
686        )));
687    }
688
689    Ok(())
690}
691
692// ---------------------------------------------------------------------------
693// FileProducer
694// ---------------------------------------------------------------------------
695
696#[derive(Clone)]
697struct FileProducer {
698    config: FileConfig,
699}
700
701impl FileProducer {
702    fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
703        let raw = if let Some(name) = exchange
704            .input
705            .header("CamelFileName")
706            .and_then(|v| v.as_str())
707        {
708            Some(name.to_string())
709        } else {
710            config.file_name.clone()
711        };
712
713        match raw {
714            Some(name) if name.contains("${") => {
715                let lang = SimpleLanguage::new();
716                let expr = lang.create_expression(&name).map_err(|e| {
717                    CamelError::ProcessorError(format!(
718                        "cannot parse fileName expression '{}': {e}",
719                        name
720                    ))
721                })?;
722                let val = expr.evaluate(exchange).map_err(|e| {
723                    CamelError::ProcessorError(format!(
724                        "cannot evaluate fileName expression '{}': {e}",
725                        name
726                    ))
727                })?;
728                match val {
729                    serde_json::Value::String(s) => Ok(s),
730                    other => Ok(other.to_string()),
731                }
732            }
733            Some(name) => Ok(name),
734            None => Err(CamelError::ProcessorError(
735                "No filename specified: set CamelFileName header or fileName option".to_string(),
736            )),
737        }
738    }
739}
740
741impl Service<Exchange> for FileProducer {
742    type Response = Exchange;
743    type Error = CamelError;
744    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
745
746    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
747        Poll::Ready(Ok(()))
748    }
749
750    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
751        let config = self.config.clone();
752
753        Box::pin(async move {
754            let file_name = FileProducer::resolve_filename(&exchange, &config)?;
755            let body = exchange.input.body.clone();
756
757            let dir_path = std::path::Path::new(&config.directory);
758            let target_path = dir_path.join(&file_name);
759
760            // 1. Auto-create directories
761            if config.auto_create
762                && let Some(parent) = target_path.parent()
763            {
764                tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
765                    .await
766                    .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
767                    .map_err(CamelError::from)?;
768            }
769
770            // 2. Security: validate path is within base directory
771            validate_path_is_within_base(dir_path, &target_path)?;
772
773            // 3. Handle file-exist strategy
774            match config.file_exist {
775                FileExistStrategy::Fail if target_path.exists() => {
776                    return Err(CamelError::ProcessorError(format!(
777                        "File already exists: {}",
778                        target_path.display()
779                    )));
780                }
781                FileExistStrategy::Append => {
782                    // Append: write directly without temp file (append is inherently non-atomic)
783                    let mut file = tokio::time::timeout(
784                        config.write_timeout,
785                        OpenOptions::new()
786                            .append(true)
787                            .create(true)
788                            .open(&target_path),
789                    )
790                    .await
791                    .map_err(|_| {
792                        CamelError::ProcessorError("Timeout opening file for append".into())
793                    })?
794                    .map_err(CamelError::from)?;
795
796                    tokio::time::timeout(
797                        config.write_timeout,
798                        io::copy(&mut body.into_async_read(), &mut file),
799                    )
800                    .await
801                    .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
802                    .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
803
804                    file.flush().await.map_err(CamelError::from)?;
805                }
806                _ => {
807                    // Override (or Fail when file doesn't exist): always atomic via temp file
808                    let temp_name = if let Some(ref prefix) = config.temp_prefix {
809                        format!("{prefix}{file_name}")
810                    } else {
811                        format!(".tmp.{file_name}")
812                    };
813                    let temp_path = dir_path.join(&temp_name);
814
815                    // RAII guard ensures cleanup even on panic
816                    let mut guard = TempFileGuard::new(temp_path.clone());
817
818                    // Write to temp file
819                    let mut file =
820                        tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
821                            .await
822                            .map_err(|_| {
823                                CamelError::ProcessorError("Timeout creating temp file".into())
824                            })?
825                            .map_err(CamelError::from)?;
826
827                    let copy_result = tokio::time::timeout(
828                        config.write_timeout,
829                        io::copy(&mut body.into_async_read(), &mut file),
830                    )
831                    .await;
832
833                    // Flush any kernel buffers (best-effort; actual write errors come from io::copy above)
834                    let _ = file.flush().await;
835
836                    match copy_result {
837                        Ok(Ok(_)) => {}
838                        Ok(Err(e)) => {
839                            // Guard will clean up temp file on drop
840                            return Err(CamelError::ProcessorError(e.to_string()));
841                        }
842                        Err(_) => {
843                            // Guard will clean up temp file on drop
844                            return Err(CamelError::ProcessorError("Timeout writing file".into()));
845                        }
846                    }
847
848                    // Atomic rename: temp → target
849                    let rename_result = tokio::time::timeout(
850                        config.write_timeout,
851                        fs::rename(&temp_path, &target_path),
852                    )
853                    .await;
854
855                    match rename_result {
856                        Ok(Ok(_)) => {
857                            // Success — disarm guard so it doesn't delete the renamed file
858                            guard.disarm();
859                        }
860                        Ok(Err(e)) => {
861                            // Guard will clean up temp file on drop
862                            return Err(CamelError::from(e));
863                        }
864                        Err(_) => {
865                            // Guard will clean up temp file on drop
866                            return Err(CamelError::ProcessorError("Timeout renaming file".into()));
867                        }
868                    }
869                }
870            }
871
872            // 4. Set output header
873            let abs_path = target_path
874                .canonicalize()
875                .unwrap_or_else(|_| target_path.clone())
876                .to_string_lossy()
877                .to_string();
878            exchange
879                .input
880                .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
881
882            debug!(
883                file = %target_path.display(),
884                correlation_id = %exchange.correlation_id(),
885                "File written"
886            );
887
888            Ok(exchange)
889        })
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use super::*;
896    use bytes::Bytes;
897    use camel_component_api::NoOpComponentContext;
898    use std::time::Duration;
899    use tokio_util::sync::CancellationToken;
900
901    fn test_producer_ctx() -> ProducerContext {
902        ProducerContext::new()
903    }
904
905    #[test]
906    fn test_file_config_defaults() {
907        let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
908        assert_eq!(config.directory, "/tmp/inbox");
909        assert_eq!(config.delay, Duration::from_millis(500));
910        assert_eq!(config.initial_delay, Duration::from_millis(1000));
911        assert!(!config.noop);
912        assert!(!config.delete);
913        assert_eq!(config.move_to, Some(".camel".to_string()));
914        assert!(config.file_name.is_none());
915        assert!(config.include.is_none());
916        assert!(config.exclude.is_none());
917        assert!(!config.recursive);
918        assert_eq!(config.file_exist, FileExistStrategy::Override);
919        assert!(config.temp_prefix.is_none());
920        assert!(config.auto_create);
921        // New timeout defaults
922        assert_eq!(config.read_timeout, Duration::from_secs(30));
923        assert_eq!(config.write_timeout, Duration::from_secs(30));
924    }
925
926    #[test]
927    fn test_file_config_consumer_options() {
928        let config = FileConfig::from_uri(
929            "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
930        ).unwrap();
931        assert_eq!(config.directory, "/data/input");
932        assert_eq!(config.delay, Duration::from_millis(1000));
933        assert_eq!(config.initial_delay, Duration::from_millis(2000));
934        assert!(config.noop);
935        assert!(config.recursive);
936        assert_eq!(config.include, Some(".*\\.csv".to_string()));
937    }
938
939    #[test]
940    fn test_file_config_producer_options() {
941        let config = FileConfig::from_uri(
942            "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
943        )
944        .unwrap();
945        assert_eq!(config.file_exist, FileExistStrategy::Append);
946        assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
947        assert!(!config.auto_create);
948        assert_eq!(config.file_name, Some("out.txt".to_string()));
949    }
950
951    #[test]
952    fn test_file_config_delete_mode() {
953        let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
954        assert!(config.delete);
955        assert!(config.move_to.is_none());
956    }
957
958    #[test]
959    fn test_file_config_noop_mode() {
960        let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
961        assert!(config.noop);
962        assert!(config.move_to.is_none());
963    }
964
965    #[test]
966    fn test_file_config_wrong_scheme() {
967        let result = FileConfig::from_uri("timer:tick");
968        assert!(result.is_err());
969    }
970
971    #[test]
972    fn test_file_component_scheme() {
973        let component = FileComponent::new();
974        assert_eq!(component.scheme(), "file");
975    }
976
977    #[test]
978    fn test_file_component_creates_endpoint() {
979        let component = FileComponent::new();
980        let ctx = NoOpComponentContext;
981        let endpoint = component.create_endpoint("file:/tmp/test", &ctx);
982        assert!(endpoint.is_ok());
983    }
984
985    // -----------------------------------------------------------------------
986    // Consumer tests
987    // -----------------------------------------------------------------------
988
989    #[tokio::test]
990    async fn test_file_consumer_reads_files() {
991        let dir = tempfile::tempdir().unwrap();
992        let dir_path = dir.path().to_str().unwrap();
993
994        std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
995        std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
996
997        let component = FileComponent::new();
998        let ctx = NoOpComponentContext;
999        let endpoint = component
1000            .create_endpoint(
1001                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100"),
1002                &ctx,
1003            )
1004            .unwrap();
1005        let mut consumer = endpoint.create_consumer().unwrap();
1006
1007        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1008        let token = CancellationToken::new();
1009        let ctx = ConsumerContext::new(tx, token.clone());
1010
1011        tokio::spawn(async move {
1012            consumer.start(ctx).await.unwrap();
1013        });
1014
1015        let mut received = Vec::new();
1016        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
1017            while let Some(envelope) = rx.recv().await {
1018                received.push(envelope.exchange);
1019                if received.len() == 2 {
1020                    break;
1021                }
1022            }
1023        })
1024        .await;
1025        token.cancel();
1026
1027        assert!(timeout.is_ok(), "Should have received 2 exchanges");
1028        assert_eq!(received.len(), 2);
1029
1030        for ex in &received {
1031            assert!(ex.input.header("CamelFileName").is_some());
1032            assert!(ex.input.header("CamelFileNameOnly").is_some());
1033            assert!(ex.input.header("CamelFileAbsolutePath").is_some());
1034            assert!(ex.input.header("CamelFileLength").is_some());
1035            assert!(ex.input.header("CamelFileLastModified").is_some());
1036        }
1037    }
1038
1039    #[tokio::test]
1040    async fn noop_second_poll_does_not_re_emit_seen_files() {
1041        let dir = tempfile::tempdir().unwrap();
1042        let file_path = dir.path().join("test.txt");
1043        tokio::fs::write(&file_path, b"hello").await.unwrap();
1044
1045        let uri = format!(
1046            "file:{}?noop=true&initialDelay=0&delay=50",
1047            dir.path().display()
1048        );
1049        let config = FileConfig::from_uri(&uri).unwrap();
1050        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1051        let token = CancellationToken::new();
1052        let ctx = ConsumerContext::new(tx, token);
1053
1054        let include_re = None;
1055        let exclude_re = None;
1056        let mut seen = std::collections::HashSet::new();
1057
1058        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1059            .await
1060            .unwrap();
1061        assert!(rx.try_recv().is_ok(), "first poll should emit file");
1062        assert!(rx.try_recv().is_err(), "should only emit once");
1063
1064        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1065            .await
1066            .unwrap();
1067        assert!(
1068            rx.try_recv().is_err(),
1069            "second poll should not re-emit seen file"
1070        );
1071    }
1072
1073    #[tokio::test]
1074    async fn noop_new_files_picked_up_after_first_poll() {
1075        let dir = tempfile::tempdir().unwrap();
1076        let file1 = dir.path().join("a.txt");
1077        tokio::fs::write(&file1, b"a").await.unwrap();
1078
1079        let uri = format!(
1080            "file:{}?noop=true&initialDelay=0&delay=50",
1081            dir.path().display()
1082        );
1083        let config = FileConfig::from_uri(&uri).unwrap();
1084        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1085        let token = CancellationToken::new();
1086        let ctx = ConsumerContext::new(tx, token);
1087
1088        let include_re = None;
1089        let exclude_re = None;
1090        let mut seen = std::collections::HashSet::new();
1091
1092        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1093            .await
1094            .unwrap();
1095        let _ = rx.try_recv();
1096
1097        let file2 = dir.path().join("b.txt");
1098        tokio::fs::write(&file2, b"b").await.unwrap();
1099
1100        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1101            .await
1102            .unwrap();
1103        assert!(
1104            rx.try_recv().is_ok(),
1105            "b.txt should be emitted on second poll"
1106        );
1107        assert!(rx.try_recv().is_err(), "a.txt should not be re-emitted");
1108    }
1109
1110    #[tokio::test]
1111    async fn test_file_consumer_include_filter() {
1112        let dir = tempfile::tempdir().unwrap();
1113        let dir_path = dir.path().to_str().unwrap();
1114
1115        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
1116        std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
1117
1118        let component = FileComponent::new();
1119        let ctx = NoOpComponentContext;
1120        let endpoint = component
1121            .create_endpoint(
1122                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"),
1123                &ctx,
1124            )
1125            .unwrap();
1126        let mut consumer = endpoint.create_consumer().unwrap();
1127
1128        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1129        let token = CancellationToken::new();
1130        let ctx = ConsumerContext::new(tx, token.clone());
1131
1132        tokio::spawn(async move {
1133            consumer.start(ctx).await.unwrap();
1134        });
1135
1136        let mut received = Vec::new();
1137        let _ = tokio::time::timeout(Duration::from_millis(500), async {
1138            while let Some(envelope) = rx.recv().await {
1139                received.push(envelope.exchange);
1140                if received.len() == 1 {
1141                    break;
1142                }
1143            }
1144        })
1145        .await;
1146        token.cancel();
1147
1148        assert_eq!(received.len(), 1);
1149        let name = received[0]
1150            .input
1151            .header("CamelFileNameOnly")
1152            .and_then(|v| v.as_str())
1153            .unwrap();
1154        assert_eq!(name, "data.csv");
1155    }
1156
1157    #[tokio::test]
1158    async fn test_file_consumer_delete_mode() {
1159        let dir = tempfile::tempdir().unwrap();
1160        let dir_path = dir.path().to_str().unwrap();
1161
1162        std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
1163
1164        let component = FileComponent::new();
1165        let ctx = NoOpComponentContext;
1166        let endpoint = component
1167            .create_endpoint(
1168                &format!("file:{dir_path}?delete=true&initialDelay=0&delay=100"),
1169                &ctx,
1170            )
1171            .unwrap();
1172        let mut consumer = endpoint.create_consumer().unwrap();
1173
1174        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1175        let token = CancellationToken::new();
1176        let ctx = ConsumerContext::new(tx, token.clone());
1177
1178        tokio::spawn(async move {
1179            consumer.start(ctx).await.unwrap();
1180        });
1181
1182        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1183        token.cancel();
1184
1185        tokio::time::sleep(Duration::from_millis(100)).await;
1186
1187        assert!(
1188            !dir.path().join("deleteme.txt").exists(),
1189            "File should be deleted"
1190        );
1191    }
1192
1193    #[tokio::test]
1194    async fn test_file_consumer_move_mode() {
1195        let dir = tempfile::tempdir().unwrap();
1196        let dir_path = dir.path().to_str().unwrap();
1197
1198        std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
1199
1200        let component = FileComponent::new();
1201        let ctx = NoOpComponentContext;
1202        let endpoint = component
1203            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"), &ctx)
1204            .unwrap();
1205        let mut consumer = endpoint.create_consumer().unwrap();
1206
1207        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1208        let token = CancellationToken::new();
1209        let ctx = ConsumerContext::new(tx, token.clone());
1210
1211        tokio::spawn(async move {
1212            consumer.start(ctx).await.unwrap();
1213        });
1214
1215        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1216        token.cancel();
1217
1218        tokio::time::sleep(Duration::from_millis(100)).await;
1219
1220        assert!(
1221            !dir.path().join("moveme.txt").exists(),
1222            "Original file should be gone"
1223        );
1224        assert!(
1225            dir.path().join(".camel").join("moveme.txt").exists(),
1226            "File should be in .camel/"
1227        );
1228    }
1229
1230    #[tokio::test]
1231    async fn test_file_consumer_respects_cancellation() {
1232        let dir = tempfile::tempdir().unwrap();
1233        let dir_path = dir.path().to_str().unwrap();
1234
1235        let component = FileComponent::new();
1236        let ctx = NoOpComponentContext;
1237        let endpoint = component
1238            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"), &ctx)
1239            .unwrap();
1240        let mut consumer = endpoint.create_consumer().unwrap();
1241
1242        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1243        let token = CancellationToken::new();
1244        let ctx = ConsumerContext::new(tx, token.clone());
1245
1246        let handle = tokio::spawn(async move {
1247            consumer.start(ctx).await.unwrap();
1248        });
1249
1250        tokio::time::sleep(Duration::from_millis(150)).await;
1251        token.cancel();
1252
1253        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1254        assert!(
1255            result.is_ok(),
1256            "Consumer should have stopped after cancellation"
1257        );
1258    }
1259
1260    // -----------------------------------------------------------------------
1261    // Producer tests
1262    // -----------------------------------------------------------------------
1263
1264    #[tokio::test]
1265    async fn test_file_producer_writes_file() {
1266        use tower::ServiceExt;
1267
1268        let dir = tempfile::tempdir().unwrap();
1269        let dir_path = dir.path().to_str().unwrap();
1270
1271        let component = FileComponent::new();
1272        let ctx = NoOpComponentContext;
1273        let endpoint = component
1274            .create_endpoint(&format!("file:{dir_path}"), &ctx)
1275            .unwrap();
1276        let ctx = test_producer_ctx();
1277        let producer = endpoint.create_producer(&ctx).unwrap();
1278
1279        let mut exchange = Exchange::new(Message::new("file content"));
1280        exchange.input.set_header(
1281            "CamelFileName",
1282            serde_json::Value::String("output.txt".to_string()),
1283        );
1284
1285        let result = producer.oneshot(exchange).await.unwrap();
1286
1287        let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1288        assert_eq!(content, "file content");
1289
1290        assert!(result.input.header("CamelFileNameProduced").is_some());
1291    }
1292
1293    #[tokio::test]
1294    async fn test_file_producer_auto_create_dirs() {
1295        use tower::ServiceExt;
1296
1297        let dir = tempfile::tempdir().unwrap();
1298        let dir_path = dir.path().to_str().unwrap();
1299
1300        let component = FileComponent::new();
1301        let ctx = NoOpComponentContext;
1302        let endpoint = component
1303            .create_endpoint(&format!("file:{dir_path}/sub/dir"), &ctx)
1304            .unwrap();
1305        let ctx = test_producer_ctx();
1306        let producer = endpoint.create_producer(&ctx).unwrap();
1307
1308        let mut exchange = Exchange::new(Message::new("nested"));
1309        exchange.input.set_header(
1310            "CamelFileName",
1311            serde_json::Value::String("file.txt".to_string()),
1312        );
1313
1314        producer.oneshot(exchange).await.unwrap();
1315
1316        assert!(dir.path().join("sub/dir/file.txt").exists());
1317    }
1318
1319    #[tokio::test]
1320    async fn test_file_producer_file_exist_fail() {
1321        use tower::ServiceExt;
1322
1323        let dir = tempfile::tempdir().unwrap();
1324        let dir_path = dir.path().to_str().unwrap();
1325
1326        std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1327
1328        let component = FileComponent::new();
1329        let ctx = NoOpComponentContext;
1330        let endpoint = component
1331            .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"), &ctx)
1332            .unwrap();
1333        let ctx = test_producer_ctx();
1334        let producer = endpoint.create_producer(&ctx).unwrap();
1335
1336        let mut exchange = Exchange::new(Message::new("new"));
1337        exchange.input.set_header(
1338            "CamelFileName",
1339            serde_json::Value::String("existing.txt".to_string()),
1340        );
1341
1342        let result = producer.oneshot(exchange).await;
1343        assert!(
1344            result.is_err(),
1345            "Should fail when file exists with Fail strategy"
1346        );
1347    }
1348
1349    #[tokio::test]
1350    async fn test_file_producer_file_exist_append() {
1351        use tower::ServiceExt;
1352
1353        let dir = tempfile::tempdir().unwrap();
1354        let dir_path = dir.path().to_str().unwrap();
1355
1356        std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1357
1358        let component = FileComponent::new();
1359        let ctx = NoOpComponentContext;
1360        let endpoint = component
1361            .create_endpoint(&format!("file:{dir_path}?fileExist=Append"), &ctx)
1362            .unwrap();
1363        let ctx = test_producer_ctx();
1364        let producer = endpoint.create_producer(&ctx).unwrap();
1365
1366        let mut exchange = Exchange::new(Message::new("new"));
1367        exchange.input.set_header(
1368            "CamelFileName",
1369            serde_json::Value::String("append.txt".to_string()),
1370        );
1371
1372        producer.oneshot(exchange).await.unwrap();
1373
1374        let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1375        assert_eq!(content, "oldnew");
1376    }
1377
1378    #[tokio::test]
1379    async fn test_file_producer_temp_prefix() {
1380        use tower::ServiceExt;
1381
1382        let dir = tempfile::tempdir().unwrap();
1383        let dir_path = dir.path().to_str().unwrap();
1384
1385        let component = FileComponent::new();
1386        let ctx = NoOpComponentContext;
1387        let endpoint = component
1388            .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"), &ctx)
1389            .unwrap();
1390        let ctx = test_producer_ctx();
1391        let producer = endpoint.create_producer(&ctx).unwrap();
1392
1393        let mut exchange = Exchange::new(Message::new("atomic write"));
1394        exchange.input.set_header(
1395            "CamelFileName",
1396            serde_json::Value::String("final.txt".to_string()),
1397        );
1398
1399        producer.oneshot(exchange).await.unwrap();
1400
1401        assert!(dir.path().join("final.txt").exists());
1402        assert!(!dir.path().join(".tmpfinal.txt").exists());
1403        let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1404        assert_eq!(content, "atomic write");
1405    }
1406
1407    #[tokio::test]
1408    async fn test_file_producer_uses_filename_option() {
1409        use tower::ServiceExt;
1410
1411        let dir = tempfile::tempdir().unwrap();
1412        let dir_path = dir.path().to_str().unwrap();
1413
1414        let component = FileComponent::new();
1415        let ctx = NoOpComponentContext;
1416        let endpoint = component
1417            .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"), &ctx)
1418            .unwrap();
1419        let ctx = test_producer_ctx();
1420        let producer = endpoint.create_producer(&ctx).unwrap();
1421
1422        let exchange = Exchange::new(Message::new("content"));
1423
1424        producer.oneshot(exchange).await.unwrap();
1425        assert!(dir.path().join("fixed.txt").exists());
1426    }
1427
1428    #[tokio::test]
1429    async fn test_file_producer_no_filename_errors() {
1430        use tower::ServiceExt;
1431
1432        let dir = tempfile::tempdir().unwrap();
1433        let dir_path = dir.path().to_str().unwrap();
1434
1435        let component = FileComponent::new();
1436        let ctx = NoOpComponentContext;
1437        let endpoint = component
1438            .create_endpoint(&format!("file:{dir_path}"), &ctx)
1439            .unwrap();
1440        let ctx = test_producer_ctx();
1441        let producer = endpoint.create_producer(&ctx).unwrap();
1442
1443        let exchange = Exchange::new(Message::new("content"));
1444
1445        let result = producer.oneshot(exchange).await;
1446        assert!(result.is_err(), "Should error when no filename is provided");
1447    }
1448
1449    // -----------------------------------------------------------------------
1450    // Security tests - Path traversal protection
1451    // -----------------------------------------------------------------------
1452
1453    #[tokio::test]
1454    async fn test_file_producer_rejects_path_traversal_parent_directory() {
1455        use tower::ServiceExt;
1456
1457        let dir = tempfile::tempdir().unwrap();
1458        let dir_path = dir.path().to_str().unwrap();
1459
1460        // Create a subdirectory
1461        std::fs::create_dir(dir.path().join("subdir")).unwrap();
1462        std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1463
1464        let component = FileComponent::new();
1465        let ctx = NoOpComponentContext;
1466        let endpoint = component
1467            .create_endpoint(&format!("file:{dir_path}/subdir"), &ctx)
1468            .unwrap();
1469        let ctx = test_producer_ctx();
1470        let producer = endpoint.create_producer(&ctx).unwrap();
1471
1472        let mut exchange = Exchange::new(Message::new("malicious"));
1473        exchange.input.set_header(
1474            "CamelFileName",
1475            serde_json::Value::String("../secret.txt".to_string()),
1476        );
1477
1478        let result = producer.oneshot(exchange).await;
1479        assert!(result.is_err(), "Should reject path traversal attempt");
1480
1481        let err = result.unwrap_err();
1482        assert!(
1483            err.to_string().contains("outside"),
1484            "Error should mention path is outside base directory"
1485        );
1486    }
1487
1488    #[tokio::test]
1489    async fn test_file_producer_rejects_absolute_path_outside_base() {
1490        use tower::ServiceExt;
1491
1492        let dir = tempfile::tempdir().unwrap();
1493        let dir_path = dir.path().to_str().unwrap();
1494
1495        let component = FileComponent::new();
1496        let ctx = NoOpComponentContext;
1497        let endpoint = component
1498            .create_endpoint(&format!("file:{dir_path}"), &ctx)
1499            .unwrap();
1500        let ctx = test_producer_ctx();
1501        let producer = endpoint.create_producer(&ctx).unwrap();
1502
1503        let mut exchange = Exchange::new(Message::new("malicious"));
1504        exchange.input.set_header(
1505            "CamelFileName",
1506            serde_json::Value::String("/etc/passwd".to_string()),
1507        );
1508
1509        let result = producer.oneshot(exchange).await;
1510        assert!(result.is_err(), "Should reject absolute path outside base");
1511    }
1512
1513    // -----------------------------------------------------------------------
1514    // Large file streaming tests
1515    // -----------------------------------------------------------------------
1516
1517    #[tokio::test]
1518    #[ignore] // Slow test - run with --ignored flag
1519    async fn test_large_file_streaming_constant_memory() {
1520        use std::io::Write;
1521        use tempfile::NamedTempFile;
1522
1523        // Create a 150MB file (larger than 100MB limit)
1524        let mut temp_file = NamedTempFile::new().unwrap();
1525        let file_size = 150 * 1024 * 1024; // 150MB
1526        let chunk = vec![b'X'; 1024 * 1024]; // 1MB chunk
1527
1528        for _ in 0..150 {
1529            temp_file.write_all(&chunk).unwrap();
1530        }
1531        temp_file.flush().unwrap();
1532
1533        let dir = temp_file.path().parent().unwrap();
1534        let dir_path = dir.to_str().unwrap();
1535        let file_name = temp_file
1536            .path()
1537            .file_name()
1538            .unwrap()
1539            .to_str()
1540            .unwrap()
1541            .to_string();
1542
1543        // Read file as stream (should succeed with lazy evaluation)
1544        let component = FileComponent::new();
1545        let component_ctx = NoOpComponentContext;
1546        let endpoint = component
1547            .create_endpoint(
1548                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1549                &component_ctx,
1550            )
1551            .unwrap();
1552        let mut consumer = endpoint.create_consumer().unwrap();
1553
1554        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1555        let token = CancellationToken::new();
1556        let ctx = ConsumerContext::new(tx, token.clone());
1557
1558        tokio::spawn(async move {
1559            let _ = consumer.start(ctx).await;
1560        });
1561
1562        let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1563            rx.recv().await.unwrap().exchange
1564        })
1565        .await
1566        .expect("Should receive exchange");
1567        token.cancel();
1568
1569        // Verify body is a stream (not materialized)
1570        assert!(matches!(exchange.input.body, Body::Stream(_)));
1571
1572        // Verify we can read metadata without consuming
1573        if let Body::Stream(ref stream_body) = exchange.input.body {
1574            assert!(stream_body.metadata.size_hint.is_some());
1575            let size = stream_body.metadata.size_hint.unwrap();
1576            assert_eq!(size, file_size as u64);
1577        }
1578
1579        // Materializing should fail (exceeds 100MB limit)
1580        if let Body::Stream(stream_body) = exchange.input.body {
1581            let body = Body::Stream(stream_body);
1582            let result = body.into_bytes(100 * 1024 * 1024).await;
1583            assert!(result.is_err());
1584        }
1585
1586        // But we CAN read chunks one at a time (simulating line-by-line processing)
1587        // This demonstrates lazy evaluation - we don't need to load entire file
1588        let component2 = FileComponent::new();
1589        let endpoint2 = component2
1590            .create_endpoint(
1591                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1592                &component_ctx,
1593            )
1594            .unwrap();
1595        let mut consumer2 = endpoint2.create_consumer().unwrap();
1596
1597        let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1598        let token2 = CancellationToken::new();
1599        let ctx2 = ConsumerContext::new(tx2, token2.clone());
1600
1601        tokio::spawn(async move {
1602            let _ = consumer2.start(ctx2).await;
1603        });
1604
1605        let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1606            rx2.recv().await.unwrap().exchange
1607        })
1608        .await
1609        .expect("Should receive exchange");
1610        token2.cancel();
1611
1612        if let Body::Stream(stream_body) = exchange2.input.body {
1613            let mut stream_lock = stream_body.stream.lock().await;
1614            let mut stream = stream_lock.take().unwrap();
1615
1616            // Read first chunk (size varies based on ReaderStream's buffer)
1617            if let Some(chunk_result) = stream.next().await {
1618                let chunk = chunk_result.unwrap();
1619                assert!(!chunk.is_empty());
1620                assert!(chunk.len() < file_size);
1621                // Memory usage is constant - we only have this chunk in memory, not 150MB
1622            }
1623        }
1624    }
1625
1626    // -----------------------------------------------------------------------
1627    // Streaming producer tests
1628    // -----------------------------------------------------------------------
1629
1630    #[tokio::test]
1631    async fn test_producer_writes_stream_body() {
1632        let dir = tempfile::tempdir().unwrap();
1633        let dir_path = dir.path().to_str().unwrap();
1634        let uri = format!("file:{dir_path}?fileName=out.txt");
1635
1636        let component = FileComponent::new();
1637        let ctx = NoOpComponentContext;
1638        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1639        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1640
1641        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1642            Ok(Bytes::from("hello ")),
1643            Ok(Bytes::from("streaming ")),
1644            Ok(Bytes::from("world")),
1645        ];
1646        let stream = futures::stream::iter(chunks);
1647        let body = Body::Stream(StreamBody {
1648            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1649            metadata: StreamMetadata {
1650                size_hint: None,
1651                content_type: None,
1652                origin: None,
1653            },
1654        });
1655
1656        let exchange = Exchange::new(Message::new(body));
1657        tower::ServiceExt::oneshot(producer, exchange)
1658            .await
1659            .unwrap();
1660
1661        let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1662            .await
1663            .unwrap();
1664        assert_eq!(content, "hello streaming world");
1665    }
1666
1667    #[tokio::test]
1668    async fn test_producer_stream_atomic_no_partial_on_error() {
1669        // If the stream errors mid-write, no file should exist at the target path
1670        let dir = tempfile::tempdir().unwrap();
1671        let dir_path = dir.path().to_str().unwrap();
1672        let uri = format!("file:{dir_path}?fileName=out.txt");
1673
1674        let component = FileComponent::new();
1675        let ctx = NoOpComponentContext;
1676        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1677        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1678
1679        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1680            Ok(Bytes::from("partial")),
1681            Err(CamelError::ProcessorError(
1682                "simulated stream error".to_string(),
1683            )),
1684        ];
1685        let stream = futures::stream::iter(chunks);
1686        let body = Body::Stream(StreamBody {
1687            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1688            metadata: StreamMetadata {
1689                size_hint: None,
1690                content_type: None,
1691                origin: None,
1692            },
1693        });
1694
1695        let exchange = Exchange::new(Message::new(body));
1696        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1697        assert!(
1698            result.is_err(),
1699            "expected error when stream fails mid-write"
1700        );
1701
1702        // Target file must NOT exist — write was aborted and temp file cleaned up
1703        assert!(
1704            !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1705            "partial file must not exist after failed write"
1706        );
1707
1708        // Temp file must also be cleaned up
1709        assert!(
1710            !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1711            "temp file must be cleaned up after failed write"
1712        );
1713    }
1714
1715    #[tokio::test]
1716    async fn test_producer_stream_append() {
1717        let dir = tempfile::tempdir().unwrap();
1718        let dir_path = dir.path().to_str().unwrap();
1719        let target = format!("{dir_path}/out.txt");
1720
1721        // Pre-create file with initial content
1722        tokio::fs::write(&target, b"line1\n").await.unwrap();
1723
1724        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1725        let component = FileComponent::new();
1726        let ctx = NoOpComponentContext;
1727        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1728        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1729
1730        let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1731        let stream = futures::stream::iter(chunks);
1732        let body = Body::Stream(StreamBody {
1733            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1734            metadata: StreamMetadata {
1735                size_hint: None,
1736                content_type: None,
1737                origin: None,
1738            },
1739        });
1740
1741        let exchange = Exchange::new(Message::new(body));
1742        tower::ServiceExt::oneshot(producer, exchange)
1743            .await
1744            .unwrap();
1745
1746        let content = tokio::fs::read_to_string(&target).await.unwrap();
1747        assert_eq!(content, "line1\nline2\n");
1748    }
1749
1750    #[tokio::test]
1751    async fn test_producer_stream_append_partial_on_error() {
1752        // Append is inherently non-atomic: if the stream errors mid-write,
1753        // the file will contain partial data. This test documents that behavior.
1754        let dir = tempfile::tempdir().unwrap();
1755        let dir_path = dir.path().to_str().unwrap();
1756        let target = format!("{dir_path}/out.txt");
1757
1758        // Pre-create file with initial content
1759        tokio::fs::write(&target, b"initial\n").await.unwrap();
1760
1761        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1762        let component = FileComponent::new();
1763        let ctx = NoOpComponentContext;
1764        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1765        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1766
1767        // Stream with an error in the middle
1768        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1769            Ok(Bytes::from("partial-")), // This will be written
1770            Err(CamelError::ProcessorError("stream error".to_string())), // This causes failure
1771            Ok(Bytes::from("never-written")), // This won't be reached
1772        ];
1773        let stream = futures::stream::iter(chunks);
1774        let body = Body::Stream(StreamBody {
1775            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1776            metadata: StreamMetadata {
1777                size_hint: None,
1778                content_type: None,
1779                origin: None,
1780            },
1781        });
1782
1783        let exchange = Exchange::new(Message::new(body));
1784        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1785
1786        // 1. Producer must return an error
1787        assert!(
1788            result.is_err(),
1789            "expected error when stream fails during append"
1790        );
1791
1792        // 2. File must contain initial content + partial data written before the error
1793        let content = tokio::fs::read_to_string(&target).await.unwrap();
1794        assert_eq!(
1795            content, "initial\npartial-",
1796            "append leaves partial data on stream error (non-atomic by nature)"
1797        );
1798    }
1799
1800    #[tokio::test]
1801    async fn test_producer_stream_already_consumed_errors() {
1802        let dir = tempfile::tempdir().unwrap();
1803        let dir_path = dir.path().to_str().unwrap();
1804        let uri = format!("file:{dir_path}?fileName=out.txt");
1805
1806        let component = FileComponent::new();
1807        let ctx = NoOpComponentContext;
1808        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1809        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1810
1811        // Mutex holds None -> stream already consumed
1812        type MaybeStream = std::sync::Arc<
1813            tokio::sync::Mutex<
1814                Option<
1815                    std::pin::Pin<
1816                        Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1817                    >,
1818                >,
1819            >,
1820        >;
1821        let arc: MaybeStream = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1822        let body = Body::Stream(StreamBody {
1823            stream: arc,
1824            metadata: StreamMetadata {
1825                size_hint: None,
1826                content_type: None,
1827                origin: None,
1828            },
1829        });
1830
1831        let exchange = Exchange::new(Message::new(body));
1832        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1833        assert!(
1834            result.is_err(),
1835            "expected error for already-consumed stream"
1836        );
1837    }
1838
1839    // -----------------------------------------------------------------------
1840    // GlobalConfig tests - apply_global_defaults behavior
1841    // -----------------------------------------------------------------------
1842
1843    #[test]
1844    fn test_global_config_applied_to_endpoint() {
1845        // Global config with non-default values
1846        let global = FileGlobalConfig::default()
1847            .with_delay_ms(2000)
1848            .with_initial_delay_ms(5000)
1849            .with_read_timeout_ms(60_000)
1850            .with_write_timeout_ms(45_000);
1851        let component = FileComponent::with_config(global);
1852        let ctx = NoOpComponentContext;
1853        // URI uses no explicit delay/timeout params → macro defaults apply
1854        let endpoint = component.create_endpoint("file:/tmp/inbox", &ctx).unwrap();
1855        // We cannot call endpoint.config directly (FileEndpoint is private),
1856        // but we can test apply_global_defaults on FileConfig directly:
1857        let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
1858        let global2 = FileGlobalConfig::default()
1859            .with_delay_ms(2000)
1860            .with_initial_delay_ms(5000)
1861            .with_read_timeout_ms(60_000)
1862            .with_write_timeout_ms(45_000);
1863        config.apply_global_defaults(&global2);
1864        assert_eq!(config.delay, Duration::from_millis(2000));
1865        assert_eq!(config.initial_delay, Duration::from_millis(5000));
1866        assert_eq!(config.read_timeout, Duration::from_millis(60_000));
1867        assert_eq!(config.write_timeout, Duration::from_millis(45_000));
1868        // endpoint creation succeeds too
1869        let _ = endpoint; // just verify create_endpoint didn't fail
1870    }
1871
1872    #[test]
1873    fn test_uri_param_wins_over_global_config() {
1874        // URI explicitly sets delay=1000 (NOT the 500ms macro default)
1875        let mut config =
1876            FileConfig::from_uri("file:/tmp/inbox?delay=1000&initialDelay=2000").unwrap();
1877        // Global config would want 3000ms delay
1878        let global = FileGlobalConfig::default()
1879            .with_delay_ms(3000)
1880            .with_initial_delay_ms(4000);
1881        config.apply_global_defaults(&global);
1882        // URI value of 1000ms must be preserved (not replaced by 3000ms)
1883        assert_eq!(config.delay, Duration::from_millis(1000));
1884        // URI value of 2000ms must be preserved (not replaced by 4000ms)
1885        assert_eq!(config.initial_delay, Duration::from_millis(2000));
1886        // read_timeout was not set by URI → macro default (30000) → global wins if different
1887        // (read_timeout stays at 30000 since global has same default = 30000)
1888        assert_eq!(config.read_timeout, Duration::from_millis(30_000));
1889    }
1890
1891    #[tokio::test]
1892    async fn test_file_producer_filename_simple_language_from_header() {
1893        use tower::ServiceExt;
1894
1895        let dir = tempfile::tempdir().unwrap();
1896        let dir_path = dir.path().to_str().unwrap();
1897
1898        let component = FileComponent::new();
1899        let ctx = NoOpComponentContext;
1900        let endpoint = component
1901            .create_endpoint(&format!("file:{dir_path}"), &ctx)
1902            .unwrap();
1903        let ctx = test_producer_ctx();
1904        let producer = endpoint.create_producer(&ctx).unwrap();
1905
1906        let mut exchange = Exchange::new(Message::new("content"));
1907        exchange
1908            .input
1909            .set_header("CamelTimerCounter", serde_json::Value::Number(42.into()));
1910        exchange.input.set_header(
1911            "CamelFileName",
1912            serde_json::Value::String("test-${header.CamelTimerCounter}.txt".to_string()),
1913        );
1914
1915        producer.oneshot(exchange).await.unwrap();
1916
1917        assert!(
1918            dir.path().join("test-42.txt").exists(),
1919            "fileName should have been evaluated from Simple Language expression"
1920        );
1921        let content = std::fs::read_to_string(dir.path().join("test-42.txt")).unwrap();
1922        assert_eq!(content, "content");
1923    }
1924
1925    #[tokio::test]
1926    async fn test_file_producer_filename_simple_language_from_uri_param() {
1927        use tower::ServiceExt;
1928
1929        let dir = tempfile::tempdir().unwrap();
1930        let dir_path = dir.path().to_str().unwrap();
1931
1932        let component = FileComponent::new();
1933        let ctx = NoOpComponentContext;
1934        let endpoint = component
1935            .create_endpoint(
1936                &format!("file:{dir_path}?fileName=msg-${{header.id}}.dat"),
1937                &ctx,
1938            )
1939            .unwrap();
1940        let ctx = test_producer_ctx();
1941        let producer = endpoint.create_producer(&ctx).unwrap();
1942
1943        let mut exchange = Exchange::new(Message::new("data"));
1944        exchange
1945            .input
1946            .set_header("id", serde_json::Value::String("abc".to_string()));
1947
1948        producer.oneshot(exchange).await.unwrap();
1949
1950        assert!(
1951            dir.path().join("msg-abc.dat").exists(),
1952            "fileName URI param should have been evaluated from Simple Language expression"
1953        );
1954    }
1955
1956    #[tokio::test]
1957    async fn test_file_producer_filename_literal_without_expression() {
1958        use tower::ServiceExt;
1959
1960        let dir = tempfile::tempdir().unwrap();
1961        let dir_path = dir.path().to_str().unwrap();
1962
1963        let component = FileComponent::new();
1964        let ctx = NoOpComponentContext;
1965        let endpoint = component
1966            .create_endpoint(&format!("file:{dir_path}?fileName=plain.txt"), &ctx)
1967            .unwrap();
1968        let ctx = test_producer_ctx();
1969        let producer = endpoint.create_producer(&ctx).unwrap();
1970
1971        let exchange = Exchange::new(Message::new("data"));
1972        producer.oneshot(exchange).await.unwrap();
1973
1974        assert!(
1975            dir.path().join("plain.txt").exists(),
1976            "literal fileName without expressions should still work"
1977        );
1978    }
1979}