Skip to main content

camel_component_file/
lib.rs

1pub mod bundle;
2
3pub use bundle::FileBundle;
4
5use std::collections::HashSet;
6use std::future::Future;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use futures::StreamExt;
15use regex::Regex;
16use tokio::fs;
17use tokio::fs::OpenOptions;
18use tokio::io;
19use tokio::io::AsyncWriteExt;
20use tokio::time;
21use tokio_util::io::ReaderStream;
22use tower::Service;
23use tracing::{debug, warn};
24
25use camel_component_api::{
26    Body, BoxProcessor, CamelError, Exchange, Message, StreamBody, StreamMetadata,
27};
28use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
29use camel_component_api::{UriConfig, parse_uri};
30
31// ---------------------------------------------------------------------------
32// TempFileGuard — RAII cleanup for temp files (panic-safe)
33// ---------------------------------------------------------------------------
34
/// Deletes a temporary file when dropped, unless told to keep it.
///
/// Cleanup lives in `Drop`, so it also runs when the owning scope is left
/// early through `?`, a panic, or the drop of an async future that holds the
/// guard. Call [`TempFileGuard::disarm`] once the file has been moved into
/// its final place.
struct TempFileGuard {
    /// File removed on drop.
    path: PathBuf,
    /// When `true`, dropping the guard leaves the file alone.
    disarm: bool,
}

impl TempFileGuard {
    /// Arms a guard for `path`.
    fn new(path: PathBuf) -> Self {
        TempFileGuard { disarm: false, path }
    }

    /// Keeps the file: after this call, dropping the guard does nothing.
    fn disarm(&mut self) {
        self.disarm = true;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if self.disarm {
            return;
        }
        // Best effort: the file may never have been created, and `drop` cannot report errors.
        let _ = std::fs::remove_file(&self.path);
    }
}
66
67// ---------------------------------------------------------------------------
68// FileExistStrategy
69// ---------------------------------------------------------------------------
70
/// Strategy for handling an existing target file when the producer writes.
///
/// Parsed from the `fileExist` URI parameter; parsing is ASCII
/// case-insensitive (`override`, `Override` and `OVERRIDE` are equivalent).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FileExistStrategy {
    /// Overwrite an existing file (default).
    #[default]
    Override,
    /// Append to an existing file, creating it if missing.
    Append,
    /// Fail if the file already exists.
    Fail,
}

impl FromStr for FileExistStrategy {
    type Err = String;

    /// Parses a strategy name, ignoring ASCII case.
    ///
    /// # Errors
    /// Returns a message listing the accepted values for any other input.
    /// Unknown values are rejected rather than mapped to `Override` so that a
    /// typo such as `fileExist=Fial` cannot silently turn into an overwrite.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const NAMED: [(&str, FileExistStrategy); 3] = [
            ("Override", FileExistStrategy::Override),
            ("Append", FileExistStrategy::Append),
            ("Fail", FileExistStrategy::Fail),
        ];

        NAMED
            .iter()
            .find(|(name, _)| name.eq_ignore_ascii_case(s))
            .map(|&(_, strategy)| strategy)
            .ok_or_else(|| {
                format!("invalid fileExist value '{s}': expected one of Override, Append, Fail")
            })
    }
}
95
96// ---------------------------------------------------------------------------
97// FileGlobalConfig
98// ---------------------------------------------------------------------------
99
100/// Global configuration for File component.
101/// Supports serde deserialization with defaults and builder methods.
102/// These are the fallback defaults when URI params are not set.
103#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
104#[serde(default)]
105pub struct FileGlobalConfig {
106    pub delay_ms: u64,
107    pub initial_delay_ms: u64,
108    pub read_timeout_ms: u64,
109    pub write_timeout_ms: u64,
110}
111
112impl Default for FileGlobalConfig {
113    fn default() -> Self {
114        Self {
115            delay_ms: 500,
116            initial_delay_ms: 1_000,
117            read_timeout_ms: 30_000,
118            write_timeout_ms: 30_000,
119        }
120    }
121}
122
123impl FileGlobalConfig {
124    pub fn new() -> Self {
125        Self::default()
126    }
127    pub fn with_delay_ms(mut self, v: u64) -> Self {
128        self.delay_ms = v;
129        self
130    }
131    pub fn with_initial_delay_ms(mut self, v: u64) -> Self {
132        self.initial_delay_ms = v;
133        self
134    }
135    pub fn with_read_timeout_ms(mut self, v: u64) -> Self {
136        self.read_timeout_ms = v;
137        self
138    }
139    pub fn with_write_timeout_ms(mut self, v: u64) -> Self {
140        self.write_timeout_ms = v;
141        self
142    }
143}
144
145// ---------------------------------------------------------------------------
146// FileConfig
147// ---------------------------------------------------------------------------
148
/// Configuration for file component endpoints.
///
/// Built from a `file:<directory>?<params>` URI by the `UriConfig` derive plus
/// the hand-written `UriConfig::validate`, which resolves the `move` default.
///
/// # Streaming
///
/// Both the file consumer and producer use **native streaming** with no RAM
/// materialization:
///
/// - The **consumer** creates a `Body::Stream` backed by `tokio::fs::File` via
///   `ReaderStream`. Files of any size are handled without loading them into memory.
///
/// - The **producer** writes via `tokio::io::copy` directly to a `tokio::fs::File`
///   using `Body::into_async_read()`. Writes for the `Override` strategy are
///   **atomic**: data is written to a temporary file first and renamed only on
///   success, preventing partial files on failure.
///
/// # Write strategies (`fileExist` URI parameter)
///
/// | Value | Behavior |
/// |-------|----------|
/// | `Override` (default) | Atomic write via temp file + rename |
/// | `Append` | Appends to existing file; non-atomic by nature |
/// | `Fail` | Returns error if file already exists; otherwise written like `Override` |
///
/// `Fail` checks for the target before writing. A file created by another
/// writer between that check and the final rename is replaced, not reported.
///
/// # Timeouts
///
/// `read_timeout` bounds opening a consumed file and reading its metadata.
/// `write_timeout` bounds each producer filesystem step separately (directory
/// creation, open/create, copy, rename), so one write can take longer than
/// `write_timeout` in total.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "file"]
#[uri_config(skip_impl, crate = "camel_component_api")]
pub struct FileConfig {
    /// Directory path to read from or write to.
    pub directory: String,

    /// Polling delay in milliseconds (companion field for `delay`).
    // Not read by hand-written code; presumably the `UriConfig` derive uses it to
    // fill `delay` (NOTE(review): confirm in the macro) — hence `allow(dead_code)`.
    #[allow(dead_code)]
    #[uri_param(name = "delay", default = "500")]
    delay_ms: u64,

    /// Polling delay as Duration.
    pub delay: Duration,

    /// Initial delay in milliseconds (companion field for `initial_delay`).
    // Same companion-field pattern as `delay_ms`.
    #[allow(dead_code)]
    #[uri_param(name = "initialDelay", default = "1000")]
    initial_delay_ms: u64,

    /// Initial delay as Duration. Zero skips the wait before the first poll.
    pub initial_delay: Duration,

    /// If true, don't delete or move files after processing. The consumer then
    /// remembers which paths it has already emitted so each file is sent once.
    #[uri_param(default = "false")]
    pub noop: bool,

    /// If true, delete files after processing (ignored when `noop` is also true).
    #[uri_param(default = "false")]
    pub delete: bool,

    /// Directory (joined onto `directory`) to move processed files into.
    /// `validate` forces `None` when `noop` or `delete` is set and defaults to
    /// ".camel" otherwise. Files already under it are never consumed again.
    #[uri_param(name = "move")]
    move_to: Option<String>,

    /// Fixed file name. Producer: used when the exchange has no `CamelFileName`
    /// header. Consumer: only files whose name (final path component) equals
    /// this value are consumed.
    #[uri_param(name = "fileName")]
    pub file_name: Option<String>,

    /// Regex pattern for including files (consumer). Matched with `is_match`
    /// against the file name only, unanchored — use `^...$` for whole-name matches.
    #[uri_param]
    pub include: Option<String>,

    /// Regex pattern for excluding files (consumer). Same matching rules as
    /// `include`; a file must match `include` (if set) and not match `exclude`.
    #[uri_param]
    pub exclude: Option<String>,

    /// Whether to scan directories recursively.
    #[uri_param(default = "false")]
    pub recursive: bool,

    /// Strategy for handling existing files when writing.
    #[uri_param(name = "fileExist", default = "Override")]
    pub file_exist: FileExistStrategy,

    /// Prefix prepended to the file name to form the temporary file used by
    /// atomic writes; ".tmp." when unset.
    #[uri_param(name = "tempPrefix")]
    pub temp_prefix: Option<String>,

    /// Whether the producer creates missing parent directories of the target
    /// file before writing.
    #[uri_param(name = "autoCreate", default = "true")]
    pub auto_create: bool,

    /// Read timeout in milliseconds (companion field for `read_timeout`).
    // Same companion-field pattern as `delay_ms`.
    #[allow(dead_code)]
    #[uri_param(name = "readTimeout", default = "30000")]
    read_timeout_ms: u64,

    /// Read timeout as Duration (bounds opening a consumed file + reading its metadata).
    pub read_timeout: Duration,

    /// Write timeout in milliseconds (companion field for `write_timeout`).
    // Same companion-field pattern as `delay_ms`.
    #[allow(dead_code)]
    #[uri_param(name = "writeTimeout", default = "30000")]
    write_timeout_ms: u64,

    /// Write timeout as Duration (applied to each producer step individually).
    pub write_timeout: Duration,
}
251
252impl UriConfig for FileConfig {
253    fn scheme() -> &'static str {
254        "file"
255    }
256
257    fn from_uri(uri: &str) -> Result<Self, CamelError> {
258        let parts = parse_uri(uri)?;
259        Self::from_components(parts)
260    }
261
262    fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
263        Self::parse_uri_components(parts)?.validate()
264    }
265
266    fn validate(self) -> Result<Self, CamelError> {
267        // Apply conditional logic for move_to:
268        // - If noop or delete is true, move_to should be None
269        // - Otherwise, if move_to is None, default to ".camel"
270        let move_to = if self.noop || self.delete {
271            None
272        } else {
273            Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
274        };
275
276        Ok(Self { move_to, ..self })
277    }
278}
279
280impl FileConfig {
281    /// Apply global config defaults. Since FileConfig uses a proc macro that bakes in
282    /// defaults, we compare Duration values against the known macro defaults to detect
283    /// "not explicitly set by user". Only overrides when current value == macro default.
284    ///
285    /// **Note**: If a user explicitly sets a URI param to its default value (e.g.,
286    /// `?delay=500`), it is indistinguishable from "not set" and will be overridden
287    /// by global config. This is a known limitation of the Duration comparison approach.
288    pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
289        if self.delay == Duration::from_millis(500) {
290            self.delay = Duration::from_millis(global.delay_ms);
291        }
292        if self.initial_delay == Duration::from_millis(1_000) {
293            self.initial_delay = Duration::from_millis(global.initial_delay_ms);
294        }
295        if self.read_timeout == Duration::from_millis(30_000) {
296            self.read_timeout = Duration::from_millis(global.read_timeout_ms);
297        }
298        if self.write_timeout == Duration::from_millis(30_000) {
299            self.write_timeout = Duration::from_millis(global.write_timeout_ms);
300        }
301    }
302}
303
304// ---------------------------------------------------------------------------
305// FileComponent
306// ---------------------------------------------------------------------------
307
308pub struct FileComponent {
309    config: Option<FileGlobalConfig>,
310}
311
312impl FileComponent {
313    pub fn new() -> Self {
314        Self { config: None }
315    }
316
317    pub fn with_config(config: FileGlobalConfig) -> Self {
318        Self {
319            config: Some(config),
320        }
321    }
322
323    pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
324        Self { config }
325    }
326}
327
328impl Default for FileComponent {
329    fn default() -> Self {
330        Self::new()
331    }
332}
333
334impl Component for FileComponent {
335    fn scheme(&self) -> &str {
336        "file"
337    }
338
339    fn create_endpoint(
340        &self,
341        uri: &str,
342        _ctx: &dyn camel_component_api::ComponentContext,
343    ) -> Result<Box<dyn Endpoint>, CamelError> {
344        let mut config = FileConfig::from_uri(uri)?;
345        if let Some(ref global_config) = self.config {
346            config.apply_global_defaults(global_config);
347        }
348        Ok(Box::new(FileEndpoint {
349            uri: uri.to_string(),
350            config,
351        }))
352    }
353}
354
355// ---------------------------------------------------------------------------
356// FileEndpoint
357// ---------------------------------------------------------------------------
358
359struct FileEndpoint {
360    uri: String,
361    config: FileConfig,
362}
363
364impl Endpoint for FileEndpoint {
365    fn uri(&self) -> &str {
366        &self.uri
367    }
368
369    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
370        Ok(Box::new(FileConsumer {
371            config: self.config.clone(),
372            seen: HashSet::new(),
373        }))
374    }
375
376    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
377        Ok(BoxProcessor::new(FileProducer {
378            config: self.config.clone(),
379        }))
380    }
381}
382
383// ---------------------------------------------------------------------------
384// FileConsumer
385// ---------------------------------------------------------------------------
386
387struct FileConsumer {
388    config: FileConfig,
389    seen: HashSet<PathBuf>,
390}
391
392#[async_trait]
393impl Consumer for FileConsumer {
394    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
395        let config = self.config.clone();
396
397        let include_re = config
398            .include
399            .as_ref()
400            .map(|p| Regex::new(p))
401            .transpose()
402            .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
403        let exclude_re = config
404            .exclude
405            .as_ref()
406            .map(|p| Regex::new(p))
407            .transpose()
408            .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
409
410        if !config.initial_delay.is_zero() {
411            tokio::select! {
412                _ = time::sleep(config.initial_delay) => {}
413                _ = context.cancelled() => {
414                    debug!(directory = config.directory, "File consumer cancelled during initial delay");
415                    return Ok(());
416                }
417            }
418        }
419
420        let mut interval = time::interval(config.delay);
421
422        loop {
423            tokio::select! {
424                _ = context.cancelled() => {
425                    debug!(directory = config.directory, "File consumer received cancellation, stopping");
426                    break;
427                }
428                _ = interval.tick() => {
429                    if let Err(e) = poll_directory(
430                        &config,
431                        &context,
432                        &include_re,
433                        &exclude_re,
434                        &mut self.seen,
435                    ).await {
436                        warn!(directory = config.directory, error = %e, "Error polling directory");
437                    }
438                }
439            }
440        }
441
442        Ok(())
443    }
444
445    async fn stop(&mut self) -> Result<(), CamelError> {
446        Ok(())
447    }
448}
449
450async fn poll_directory(
451    config: &FileConfig,
452    context: &ConsumerContext,
453    include_re: &Option<Regex>,
454    exclude_re: &Option<Regex>,
455    seen: &mut HashSet<PathBuf>,
456) -> Result<(), CamelError> {
457    let base_path = std::path::Path::new(&config.directory);
458
459    let files = list_files(base_path, config.recursive).await?;
460
461    for file_path in files {
462        let file_name = file_path
463            .file_name()
464            .and_then(|n| n.to_str())
465            .unwrap_or_default()
466            .to_string();
467
468        if let Some(ref target_name) = config.file_name
469            && file_name != *target_name
470        {
471            continue;
472        }
473
474        if let Some(re) = include_re
475            && !re.is_match(&file_name)
476        {
477            continue;
478        }
479
480        if let Some(re) = exclude_re
481            && re.is_match(&file_name)
482        {
483            continue;
484        }
485
486        if let Some(ref move_dir) = config.move_to
487            && file_path.starts_with(base_path.join(move_dir))
488        {
489            continue;
490        }
491
492        // Idempotent consumer: skip already-seen files when noop=true
493        if config.noop && seen.contains(&file_path) {
494            continue;
495        }
496
497        let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
498            let f = fs::File::open(&file_path).await?;
499            let m = f.metadata().await?;
500            Ok::<_, std::io::Error>((f, m))
501        })
502        .await
503        {
504            Ok(Ok((f, m))) => (f, Some(m)),
505            Ok(Err(e)) => {
506                warn!(
507                    file = %file_path.display(),
508                    error = %e,
509                    "Failed to open file"
510                );
511                continue;
512            }
513            Err(_) => {
514                warn!(
515                    file = %file_path.display(),
516                    timeout_ms = config.read_timeout.as_millis(),
517                    "Timeout opening file"
518                );
519                continue;
520            }
521        };
522
523        let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
524        let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
525
526        let last_modified = metadata
527            .as_ref()
528            .and_then(|m| m.modified().ok())
529            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
530            .map(|d| d.as_millis() as u64)
531            .unwrap_or(0);
532
533        let relative_path = file_path
534            .strip_prefix(base_path)
535            .unwrap_or(&file_path)
536            .to_string_lossy()
537            .to_string();
538
539        let absolute_path = file_path
540            .canonicalize()
541            .unwrap_or_else(|_| file_path.clone())
542            .to_string_lossy()
543            .to_string();
544
545        let body = Body::Stream(StreamBody {
546            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
547            metadata: StreamMetadata {
548                size_hint: Some(file_len),
549                content_type: None,
550                origin: Some(absolute_path.clone()),
551            },
552        });
553
554        let mut exchange = Exchange::new(Message::new(body));
555        exchange
556            .input
557            .set_header("CamelFileName", serde_json::Value::String(relative_path));
558        exchange.input.set_header(
559            "CamelFileNameOnly",
560            serde_json::Value::String(file_name.clone()),
561        );
562        exchange.input.set_header(
563            "CamelFileAbsolutePath",
564            serde_json::Value::String(absolute_path),
565        );
566        exchange.input.set_header(
567            "CamelFileLength",
568            serde_json::Value::Number(file_len.into()),
569        );
570        exchange.input.set_header(
571            "CamelFileLastModified",
572            serde_json::Value::Number(last_modified.into()),
573        );
574
575        debug!(
576            file = %file_path.display(),
577            correlation_id = %exchange.correlation_id(),
578            "Processing file"
579        );
580
581        if context.send(exchange).await.is_err() {
582            break;
583        }
584
585        if config.noop {
586            seen.insert(file_path.clone());
587        }
588
589        if config.noop {
590            // Do nothing
591        } else if config.delete {
592            if let Err(e) = fs::remove_file(&file_path).await {
593                warn!(file = %file_path.display(), error = %e, "Failed to delete file");
594            }
595        } else if let Some(ref move_dir) = config.move_to {
596            let target_dir = base_path.join(move_dir);
597            if let Err(e) = fs::create_dir_all(&target_dir).await {
598                warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
599                continue;
600            }
601            let target_path = target_dir.join(&file_name);
602            if let Err(e) = fs::rename(&file_path, &target_path).await {
603                warn!(
604                    from = %file_path.display(),
605                    to = %target_path.display(),
606                    error = %e,
607                    "Failed to move file"
608                );
609            }
610        }
611    }
612
613    Ok(())
614}
615
616async fn list_files(
617    dir: &std::path::Path,
618    recursive: bool,
619) -> Result<Vec<std::path::PathBuf>, CamelError> {
620    let mut files = Vec::new();
621    let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
622
623    while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
624        let path = entry.path();
625        if path.is_file() {
626            files.push(path);
627        } else if path.is_dir() && recursive {
628            let mut sub_files = Box::pin(list_files(&path, true)).await?;
629            files.append(&mut sub_files);
630        }
631    }
632
633    files.sort();
634    Ok(files)
635}
636
637// ---------------------------------------------------------------------------
638// Path validation for security
639// ---------------------------------------------------------------------------
640
641fn validate_path_is_within_base(
642    base_dir: &std::path::Path,
643    target_path: &std::path::Path,
644) -> Result<(), CamelError> {
645    let canonical_base = base_dir.canonicalize().map_err(|e| {
646        CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
647    })?;
648
649    // For non-existent paths, canonicalize the parent and construct the full path
650    let canonical_target = if target_path.exists() {
651        target_path.canonicalize().map_err(|e| {
652            CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
653        })?
654    } else if let Some(parent) = target_path.parent() {
655        // Ensure parent exists (should have been created by auto_create)
656        if !parent.exists() {
657            return Err(CamelError::ProcessorError(format!(
658                "Parent directory '{}' does not exist",
659                parent.display()
660            )));
661        }
662        let canonical_parent = parent.canonicalize().map_err(|e| {
663            CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
664        })?;
665        // Reconstruct the full path with the filename
666        if let Some(filename) = target_path.file_name() {
667            canonical_parent.join(filename)
668        } else {
669            return Err(CamelError::ProcessorError(
670                "Invalid target path: no filename".to_string(),
671            ));
672        }
673    } else {
674        return Err(CamelError::ProcessorError(
675            "Invalid target path: no parent directory".to_string(),
676        ));
677    };
678
679    if !canonical_target.starts_with(&canonical_base) {
680        return Err(CamelError::ProcessorError(format!(
681            "Path '{}' is outside base directory '{}'",
682            canonical_target.display(),
683            canonical_base.display()
684        )));
685    }
686
687    Ok(())
688}
689
690// ---------------------------------------------------------------------------
691// FileProducer
692// ---------------------------------------------------------------------------
693
694#[derive(Clone)]
695struct FileProducer {
696    config: FileConfig,
697}
698
699impl FileProducer {
700    fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
701        if let Some(name) = exchange
702            .input
703            .header("CamelFileName")
704            .and_then(|v| v.as_str())
705        {
706            return Ok(name.to_string());
707        }
708        if let Some(ref name) = config.file_name {
709            return Ok(name.clone());
710        }
711        Err(CamelError::ProcessorError(
712            "No filename specified: set CamelFileName header or fileName option".to_string(),
713        ))
714    }
715}
716
717impl Service<Exchange> for FileProducer {
718    type Response = Exchange;
719    type Error = CamelError;
720    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
721
722    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
723        Poll::Ready(Ok(()))
724    }
725
726    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
727        let config = self.config.clone();
728
729        Box::pin(async move {
730            let file_name = FileProducer::resolve_filename(&exchange, &config)?;
731            let body = exchange.input.body.clone();
732
733            let dir_path = std::path::Path::new(&config.directory);
734            let target_path = dir_path.join(&file_name);
735
736            // 1. Auto-create directories
737            if config.auto_create
738                && let Some(parent) = target_path.parent()
739            {
740                tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
741                    .await
742                    .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
743                    .map_err(CamelError::from)?;
744            }
745
746            // 2. Security: validate path is within base directory
747            validate_path_is_within_base(dir_path, &target_path)?;
748
749            // 3. Handle file-exist strategy
750            match config.file_exist {
751                FileExistStrategy::Fail if target_path.exists() => {
752                    return Err(CamelError::ProcessorError(format!(
753                        "File already exists: {}",
754                        target_path.display()
755                    )));
756                }
757                FileExistStrategy::Append => {
758                    // Append: write directly without temp file (append is inherently non-atomic)
759                    let mut file = tokio::time::timeout(
760                        config.write_timeout,
761                        OpenOptions::new()
762                            .append(true)
763                            .create(true)
764                            .open(&target_path),
765                    )
766                    .await
767                    .map_err(|_| {
768                        CamelError::ProcessorError("Timeout opening file for append".into())
769                    })?
770                    .map_err(CamelError::from)?;
771
772                    tokio::time::timeout(
773                        config.write_timeout,
774                        io::copy(&mut body.into_async_read(), &mut file),
775                    )
776                    .await
777                    .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
778                    .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
779
780                    file.flush().await.map_err(CamelError::from)?;
781                }
782                _ => {
783                    // Override (or Fail when file doesn't exist): always atomic via temp file
784                    let temp_name = if let Some(ref prefix) = config.temp_prefix {
785                        format!("{prefix}{file_name}")
786                    } else {
787                        format!(".tmp.{file_name}")
788                    };
789                    let temp_path = dir_path.join(&temp_name);
790
791                    // RAII guard ensures cleanup even on panic
792                    let mut guard = TempFileGuard::new(temp_path.clone());
793
794                    // Write to temp file
795                    let mut file =
796                        tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
797                            .await
798                            .map_err(|_| {
799                                CamelError::ProcessorError("Timeout creating temp file".into())
800                            })?
801                            .map_err(CamelError::from)?;
802
803                    let copy_result = tokio::time::timeout(
804                        config.write_timeout,
805                        io::copy(&mut body.into_async_read(), &mut file),
806                    )
807                    .await;
808
809                    // Flush any kernel buffers (best-effort; actual write errors come from io::copy above)
810                    let _ = file.flush().await;
811
812                    match copy_result {
813                        Ok(Ok(_)) => {}
814                        Ok(Err(e)) => {
815                            // Guard will clean up temp file on drop
816                            return Err(CamelError::ProcessorError(e.to_string()));
817                        }
818                        Err(_) => {
819                            // Guard will clean up temp file on drop
820                            return Err(CamelError::ProcessorError("Timeout writing file".into()));
821                        }
822                    }
823
824                    // Atomic rename: temp → target
825                    let rename_result = tokio::time::timeout(
826                        config.write_timeout,
827                        fs::rename(&temp_path, &target_path),
828                    )
829                    .await;
830
831                    match rename_result {
832                        Ok(Ok(_)) => {
833                            // Success — disarm guard so it doesn't delete the renamed file
834                            guard.disarm();
835                        }
836                        Ok(Err(e)) => {
837                            // Guard will clean up temp file on drop
838                            return Err(CamelError::from(e));
839                        }
840                        Err(_) => {
841                            // Guard will clean up temp file on drop
842                            return Err(CamelError::ProcessorError("Timeout renaming file".into()));
843                        }
844                    }
845                }
846            }
847
848            // 4. Set output header
849            let abs_path = target_path
850                .canonicalize()
851                .unwrap_or_else(|_| target_path.clone())
852                .to_string_lossy()
853                .to_string();
854            exchange
855                .input
856                .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
857
858            debug!(
859                file = %target_path.display(),
860                correlation_id = %exchange.correlation_id(),
861                "File written"
862            );
863
864            Ok(exchange)
865        })
866    }
867}
868
869#[cfg(test)]
870mod tests {
871    use super::*;
872    use bytes::Bytes;
873    use camel_component_api::NoOpComponentContext;
874    use std::time::Duration;
875    use tokio_util::sync::CancellationToken;
876
877    fn test_producer_ctx() -> ProducerContext {
878        ProducerContext::new()
879    }
880
881    #[test]
882    fn test_file_config_defaults() {
883        let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
884        assert_eq!(config.directory, "/tmp/inbox");
885        assert_eq!(config.delay, Duration::from_millis(500));
886        assert_eq!(config.initial_delay, Duration::from_millis(1000));
887        assert!(!config.noop);
888        assert!(!config.delete);
889        assert_eq!(config.move_to, Some(".camel".to_string()));
890        assert!(config.file_name.is_none());
891        assert!(config.include.is_none());
892        assert!(config.exclude.is_none());
893        assert!(!config.recursive);
894        assert_eq!(config.file_exist, FileExistStrategy::Override);
895        assert!(config.temp_prefix.is_none());
896        assert!(config.auto_create);
897        // New timeout defaults
898        assert_eq!(config.read_timeout, Duration::from_secs(30));
899        assert_eq!(config.write_timeout, Duration::from_secs(30));
900    }
901
902    #[test]
903    fn test_file_config_consumer_options() {
904        let config = FileConfig::from_uri(
905            "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
906        ).unwrap();
907        assert_eq!(config.directory, "/data/input");
908        assert_eq!(config.delay, Duration::from_millis(1000));
909        assert_eq!(config.initial_delay, Duration::from_millis(2000));
910        assert!(config.noop);
911        assert!(config.recursive);
912        assert_eq!(config.include, Some(".*\\.csv".to_string()));
913    }
914
915    #[test]
916    fn test_file_config_producer_options() {
917        let config = FileConfig::from_uri(
918            "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
919        )
920        .unwrap();
921        assert_eq!(config.file_exist, FileExistStrategy::Append);
922        assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
923        assert!(!config.auto_create);
924        assert_eq!(config.file_name, Some("out.txt".to_string()));
925    }
926
927    #[test]
928    fn test_file_config_delete_mode() {
929        let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
930        assert!(config.delete);
931        assert!(config.move_to.is_none());
932    }
933
934    #[test]
935    fn test_file_config_noop_mode() {
936        let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
937        assert!(config.noop);
938        assert!(config.move_to.is_none());
939    }
940
941    #[test]
942    fn test_file_config_wrong_scheme() {
943        let result = FileConfig::from_uri("timer:tick");
944        assert!(result.is_err());
945    }
946
947    #[test]
948    fn test_file_component_scheme() {
949        let component = FileComponent::new();
950        assert_eq!(component.scheme(), "file");
951    }
952
953    #[test]
954    fn test_file_component_creates_endpoint() {
955        let component = FileComponent::new();
956        let ctx = NoOpComponentContext;
957        let endpoint = component.create_endpoint("file:/tmp/test", &ctx);
958        assert!(endpoint.is_ok());
959    }
960
961    // -----------------------------------------------------------------------
962    // Consumer tests
963    // -----------------------------------------------------------------------
964
965    #[tokio::test]
966    async fn test_file_consumer_reads_files() {
967        let dir = tempfile::tempdir().unwrap();
968        let dir_path = dir.path().to_str().unwrap();
969
970        std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
971        std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
972
973        let component = FileComponent::new();
974        let ctx = NoOpComponentContext;
975        let endpoint = component
976            .create_endpoint(
977                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100"),
978                &ctx,
979            )
980            .unwrap();
981        let mut consumer = endpoint.create_consumer().unwrap();
982
983        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
984        let token = CancellationToken::new();
985        let ctx = ConsumerContext::new(tx, token.clone());
986
987        tokio::spawn(async move {
988            consumer.start(ctx).await.unwrap();
989        });
990
991        let mut received = Vec::new();
992        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
993            while let Some(envelope) = rx.recv().await {
994                received.push(envelope.exchange);
995                if received.len() == 2 {
996                    break;
997                }
998            }
999        })
1000        .await;
1001        token.cancel();
1002
1003        assert!(timeout.is_ok(), "Should have received 2 exchanges");
1004        assert_eq!(received.len(), 2);
1005
1006        for ex in &received {
1007            assert!(ex.input.header("CamelFileName").is_some());
1008            assert!(ex.input.header("CamelFileNameOnly").is_some());
1009            assert!(ex.input.header("CamelFileAbsolutePath").is_some());
1010            assert!(ex.input.header("CamelFileLength").is_some());
1011            assert!(ex.input.header("CamelFileLastModified").is_some());
1012        }
1013    }
1014
1015    #[tokio::test]
1016    async fn noop_second_poll_does_not_re_emit_seen_files() {
1017        let dir = tempfile::tempdir().unwrap();
1018        let file_path = dir.path().join("test.txt");
1019        tokio::fs::write(&file_path, b"hello").await.unwrap();
1020
1021        let uri = format!(
1022            "file:{}?noop=true&initialDelay=0&delay=50",
1023            dir.path().display()
1024        );
1025        let config = FileConfig::from_uri(&uri).unwrap();
1026        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1027        let token = CancellationToken::new();
1028        let ctx = ConsumerContext::new(tx, token);
1029
1030        let include_re = None;
1031        let exclude_re = None;
1032        let mut seen = std::collections::HashSet::new();
1033
1034        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1035            .await
1036            .unwrap();
1037        assert!(rx.try_recv().is_ok(), "first poll should emit file");
1038        assert!(rx.try_recv().is_err(), "should only emit once");
1039
1040        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1041            .await
1042            .unwrap();
1043        assert!(
1044            rx.try_recv().is_err(),
1045            "second poll should not re-emit seen file"
1046        );
1047    }
1048
1049    #[tokio::test]
1050    async fn noop_new_files_picked_up_after_first_poll() {
1051        let dir = tempfile::tempdir().unwrap();
1052        let file1 = dir.path().join("a.txt");
1053        tokio::fs::write(&file1, b"a").await.unwrap();
1054
1055        let uri = format!(
1056            "file:{}?noop=true&initialDelay=0&delay=50",
1057            dir.path().display()
1058        );
1059        let config = FileConfig::from_uri(&uri).unwrap();
1060        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1061        let token = CancellationToken::new();
1062        let ctx = ConsumerContext::new(tx, token);
1063
1064        let include_re = None;
1065        let exclude_re = None;
1066        let mut seen = std::collections::HashSet::new();
1067
1068        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1069            .await
1070            .unwrap();
1071        let _ = rx.try_recv();
1072
1073        let file2 = dir.path().join("b.txt");
1074        tokio::fs::write(&file2, b"b").await.unwrap();
1075
1076        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1077            .await
1078            .unwrap();
1079        assert!(
1080            rx.try_recv().is_ok(),
1081            "b.txt should be emitted on second poll"
1082        );
1083        assert!(rx.try_recv().is_err(), "a.txt should not be re-emitted");
1084    }
1085
1086    #[tokio::test]
1087    async fn test_file_consumer_include_filter() {
1088        let dir = tempfile::tempdir().unwrap();
1089        let dir_path = dir.path().to_str().unwrap();
1090
1091        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
1092        std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
1093
1094        let component = FileComponent::new();
1095        let ctx = NoOpComponentContext;
1096        let endpoint = component
1097            .create_endpoint(
1098                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"),
1099                &ctx,
1100            )
1101            .unwrap();
1102        let mut consumer = endpoint.create_consumer().unwrap();
1103
1104        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1105        let token = CancellationToken::new();
1106        let ctx = ConsumerContext::new(tx, token.clone());
1107
1108        tokio::spawn(async move {
1109            consumer.start(ctx).await.unwrap();
1110        });
1111
1112        let mut received = Vec::new();
1113        let _ = tokio::time::timeout(Duration::from_millis(500), async {
1114            while let Some(envelope) = rx.recv().await {
1115                received.push(envelope.exchange);
1116                if received.len() == 1 {
1117                    break;
1118                }
1119            }
1120        })
1121        .await;
1122        token.cancel();
1123
1124        assert_eq!(received.len(), 1);
1125        let name = received[0]
1126            .input
1127            .header("CamelFileNameOnly")
1128            .and_then(|v| v.as_str())
1129            .unwrap();
1130        assert_eq!(name, "data.csv");
1131    }
1132
1133    #[tokio::test]
1134    async fn test_file_consumer_delete_mode() {
1135        let dir = tempfile::tempdir().unwrap();
1136        let dir_path = dir.path().to_str().unwrap();
1137
1138        std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
1139
1140        let component = FileComponent::new();
1141        let ctx = NoOpComponentContext;
1142        let endpoint = component
1143            .create_endpoint(
1144                &format!("file:{dir_path}?delete=true&initialDelay=0&delay=100"),
1145                &ctx,
1146            )
1147            .unwrap();
1148        let mut consumer = endpoint.create_consumer().unwrap();
1149
1150        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1151        let token = CancellationToken::new();
1152        let ctx = ConsumerContext::new(tx, token.clone());
1153
1154        tokio::spawn(async move {
1155            consumer.start(ctx).await.unwrap();
1156        });
1157
1158        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1159        token.cancel();
1160
1161        tokio::time::sleep(Duration::from_millis(100)).await;
1162
1163        assert!(
1164            !dir.path().join("deleteme.txt").exists(),
1165            "File should be deleted"
1166        );
1167    }
1168
1169    #[tokio::test]
1170    async fn test_file_consumer_move_mode() {
1171        let dir = tempfile::tempdir().unwrap();
1172        let dir_path = dir.path().to_str().unwrap();
1173
1174        std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
1175
1176        let component = FileComponent::new();
1177        let ctx = NoOpComponentContext;
1178        let endpoint = component
1179            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"), &ctx)
1180            .unwrap();
1181        let mut consumer = endpoint.create_consumer().unwrap();
1182
1183        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1184        let token = CancellationToken::new();
1185        let ctx = ConsumerContext::new(tx, token.clone());
1186
1187        tokio::spawn(async move {
1188            consumer.start(ctx).await.unwrap();
1189        });
1190
1191        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1192        token.cancel();
1193
1194        tokio::time::sleep(Duration::from_millis(100)).await;
1195
1196        assert!(
1197            !dir.path().join("moveme.txt").exists(),
1198            "Original file should be gone"
1199        );
1200        assert!(
1201            dir.path().join(".camel").join("moveme.txt").exists(),
1202            "File should be in .camel/"
1203        );
1204    }
1205
1206    #[tokio::test]
1207    async fn test_file_consumer_respects_cancellation() {
1208        let dir = tempfile::tempdir().unwrap();
1209        let dir_path = dir.path().to_str().unwrap();
1210
1211        let component = FileComponent::new();
1212        let ctx = NoOpComponentContext;
1213        let endpoint = component
1214            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"), &ctx)
1215            .unwrap();
1216        let mut consumer = endpoint.create_consumer().unwrap();
1217
1218        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1219        let token = CancellationToken::new();
1220        let ctx = ConsumerContext::new(tx, token.clone());
1221
1222        let handle = tokio::spawn(async move {
1223            consumer.start(ctx).await.unwrap();
1224        });
1225
1226        tokio::time::sleep(Duration::from_millis(150)).await;
1227        token.cancel();
1228
1229        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1230        assert!(
1231            result.is_ok(),
1232            "Consumer should have stopped after cancellation"
1233        );
1234    }
1235
1236    // -----------------------------------------------------------------------
1237    // Producer tests
1238    // -----------------------------------------------------------------------
1239
1240    #[tokio::test]
1241    async fn test_file_producer_writes_file() {
1242        use tower::ServiceExt;
1243
1244        let dir = tempfile::tempdir().unwrap();
1245        let dir_path = dir.path().to_str().unwrap();
1246
1247        let component = FileComponent::new();
1248        let ctx = NoOpComponentContext;
1249        let endpoint = component
1250            .create_endpoint(&format!("file:{dir_path}"), &ctx)
1251            .unwrap();
1252        let ctx = test_producer_ctx();
1253        let producer = endpoint.create_producer(&ctx).unwrap();
1254
1255        let mut exchange = Exchange::new(Message::new("file content"));
1256        exchange.input.set_header(
1257            "CamelFileName",
1258            serde_json::Value::String("output.txt".to_string()),
1259        );
1260
1261        let result = producer.oneshot(exchange).await.unwrap();
1262
1263        let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1264        assert_eq!(content, "file content");
1265
1266        assert!(result.input.header("CamelFileNameProduced").is_some());
1267    }
1268
1269    #[tokio::test]
1270    async fn test_file_producer_auto_create_dirs() {
1271        use tower::ServiceExt;
1272
1273        let dir = tempfile::tempdir().unwrap();
1274        let dir_path = dir.path().to_str().unwrap();
1275
1276        let component = FileComponent::new();
1277        let ctx = NoOpComponentContext;
1278        let endpoint = component
1279            .create_endpoint(&format!("file:{dir_path}/sub/dir"), &ctx)
1280            .unwrap();
1281        let ctx = test_producer_ctx();
1282        let producer = endpoint.create_producer(&ctx).unwrap();
1283
1284        let mut exchange = Exchange::new(Message::new("nested"));
1285        exchange.input.set_header(
1286            "CamelFileName",
1287            serde_json::Value::String("file.txt".to_string()),
1288        );
1289
1290        producer.oneshot(exchange).await.unwrap();
1291
1292        assert!(dir.path().join("sub/dir/file.txt").exists());
1293    }
1294
1295    #[tokio::test]
1296    async fn test_file_producer_file_exist_fail() {
1297        use tower::ServiceExt;
1298
1299        let dir = tempfile::tempdir().unwrap();
1300        let dir_path = dir.path().to_str().unwrap();
1301
1302        std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1303
1304        let component = FileComponent::new();
1305        let ctx = NoOpComponentContext;
1306        let endpoint = component
1307            .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"), &ctx)
1308            .unwrap();
1309        let ctx = test_producer_ctx();
1310        let producer = endpoint.create_producer(&ctx).unwrap();
1311
1312        let mut exchange = Exchange::new(Message::new("new"));
1313        exchange.input.set_header(
1314            "CamelFileName",
1315            serde_json::Value::String("existing.txt".to_string()),
1316        );
1317
1318        let result = producer.oneshot(exchange).await;
1319        assert!(
1320            result.is_err(),
1321            "Should fail when file exists with Fail strategy"
1322        );
1323    }
1324
1325    #[tokio::test]
1326    async fn test_file_producer_file_exist_append() {
1327        use tower::ServiceExt;
1328
1329        let dir = tempfile::tempdir().unwrap();
1330        let dir_path = dir.path().to_str().unwrap();
1331
1332        std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1333
1334        let component = FileComponent::new();
1335        let ctx = NoOpComponentContext;
1336        let endpoint = component
1337            .create_endpoint(&format!("file:{dir_path}?fileExist=Append"), &ctx)
1338            .unwrap();
1339        let ctx = test_producer_ctx();
1340        let producer = endpoint.create_producer(&ctx).unwrap();
1341
1342        let mut exchange = Exchange::new(Message::new("new"));
1343        exchange.input.set_header(
1344            "CamelFileName",
1345            serde_json::Value::String("append.txt".to_string()),
1346        );
1347
1348        producer.oneshot(exchange).await.unwrap();
1349
1350        let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1351        assert_eq!(content, "oldnew");
1352    }
1353
1354    #[tokio::test]
1355    async fn test_file_producer_temp_prefix() {
1356        use tower::ServiceExt;
1357
1358        let dir = tempfile::tempdir().unwrap();
1359        let dir_path = dir.path().to_str().unwrap();
1360
1361        let component = FileComponent::new();
1362        let ctx = NoOpComponentContext;
1363        let endpoint = component
1364            .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"), &ctx)
1365            .unwrap();
1366        let ctx = test_producer_ctx();
1367        let producer = endpoint.create_producer(&ctx).unwrap();
1368
1369        let mut exchange = Exchange::new(Message::new("atomic write"));
1370        exchange.input.set_header(
1371            "CamelFileName",
1372            serde_json::Value::String("final.txt".to_string()),
1373        );
1374
1375        producer.oneshot(exchange).await.unwrap();
1376
1377        assert!(dir.path().join("final.txt").exists());
1378        assert!(!dir.path().join(".tmpfinal.txt").exists());
1379        let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1380        assert_eq!(content, "atomic write");
1381    }
1382
1383    #[tokio::test]
1384    async fn test_file_producer_uses_filename_option() {
1385        use tower::ServiceExt;
1386
1387        let dir = tempfile::tempdir().unwrap();
1388        let dir_path = dir.path().to_str().unwrap();
1389
1390        let component = FileComponent::new();
1391        let ctx = NoOpComponentContext;
1392        let endpoint = component
1393            .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"), &ctx)
1394            .unwrap();
1395        let ctx = test_producer_ctx();
1396        let producer = endpoint.create_producer(&ctx).unwrap();
1397
1398        let exchange = Exchange::new(Message::new("content"));
1399
1400        producer.oneshot(exchange).await.unwrap();
1401        assert!(dir.path().join("fixed.txt").exists());
1402    }
1403
1404    #[tokio::test]
1405    async fn test_file_producer_no_filename_errors() {
1406        use tower::ServiceExt;
1407
1408        let dir = tempfile::tempdir().unwrap();
1409        let dir_path = dir.path().to_str().unwrap();
1410
1411        let component = FileComponent::new();
1412        let ctx = NoOpComponentContext;
1413        let endpoint = component
1414            .create_endpoint(&format!("file:{dir_path}"), &ctx)
1415            .unwrap();
1416        let ctx = test_producer_ctx();
1417        let producer = endpoint.create_producer(&ctx).unwrap();
1418
1419        let exchange = Exchange::new(Message::new("content"));
1420
1421        let result = producer.oneshot(exchange).await;
1422        assert!(result.is_err(), "Should error when no filename is provided");
1423    }
1424
1425    // -----------------------------------------------------------------------
1426    // Security tests - Path traversal protection
1427    // -----------------------------------------------------------------------
1428
1429    #[tokio::test]
1430    async fn test_file_producer_rejects_path_traversal_parent_directory() {
1431        use tower::ServiceExt;
1432
1433        let dir = tempfile::tempdir().unwrap();
1434        let dir_path = dir.path().to_str().unwrap();
1435
1436        // Create a subdirectory
1437        std::fs::create_dir(dir.path().join("subdir")).unwrap();
1438        std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1439
1440        let component = FileComponent::new();
1441        let ctx = NoOpComponentContext;
1442        let endpoint = component
1443            .create_endpoint(&format!("file:{dir_path}/subdir"), &ctx)
1444            .unwrap();
1445        let ctx = test_producer_ctx();
1446        let producer = endpoint.create_producer(&ctx).unwrap();
1447
1448        let mut exchange = Exchange::new(Message::new("malicious"));
1449        exchange.input.set_header(
1450            "CamelFileName",
1451            serde_json::Value::String("../secret.txt".to_string()),
1452        );
1453
1454        let result = producer.oneshot(exchange).await;
1455        assert!(result.is_err(), "Should reject path traversal attempt");
1456
1457        let err = result.unwrap_err();
1458        assert!(
1459            err.to_string().contains("outside"),
1460            "Error should mention path is outside base directory"
1461        );
1462    }
1463
1464    #[tokio::test]
1465    async fn test_file_producer_rejects_absolute_path_outside_base() {
1466        use tower::ServiceExt;
1467
1468        let dir = tempfile::tempdir().unwrap();
1469        let dir_path = dir.path().to_str().unwrap();
1470
1471        let component = FileComponent::new();
1472        let ctx = NoOpComponentContext;
1473        let endpoint = component
1474            .create_endpoint(&format!("file:{dir_path}"), &ctx)
1475            .unwrap();
1476        let ctx = test_producer_ctx();
1477        let producer = endpoint.create_producer(&ctx).unwrap();
1478
1479        let mut exchange = Exchange::new(Message::new("malicious"));
1480        exchange.input.set_header(
1481            "CamelFileName",
1482            serde_json::Value::String("/etc/passwd".to_string()),
1483        );
1484
1485        let result = producer.oneshot(exchange).await;
1486        assert!(result.is_err(), "Should reject absolute path outside base");
1487    }
1488
1489    // -----------------------------------------------------------------------
1490    // Large file streaming tests
1491    // -----------------------------------------------------------------------
1492
1493    #[tokio::test]
1494    #[ignore] // Slow test - run with --ignored flag
1495    async fn test_large_file_streaming_constant_memory() {
1496        use std::io::Write;
1497        use tempfile::NamedTempFile;
1498
1499        // Create a 150MB file (larger than 100MB limit)
1500        let mut temp_file = NamedTempFile::new().unwrap();
1501        let file_size = 150 * 1024 * 1024; // 150MB
1502        let chunk = vec![b'X'; 1024 * 1024]; // 1MB chunk
1503
1504        for _ in 0..150 {
1505            temp_file.write_all(&chunk).unwrap();
1506        }
1507        temp_file.flush().unwrap();
1508
1509        let dir = temp_file.path().parent().unwrap();
1510        let dir_path = dir.to_str().unwrap();
1511        let file_name = temp_file
1512            .path()
1513            .file_name()
1514            .unwrap()
1515            .to_str()
1516            .unwrap()
1517            .to_string();
1518
1519        // Read file as stream (should succeed with lazy evaluation)
1520        let component = FileComponent::new();
1521        let component_ctx = NoOpComponentContext;
1522        let endpoint = component
1523            .create_endpoint(
1524                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1525                &component_ctx,
1526            )
1527            .unwrap();
1528        let mut consumer = endpoint.create_consumer().unwrap();
1529
1530        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1531        let token = CancellationToken::new();
1532        let ctx = ConsumerContext::new(tx, token.clone());
1533
1534        tokio::spawn(async move {
1535            let _ = consumer.start(ctx).await;
1536        });
1537
1538        let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1539            rx.recv().await.unwrap().exchange
1540        })
1541        .await
1542        .expect("Should receive exchange");
1543        token.cancel();
1544
1545        // Verify body is a stream (not materialized)
1546        assert!(matches!(exchange.input.body, Body::Stream(_)));
1547
1548        // Verify we can read metadata without consuming
1549        if let Body::Stream(ref stream_body) = exchange.input.body {
1550            assert!(stream_body.metadata.size_hint.is_some());
1551            let size = stream_body.metadata.size_hint.unwrap();
1552            assert_eq!(size, file_size as u64);
1553        }
1554
1555        // Materializing should fail (exceeds 100MB limit)
1556        if let Body::Stream(stream_body) = exchange.input.body {
1557            let body = Body::Stream(stream_body);
1558            let result = body.into_bytes(100 * 1024 * 1024).await;
1559            assert!(result.is_err());
1560        }
1561
1562        // But we CAN read chunks one at a time (simulating line-by-line processing)
1563        // This demonstrates lazy evaluation - we don't need to load entire file
1564        let component2 = FileComponent::new();
1565        let endpoint2 = component2
1566            .create_endpoint(
1567                &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1568                &component_ctx,
1569            )
1570            .unwrap();
1571        let mut consumer2 = endpoint2.create_consumer().unwrap();
1572
1573        let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1574        let token2 = CancellationToken::new();
1575        let ctx2 = ConsumerContext::new(tx2, token2.clone());
1576
1577        tokio::spawn(async move {
1578            let _ = consumer2.start(ctx2).await;
1579        });
1580
1581        let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1582            rx2.recv().await.unwrap().exchange
1583        })
1584        .await
1585        .expect("Should receive exchange");
1586        token2.cancel();
1587
1588        if let Body::Stream(stream_body) = exchange2.input.body {
1589            let mut stream_lock = stream_body.stream.lock().await;
1590            let mut stream = stream_lock.take().unwrap();
1591
1592            // Read first chunk (size varies based on ReaderStream's buffer)
1593            if let Some(chunk_result) = stream.next().await {
1594                let chunk = chunk_result.unwrap();
1595                assert!(!chunk.is_empty());
1596                assert!(chunk.len() < file_size);
1597                // Memory usage is constant - we only have this chunk in memory, not 150MB
1598            }
1599        }
1600    }
1601
1602    // -----------------------------------------------------------------------
1603    // Streaming producer tests
1604    // -----------------------------------------------------------------------
1605
1606    #[tokio::test]
1607    async fn test_producer_writes_stream_body() {
1608        let dir = tempfile::tempdir().unwrap();
1609        let dir_path = dir.path().to_str().unwrap();
1610        let uri = format!("file:{dir_path}?fileName=out.txt");
1611
1612        let component = FileComponent::new();
1613        let ctx = NoOpComponentContext;
1614        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1615        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1616
1617        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1618            Ok(Bytes::from("hello ")),
1619            Ok(Bytes::from("streaming ")),
1620            Ok(Bytes::from("world")),
1621        ];
1622        let stream = futures::stream::iter(chunks);
1623        let body = Body::Stream(StreamBody {
1624            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1625            metadata: StreamMetadata {
1626                size_hint: None,
1627                content_type: None,
1628                origin: None,
1629            },
1630        });
1631
1632        let exchange = Exchange::new(Message::new(body));
1633        tower::ServiceExt::oneshot(producer, exchange)
1634            .await
1635            .unwrap();
1636
1637        let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1638            .await
1639            .unwrap();
1640        assert_eq!(content, "hello streaming world");
1641    }
1642
1643    #[tokio::test]
1644    async fn test_producer_stream_atomic_no_partial_on_error() {
1645        // If the stream errors mid-write, no file should exist at the target path
1646        let dir = tempfile::tempdir().unwrap();
1647        let dir_path = dir.path().to_str().unwrap();
1648        let uri = format!("file:{dir_path}?fileName=out.txt");
1649
1650        let component = FileComponent::new();
1651        let ctx = NoOpComponentContext;
1652        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1653        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1654
1655        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1656            Ok(Bytes::from("partial")),
1657            Err(CamelError::ProcessorError(
1658                "simulated stream error".to_string(),
1659            )),
1660        ];
1661        let stream = futures::stream::iter(chunks);
1662        let body = Body::Stream(StreamBody {
1663            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1664            metadata: StreamMetadata {
1665                size_hint: None,
1666                content_type: None,
1667                origin: None,
1668            },
1669        });
1670
1671        let exchange = Exchange::new(Message::new(body));
1672        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1673        assert!(
1674            result.is_err(),
1675            "expected error when stream fails mid-write"
1676        );
1677
1678        // Target file must NOT exist — write was aborted and temp file cleaned up
1679        assert!(
1680            !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1681            "partial file must not exist after failed write"
1682        );
1683
1684        // Temp file must also be cleaned up
1685        assert!(
1686            !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1687            "temp file must be cleaned up after failed write"
1688        );
1689    }
1690
1691    #[tokio::test]
1692    async fn test_producer_stream_append() {
1693        let dir = tempfile::tempdir().unwrap();
1694        let dir_path = dir.path().to_str().unwrap();
1695        let target = format!("{dir_path}/out.txt");
1696
1697        // Pre-create file with initial content
1698        tokio::fs::write(&target, b"line1\n").await.unwrap();
1699
1700        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1701        let component = FileComponent::new();
1702        let ctx = NoOpComponentContext;
1703        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1704        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1705
1706        let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1707        let stream = futures::stream::iter(chunks);
1708        let body = Body::Stream(StreamBody {
1709            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1710            metadata: StreamMetadata {
1711                size_hint: None,
1712                content_type: None,
1713                origin: None,
1714            },
1715        });
1716
1717        let exchange = Exchange::new(Message::new(body));
1718        tower::ServiceExt::oneshot(producer, exchange)
1719            .await
1720            .unwrap();
1721
1722        let content = tokio::fs::read_to_string(&target).await.unwrap();
1723        assert_eq!(content, "line1\nline2\n");
1724    }
1725
1726    #[tokio::test]
1727    async fn test_producer_stream_append_partial_on_error() {
1728        // Append is inherently non-atomic: if the stream errors mid-write,
1729        // the file will contain partial data. This test documents that behavior.
1730        let dir = tempfile::tempdir().unwrap();
1731        let dir_path = dir.path().to_str().unwrap();
1732        let target = format!("{dir_path}/out.txt");
1733
1734        // Pre-create file with initial content
1735        tokio::fs::write(&target, b"initial\n").await.unwrap();
1736
1737        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1738        let component = FileComponent::new();
1739        let ctx = NoOpComponentContext;
1740        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1741        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1742
1743        // Stream with an error in the middle
1744        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1745            Ok(Bytes::from("partial-")), // This will be written
1746            Err(CamelError::ProcessorError("stream error".to_string())), // This causes failure
1747            Ok(Bytes::from("never-written")), // This won't be reached
1748        ];
1749        let stream = futures::stream::iter(chunks);
1750        let body = Body::Stream(StreamBody {
1751            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1752            metadata: StreamMetadata {
1753                size_hint: None,
1754                content_type: None,
1755                origin: None,
1756            },
1757        });
1758
1759        let exchange = Exchange::new(Message::new(body));
1760        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1761
1762        // 1. Producer must return an error
1763        assert!(
1764            result.is_err(),
1765            "expected error when stream fails during append"
1766        );
1767
1768        // 2. File must contain initial content + partial data written before the error
1769        let content = tokio::fs::read_to_string(&target).await.unwrap();
1770        assert_eq!(
1771            content, "initial\npartial-",
1772            "append leaves partial data on stream error (non-atomic by nature)"
1773        );
1774    }
1775
1776    #[tokio::test]
1777    async fn test_producer_stream_already_consumed_errors() {
1778        let dir = tempfile::tempdir().unwrap();
1779        let dir_path = dir.path().to_str().unwrap();
1780        let uri = format!("file:{dir_path}?fileName=out.txt");
1781
1782        let component = FileComponent::new();
1783        let ctx = NoOpComponentContext;
1784        let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1785        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1786
1787        // Mutex holds None -> stream already consumed
1788        type MaybeStream = std::sync::Arc<
1789            tokio::sync::Mutex<
1790                Option<
1791                    std::pin::Pin<
1792                        Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1793                    >,
1794                >,
1795            >,
1796        >;
1797        let arc: MaybeStream = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1798        let body = Body::Stream(StreamBody {
1799            stream: arc,
1800            metadata: StreamMetadata {
1801                size_hint: None,
1802                content_type: None,
1803                origin: None,
1804            },
1805        });
1806
1807        let exchange = Exchange::new(Message::new(body));
1808        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1809        assert!(
1810            result.is_err(),
1811            "expected error for already-consumed stream"
1812        );
1813    }
1814
1815    // -----------------------------------------------------------------------
1816    // GlobalConfig tests - apply_global_defaults behavior
1817    // -----------------------------------------------------------------------
1818
1819    #[test]
1820    fn test_global_config_applied_to_endpoint() {
1821        // Global config with non-default values
1822        let global = FileGlobalConfig::default()
1823            .with_delay_ms(2000)
1824            .with_initial_delay_ms(5000)
1825            .with_read_timeout_ms(60_000)
1826            .with_write_timeout_ms(45_000);
1827        let component = FileComponent::with_config(global);
1828        let ctx = NoOpComponentContext;
1829        // URI uses no explicit delay/timeout params → macro defaults apply
1830        let endpoint = component.create_endpoint("file:/tmp/inbox", &ctx).unwrap();
1831        // We cannot call endpoint.config directly (FileEndpoint is private),
1832        // but we can test apply_global_defaults on FileConfig directly:
1833        let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
1834        let global2 = FileGlobalConfig::default()
1835            .with_delay_ms(2000)
1836            .with_initial_delay_ms(5000)
1837            .with_read_timeout_ms(60_000)
1838            .with_write_timeout_ms(45_000);
1839        config.apply_global_defaults(&global2);
1840        assert_eq!(config.delay, Duration::from_millis(2000));
1841        assert_eq!(config.initial_delay, Duration::from_millis(5000));
1842        assert_eq!(config.read_timeout, Duration::from_millis(60_000));
1843        assert_eq!(config.write_timeout, Duration::from_millis(45_000));
1844        // endpoint creation succeeds too
1845        let _ = endpoint; // just verify create_endpoint didn't fail
1846    }
1847
1848    #[test]
1849    fn test_uri_param_wins_over_global_config() {
1850        // URI explicitly sets delay=1000 (NOT the 500ms macro default)
1851        let mut config =
1852            FileConfig::from_uri("file:/tmp/inbox?delay=1000&initialDelay=2000").unwrap();
1853        // Global config would want 3000ms delay
1854        let global = FileGlobalConfig::default()
1855            .with_delay_ms(3000)
1856            .with_initial_delay_ms(4000);
1857        config.apply_global_defaults(&global);
1858        // URI value of 1000ms must be preserved (not replaced by 3000ms)
1859        assert_eq!(config.delay, Duration::from_millis(1000));
1860        // URI value of 2000ms must be preserved (not replaced by 4000ms)
1861        assert_eq!(config.initial_delay, Duration::from_millis(2000));
1862        // read_timeout was not set by URI → macro default (30000) → global wins if different
1863        // (read_timeout stays at 30000 since global has same default = 30000)
1864        assert_eq!(config.read_timeout, Duration::from_millis(30_000));
1865    }
1866}