Skip to main content

camel_component_file/
lib.rs

1use std::collections::HashSet;
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::str::FromStr;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use async_trait::async_trait;
10use futures::StreamExt;
11use regex::Regex;
12use tokio::fs;
13use tokio::fs::OpenOptions;
14use tokio::io;
15use tokio::io::AsyncWriteExt;
16use tokio::time;
17use tokio_util::io::ReaderStream;
18use tower::Service;
19use tracing::{debug, warn};
20
21use camel_api::{
22    BoxProcessor, CamelError, Exchange, Message, body::Body, body::StreamBody, body::StreamMetadata,
23};
24use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
25use camel_endpoint::{UriConfig, parse_uri};
26
27// ---------------------------------------------------------------------------
28// TempFileGuard — RAII cleanup for temp files (panic-safe)
29// ---------------------------------------------------------------------------
30
/// Drop guard that deletes a temporary file unless told otherwise.
///
/// The file at `path` is removed when the guard goes out of scope, which
/// covers early returns as well as unwinding. Call [`TempFileGuard::disarm`]
/// once the file must be kept (for example after it has been renamed to its
/// final location).
struct TempFileGuard {
    /// File to delete on drop.
    path: PathBuf,
    /// When `true`, dropping the guard leaves the file alone.
    disarm: bool,
}

impl TempFileGuard {
    /// Creates an armed guard for `path`. The file does not need to exist yet.
    fn new(path: PathBuf) -> Self {
        TempFileGuard {
            disarm: false,
            path,
        }
    }

    /// Cancels the cleanup; call this after a successful rename.
    fn disarm(&mut self) {
        self.disarm = true;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if self.disarm {
            return;
        }
        // Cleanup is best-effort: the file may never have been created, and
        // there is nobody to report an error to from a destructor.
        let _ = std::fs::remove_file(&self.path);
    }
}
62
63// ---------------------------------------------------------------------------
64// FileExistStrategy
65// ---------------------------------------------------------------------------
66
/// Strategy for handling existing files when writing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FileExistStrategy {
    /// Overwrite existing file (default).
    #[default]
    Override,
    /// Append to existing file.
    Append,
    /// Fail if file exists.
    Fail,
}

impl FromStr for FileExistStrategy {
    type Err = String;

    /// Parses a strategy name, ignoring ASCII case.
    ///
    /// `"append"` and `"fail"` (in any capitalisation) select the matching
    /// variant. Every other input, including `"override"` and unrecognised
    /// values, yields [`FileExistStrategy::Override`], so this never returns
    /// `Err`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Matching case-insensitively keeps spellings such as `APPEND` from
        // silently falling through to the destructive `Override` default.
        let strategy = if s.eq_ignore_ascii_case("append") {
            FileExistStrategy::Append
        } else if s.eq_ignore_ascii_case("fail") {
            FileExistStrategy::Fail
        } else {
            // Covers "override" and the lenient default for unknown values.
            FileExistStrategy::Override
        };
        Ok(strategy)
    }
}
91
92// ---------------------------------------------------------------------------
93// FileGlobalConfig
94// ---------------------------------------------------------------------------
95
/// Component-wide fallback settings for the File component.
///
/// A plain struct (no serde) whose values are used when the corresponding
/// URI parameters are not set. All values are in milliseconds.
#[derive(Debug, Clone, PartialEq)]
pub struct FileGlobalConfig {
    /// Delay between directory polls.
    pub delay_ms: u64,
    /// Delay before the first poll.
    pub initial_delay_ms: u64,
    /// Timeout for read operations.
    pub read_timeout_ms: u64,
    /// Timeout for write operations.
    pub write_timeout_ms: u64,
}

impl Default for FileGlobalConfig {
    fn default() -> Self {
        FileGlobalConfig {
            delay_ms: 500,
            initial_delay_ms: 1_000,
            read_timeout_ms: 30_000,
            write_timeout_ms: 30_000,
        }
    }
}

impl FileGlobalConfig {
    /// Returns the default configuration; chain the `with_*` methods to adjust it.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Sets the polling delay in milliseconds.
    pub fn with_delay_ms(self, v: u64) -> Self {
        Self { delay_ms: v, ..self }
    }

    /// Sets the initial delay in milliseconds.
    pub fn with_initial_delay_ms(self, v: u64) -> Self {
        Self {
            initial_delay_ms: v,
            ..self
        }
    }

    /// Sets the read timeout in milliseconds.
    pub fn with_read_timeout_ms(self, v: u64) -> Self {
        Self {
            read_timeout_ms: v,
            ..self
        }
    }

    /// Sets the write timeout in milliseconds.
    pub fn with_write_timeout_ms(self, v: u64) -> Self {
        Self {
            write_timeout_ms: v,
            ..self
        }
    }
}
139
140// ---------------------------------------------------------------------------
141// FileConfig
142// ---------------------------------------------------------------------------
143
/// Configuration for file component endpoints, parsed from a `file:` URI.
///
/// The same struct is shared by the consumer (directory polling) and the
/// producer (file writing); each side reads only the options relevant to it.
///
/// # Streaming
///
/// Both the file consumer and producer use **native streaming** with no RAM
/// materialization:
///
/// - The **consumer** creates a `Body::Stream` backed by `tokio::fs::File` via
///   `ReaderStream`. Files of any size are handled without loading them into memory.
///
/// - The **producer** writes via `tokio::io::copy` directly to a `tokio::fs::File`
///   using `Body::into_async_read()`. Writes for the `Override` strategy are
///   **atomic**: data is written to a temporary file first and renamed only on
///   success, preventing partial files on failure.
///
/// # Write strategies (`fileExist` URI parameter)
///
/// | Value | Behavior |
/// |-------|----------|
/// | `Override` (default) | Atomic write via temp file + rename |
/// | `Append` | Appends to existing file; non-atomic by nature |
/// | `Fail` | Returns error if file already exists; otherwise writes like `Override` |
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "file"]
#[uri_config(skip_impl)]
pub struct FileConfig {
    /// Directory path to read from or write to.
    pub directory: String,

    /// Polling delay in milliseconds (companion field for `delay`).
    // NOTE(review): never read in this file; presumably consumed by the
    // `UriConfig` derive to populate `delay` — confirm against the macro.
    #[allow(dead_code)]
    #[uri_param(name = "delay", default = "500")]
    delay_ms: u64,

    /// Polling delay as Duration; the consumer uses it as its poll interval.
    pub delay: Duration,

    /// Initial delay in milliseconds (companion field for `initial_delay`).
    // NOTE(review): same companion-field pattern as `delay_ms` — confirm.
    #[allow(dead_code)]
    #[uri_param(name = "initialDelay", default = "1000")]
    initial_delay_ms: u64,

    /// Initial delay as Duration; the consumer sleeps this long before its
    /// first poll (skipped when zero).
    pub initial_delay: Duration,

    /// If true, don't delete or move files after processing. The consumer
    /// then remembers processed paths in memory so each file is emitted once.
    #[uri_param(default = "false")]
    pub noop: bool,

    /// If true, delete files after processing (ignored when `noop` is true).
    #[uri_param(default = "false")]
    pub delete: bool,

    /// Directory to move processed files to (only if not noop/delete),
    /// resolved relative to `directory`.
    /// Default is ".camel" when not specified and noop/delete are false;
    /// `validate` forces it to `None` when noop or delete is set.
    #[uri_param(name = "move")]
    move_to: Option<String>,

    /// Fixed file name (optional). The producer uses it when the exchange has
    /// no `CamelFileName` header; the consumer only picks up files whose name
    /// equals it exactly.
    #[uri_param(name = "fileName")]
    pub file_name: Option<String>,

    /// Regex pattern for including files (consumer). Matched against the file
    /// name only, not the full path.
    #[uri_param]
    pub include: Option<String>,

    /// Regex pattern for excluding files (consumer). Matched against the file
    /// name only, not the full path.
    #[uri_param]
    pub exclude: Option<String>,

    /// Whether to scan directories recursively.
    #[uri_param(default = "false")]
    pub recursive: bool,

    /// Strategy for handling existing files when writing.
    #[uri_param(name = "fileExist", default = "Override")]
    pub file_exist: FileExistStrategy,

    /// Prefix for temporary files during atomic writes. It is prepended to the
    /// target file name; `.tmp.` is used when unset.
    #[uri_param(name = "tempPrefix")]
    pub temp_prefix: Option<String>,

    /// Whether the producer creates missing parent directories before writing.
    #[uri_param(name = "autoCreate", default = "true")]
    pub auto_create: bool,

    /// Read timeout in milliseconds (companion field for `read_timeout`).
    // NOTE(review): same companion-field pattern as `delay_ms` — confirm.
    #[allow(dead_code)]
    #[uri_param(name = "readTimeout", default = "30000")]
    read_timeout_ms: u64,

    /// Read timeout as Duration. The consumer applies it to opening a file and
    /// reading its metadata, not to streaming the contents.
    pub read_timeout: Duration,

    /// Write timeout in milliseconds (companion field for `write_timeout`).
    // NOTE(review): same companion-field pattern as `delay_ms` — confirm.
    #[allow(dead_code)]
    #[uri_param(name = "writeTimeout", default = "30000")]
    write_timeout_ms: u64,

    /// Write timeout as Duration. The producer applies it to each step
    /// separately (directory creation, open, copy, rename).
    pub write_timeout: Duration,
}
246
247impl UriConfig for FileConfig {
248    fn scheme() -> &'static str {
249        "file"
250    }
251
252    fn from_uri(uri: &str) -> Result<Self, CamelError> {
253        let parts = parse_uri(uri)?;
254        Self::from_components(parts)
255    }
256
257    fn from_components(parts: camel_endpoint::UriComponents) -> Result<Self, CamelError> {
258        Self::parse_uri_components(parts)?.validate()
259    }
260
261    fn validate(self) -> Result<Self, CamelError> {
262        // Apply conditional logic for move_to:
263        // - If noop or delete is true, move_to should be None
264        // - Otherwise, if move_to is None, default to ".camel"
265        let move_to = if self.noop || self.delete {
266            None
267        } else {
268            Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
269        };
270
271        Ok(Self { move_to, ..self })
272    }
273}
274
275impl FileConfig {
276    /// Apply global config defaults. Since FileConfig uses a proc macro that bakes in
277    /// defaults, we compare Duration values against the known macro defaults to detect
278    /// "not explicitly set by user". Only overrides when current value == macro default.
279    ///
280    /// **Note**: If a user explicitly sets a URI param to its default value (e.g.,
281    /// `?delay=500`), it is indistinguishable from "not set" and will be overridden
282    /// by global config. This is a known limitation of the Duration comparison approach.
283    pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
284        if self.delay == Duration::from_millis(500) {
285            self.delay = Duration::from_millis(global.delay_ms);
286        }
287        if self.initial_delay == Duration::from_millis(1_000) {
288            self.initial_delay = Duration::from_millis(global.initial_delay_ms);
289        }
290        if self.read_timeout == Duration::from_millis(30_000) {
291            self.read_timeout = Duration::from_millis(global.read_timeout_ms);
292        }
293        if self.write_timeout == Duration::from_millis(30_000) {
294            self.write_timeout = Duration::from_millis(global.write_timeout_ms);
295        }
296    }
297}
298
299// ---------------------------------------------------------------------------
300// FileComponent
301// ---------------------------------------------------------------------------
302
303pub struct FileComponent {
304    config: Option<FileGlobalConfig>,
305}
306
307impl FileComponent {
308    pub fn new() -> Self {
309        Self { config: None }
310    }
311
312    pub fn with_config(config: FileGlobalConfig) -> Self {
313        Self {
314            config: Some(config),
315        }
316    }
317
318    pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
319        Self { config }
320    }
321}
322
323impl Default for FileComponent {
324    fn default() -> Self {
325        Self::new()
326    }
327}
328
329impl Component for FileComponent {
330    fn scheme(&self) -> &str {
331        "file"
332    }
333
334    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
335        let mut config = FileConfig::from_uri(uri)?;
336        if let Some(ref global_config) = self.config {
337            config.apply_global_defaults(global_config);
338        }
339        Ok(Box::new(FileEndpoint {
340            uri: uri.to_string(),
341            config,
342        }))
343    }
344}
345
346// ---------------------------------------------------------------------------
347// FileEndpoint
348// ---------------------------------------------------------------------------
349
350struct FileEndpoint {
351    uri: String,
352    config: FileConfig,
353}
354
355impl Endpoint for FileEndpoint {
356    fn uri(&self) -> &str {
357        &self.uri
358    }
359
360    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
361        Ok(Box::new(FileConsumer {
362            config: self.config.clone(),
363            seen: HashSet::new(),
364        }))
365    }
366
367    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
368        Ok(BoxProcessor::new(FileProducer {
369            config: self.config.clone(),
370        }))
371    }
372}
373
374// ---------------------------------------------------------------------------
375// FileConsumer
376// ---------------------------------------------------------------------------
377
378struct FileConsumer {
379    config: FileConfig,
380    seen: HashSet<PathBuf>,
381}
382
383#[async_trait]
384impl Consumer for FileConsumer {
385    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
386        let config = self.config.clone();
387
388        let include_re = config
389            .include
390            .as_ref()
391            .map(|p| Regex::new(p))
392            .transpose()
393            .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
394        let exclude_re = config
395            .exclude
396            .as_ref()
397            .map(|p| Regex::new(p))
398            .transpose()
399            .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
400
401        if !config.initial_delay.is_zero() {
402            tokio::select! {
403                _ = time::sleep(config.initial_delay) => {}
404                _ = context.cancelled() => {
405                    debug!(directory = config.directory, "File consumer cancelled during initial delay");
406                    return Ok(());
407                }
408            }
409        }
410
411        let mut interval = time::interval(config.delay);
412
413        loop {
414            tokio::select! {
415                _ = context.cancelled() => {
416                    debug!(directory = config.directory, "File consumer received cancellation, stopping");
417                    break;
418                }
419                _ = interval.tick() => {
420                    if let Err(e) = poll_directory(
421                        &config,
422                        &context,
423                        &include_re,
424                        &exclude_re,
425                        &mut self.seen,
426                    ).await {
427                        warn!(directory = config.directory, error = %e, "Error polling directory");
428                    }
429                }
430            }
431        }
432
433        Ok(())
434    }
435
436    async fn stop(&mut self) -> Result<(), CamelError> {
437        Ok(())
438    }
439}
440
441async fn poll_directory(
442    config: &FileConfig,
443    context: &ConsumerContext,
444    include_re: &Option<Regex>,
445    exclude_re: &Option<Regex>,
446    seen: &mut HashSet<PathBuf>,
447) -> Result<(), CamelError> {
448    let base_path = std::path::Path::new(&config.directory);
449
450    let files = list_files(base_path, config.recursive).await?;
451
452    for file_path in files {
453        let file_name = file_path
454            .file_name()
455            .and_then(|n| n.to_str())
456            .unwrap_or_default()
457            .to_string();
458
459        if let Some(ref target_name) = config.file_name
460            && file_name != *target_name
461        {
462            continue;
463        }
464
465        if let Some(re) = include_re
466            && !re.is_match(&file_name)
467        {
468            continue;
469        }
470
471        if let Some(re) = exclude_re
472            && re.is_match(&file_name)
473        {
474            continue;
475        }
476
477        if let Some(ref move_dir) = config.move_to
478            && file_path.starts_with(base_path.join(move_dir))
479        {
480            continue;
481        }
482
483        // Idempotent consumer: skip already-seen files when noop=true
484        if config.noop && seen.contains(&file_path) {
485            continue;
486        }
487
488        let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
489            let f = fs::File::open(&file_path).await?;
490            let m = f.metadata().await?;
491            Ok::<_, std::io::Error>((f, m))
492        })
493        .await
494        {
495            Ok(Ok((f, m))) => (f, Some(m)),
496            Ok(Err(e)) => {
497                warn!(
498                    file = %file_path.display(),
499                    error = %e,
500                    "Failed to open file"
501                );
502                continue;
503            }
504            Err(_) => {
505                warn!(
506                    file = %file_path.display(),
507                    timeout_ms = config.read_timeout.as_millis(),
508                    "Timeout opening file"
509                );
510                continue;
511            }
512        };
513
514        let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
515        let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
516
517        let last_modified = metadata
518            .as_ref()
519            .and_then(|m| m.modified().ok())
520            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
521            .map(|d| d.as_millis() as u64)
522            .unwrap_or(0);
523
524        let relative_path = file_path
525            .strip_prefix(base_path)
526            .unwrap_or(&file_path)
527            .to_string_lossy()
528            .to_string();
529
530        let absolute_path = file_path
531            .canonicalize()
532            .unwrap_or_else(|_| file_path.clone())
533            .to_string_lossy()
534            .to_string();
535
536        let body = Body::Stream(StreamBody {
537            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
538            metadata: StreamMetadata {
539                size_hint: Some(file_len),
540                content_type: None,
541                origin: Some(absolute_path.clone()),
542            },
543        });
544
545        let mut exchange = Exchange::new(Message::new(body));
546        exchange
547            .input
548            .set_header("CamelFileName", serde_json::Value::String(relative_path));
549        exchange.input.set_header(
550            "CamelFileNameOnly",
551            serde_json::Value::String(file_name.clone()),
552        );
553        exchange.input.set_header(
554            "CamelFileAbsolutePath",
555            serde_json::Value::String(absolute_path),
556        );
557        exchange.input.set_header(
558            "CamelFileLength",
559            serde_json::Value::Number(file_len.into()),
560        );
561        exchange.input.set_header(
562            "CamelFileLastModified",
563            serde_json::Value::Number(last_modified.into()),
564        );
565
566        debug!(
567            file = %file_path.display(),
568            correlation_id = %exchange.correlation_id(),
569            "Processing file"
570        );
571
572        if context.send(exchange).await.is_err() {
573            break;
574        }
575
576        if config.noop {
577            seen.insert(file_path.clone());
578        }
579
580        if config.noop {
581            // Do nothing
582        } else if config.delete {
583            if let Err(e) = fs::remove_file(&file_path).await {
584                warn!(file = %file_path.display(), error = %e, "Failed to delete file");
585            }
586        } else if let Some(ref move_dir) = config.move_to {
587            let target_dir = base_path.join(move_dir);
588            if let Err(e) = fs::create_dir_all(&target_dir).await {
589                warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
590                continue;
591            }
592            let target_path = target_dir.join(&file_name);
593            if let Err(e) = fs::rename(&file_path, &target_path).await {
594                warn!(
595                    from = %file_path.display(),
596                    to = %target_path.display(),
597                    error = %e,
598                    "Failed to move file"
599                );
600            }
601        }
602    }
603
604    Ok(())
605}
606
607async fn list_files(
608    dir: &std::path::Path,
609    recursive: bool,
610) -> Result<Vec<std::path::PathBuf>, CamelError> {
611    let mut files = Vec::new();
612    let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
613
614    while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
615        let path = entry.path();
616        if path.is_file() {
617            files.push(path);
618        } else if path.is_dir() && recursive {
619            let mut sub_files = Box::pin(list_files(&path, true)).await?;
620            files.append(&mut sub_files);
621        }
622    }
623
624    files.sort();
625    Ok(files)
626}
627
628// ---------------------------------------------------------------------------
629// Path validation for security
630// ---------------------------------------------------------------------------
631
632fn validate_path_is_within_base(
633    base_dir: &std::path::Path,
634    target_path: &std::path::Path,
635) -> Result<(), CamelError> {
636    let canonical_base = base_dir.canonicalize().map_err(|e| {
637        CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
638    })?;
639
640    // For non-existent paths, canonicalize the parent and construct the full path
641    let canonical_target = if target_path.exists() {
642        target_path.canonicalize().map_err(|e| {
643            CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
644        })?
645    } else if let Some(parent) = target_path.parent() {
646        // Ensure parent exists (should have been created by auto_create)
647        if !parent.exists() {
648            return Err(CamelError::ProcessorError(format!(
649                "Parent directory '{}' does not exist",
650                parent.display()
651            )));
652        }
653        let canonical_parent = parent.canonicalize().map_err(|e| {
654            CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
655        })?;
656        // Reconstruct the full path with the filename
657        if let Some(filename) = target_path.file_name() {
658            canonical_parent.join(filename)
659        } else {
660            return Err(CamelError::ProcessorError(
661                "Invalid target path: no filename".to_string(),
662            ));
663        }
664    } else {
665        return Err(CamelError::ProcessorError(
666            "Invalid target path: no parent directory".to_string(),
667        ));
668    };
669
670    if !canonical_target.starts_with(&canonical_base) {
671        return Err(CamelError::ProcessorError(format!(
672            "Path '{}' is outside base directory '{}'",
673            canonical_target.display(),
674            canonical_base.display()
675        )));
676    }
677
678    Ok(())
679}
680
681// ---------------------------------------------------------------------------
682// FileProducer
683// ---------------------------------------------------------------------------
684
685#[derive(Clone)]
686struct FileProducer {
687    config: FileConfig,
688}
689
690impl FileProducer {
691    fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
692        if let Some(name) = exchange
693            .input
694            .header("CamelFileName")
695            .and_then(|v| v.as_str())
696        {
697            return Ok(name.to_string());
698        }
699        if let Some(ref name) = config.file_name {
700            return Ok(name.clone());
701        }
702        Err(CamelError::ProcessorError(
703            "No filename specified: set CamelFileName header or fileName option".to_string(),
704        ))
705    }
706}
707
708impl Service<Exchange> for FileProducer {
709    type Response = Exchange;
710    type Error = CamelError;
711    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
712
713    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
714        Poll::Ready(Ok(()))
715    }
716
717    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
718        let config = self.config.clone();
719
720        Box::pin(async move {
721            let file_name = FileProducer::resolve_filename(&exchange, &config)?;
722            let body = exchange.input.body.clone();
723
724            let dir_path = std::path::Path::new(&config.directory);
725            let target_path = dir_path.join(&file_name);
726
727            // 1. Auto-create directories
728            if config.auto_create
729                && let Some(parent) = target_path.parent()
730            {
731                tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
732                    .await
733                    .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
734                    .map_err(CamelError::from)?;
735            }
736
737            // 2. Security: validate path is within base directory
738            validate_path_is_within_base(dir_path, &target_path)?;
739
740            // 3. Handle file-exist strategy
741            match config.file_exist {
742                FileExistStrategy::Fail if target_path.exists() => {
743                    return Err(CamelError::ProcessorError(format!(
744                        "File already exists: {}",
745                        target_path.display()
746                    )));
747                }
748                FileExistStrategy::Append => {
749                    // Append: write directly without temp file (append is inherently non-atomic)
750                    let mut file = tokio::time::timeout(
751                        config.write_timeout,
752                        OpenOptions::new()
753                            .append(true)
754                            .create(true)
755                            .open(&target_path),
756                    )
757                    .await
758                    .map_err(|_| {
759                        CamelError::ProcessorError("Timeout opening file for append".into())
760                    })?
761                    .map_err(CamelError::from)?;
762
763                    tokio::time::timeout(
764                        config.write_timeout,
765                        io::copy(&mut body.into_async_read(), &mut file),
766                    )
767                    .await
768                    .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
769                    .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
770
771                    file.flush().await.map_err(CamelError::from)?;
772                }
773                _ => {
774                    // Override (or Fail when file doesn't exist): always atomic via temp file
775                    let temp_name = if let Some(ref prefix) = config.temp_prefix {
776                        format!("{prefix}{file_name}")
777                    } else {
778                        format!(".tmp.{file_name}")
779                    };
780                    let temp_path = dir_path.join(&temp_name);
781
782                    // RAII guard ensures cleanup even on panic
783                    let mut guard = TempFileGuard::new(temp_path.clone());
784
785                    // Write to temp file
786                    let mut file =
787                        tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
788                            .await
789                            .map_err(|_| {
790                                CamelError::ProcessorError("Timeout creating temp file".into())
791                            })?
792                            .map_err(CamelError::from)?;
793
794                    let copy_result = tokio::time::timeout(
795                        config.write_timeout,
796                        io::copy(&mut body.into_async_read(), &mut file),
797                    )
798                    .await;
799
800                    // Flush any kernel buffers (best-effort; actual write errors come from io::copy above)
801                    let _ = file.flush().await;
802
803                    match copy_result {
804                        Ok(Ok(_)) => {}
805                        Ok(Err(e)) => {
806                            // Guard will clean up temp file on drop
807                            return Err(CamelError::ProcessorError(e.to_string()));
808                        }
809                        Err(_) => {
810                            // Guard will clean up temp file on drop
811                            return Err(CamelError::ProcessorError("Timeout writing file".into()));
812                        }
813                    }
814
815                    // Atomic rename: temp → target
816                    let rename_result = tokio::time::timeout(
817                        config.write_timeout,
818                        fs::rename(&temp_path, &target_path),
819                    )
820                    .await;
821
822                    match rename_result {
823                        Ok(Ok(_)) => {
824                            // Success — disarm guard so it doesn't delete the renamed file
825                            guard.disarm();
826                        }
827                        Ok(Err(e)) => {
828                            // Guard will clean up temp file on drop
829                            return Err(CamelError::from(e));
830                        }
831                        Err(_) => {
832                            // Guard will clean up temp file on drop
833                            return Err(CamelError::ProcessorError("Timeout renaming file".into()));
834                        }
835                    }
836                }
837            }
838
839            // 4. Set output header
840            let abs_path = target_path
841                .canonicalize()
842                .unwrap_or_else(|_| target_path.clone())
843                .to_string_lossy()
844                .to_string();
845            exchange
846                .input
847                .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
848
849            debug!(
850                file = %target_path.display(),
851                correlation_id = %exchange.correlation_id(),
852                "File written"
853            );
854
855            Ok(exchange)
856        })
857    }
858}
859
860#[cfg(test)]
861mod tests {
862    use super::*;
863    use bytes::Bytes;
864    use std::time::Duration;
865    use tokio_util::sync::CancellationToken;
866
867    fn test_producer_ctx() -> ProducerContext {
868        ProducerContext::new()
869    }
870
871    #[test]
872    fn test_file_config_defaults() {
873        let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
874        assert_eq!(config.directory, "/tmp/inbox");
875        assert_eq!(config.delay, Duration::from_millis(500));
876        assert_eq!(config.initial_delay, Duration::from_millis(1000));
877        assert!(!config.noop);
878        assert!(!config.delete);
879        assert_eq!(config.move_to, Some(".camel".to_string()));
880        assert!(config.file_name.is_none());
881        assert!(config.include.is_none());
882        assert!(config.exclude.is_none());
883        assert!(!config.recursive);
884        assert_eq!(config.file_exist, FileExistStrategy::Override);
885        assert!(config.temp_prefix.is_none());
886        assert!(config.auto_create);
887        // New timeout defaults
888        assert_eq!(config.read_timeout, Duration::from_secs(30));
889        assert_eq!(config.write_timeout, Duration::from_secs(30));
890    }
891
892    #[test]
893    fn test_file_config_consumer_options() {
894        let config = FileConfig::from_uri(
895            "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
896        ).unwrap();
897        assert_eq!(config.directory, "/data/input");
898        assert_eq!(config.delay, Duration::from_millis(1000));
899        assert_eq!(config.initial_delay, Duration::from_millis(2000));
900        assert!(config.noop);
901        assert!(config.recursive);
902        assert_eq!(config.include, Some(".*\\.csv".to_string()));
903    }
904
905    #[test]
906    fn test_file_config_producer_options() {
907        let config = FileConfig::from_uri(
908            "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
909        )
910        .unwrap();
911        assert_eq!(config.file_exist, FileExistStrategy::Append);
912        assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
913        assert!(!config.auto_create);
914        assert_eq!(config.file_name, Some("out.txt".to_string()));
915    }
916
917    #[test]
918    fn test_file_config_delete_mode() {
919        let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
920        assert!(config.delete);
921        assert!(config.move_to.is_none());
922    }
923
924    #[test]
925    fn test_file_config_noop_mode() {
926        let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
927        assert!(config.noop);
928        assert!(config.move_to.is_none());
929    }
930
931    #[test]
932    fn test_file_config_wrong_scheme() {
933        let result = FileConfig::from_uri("timer:tick");
934        assert!(result.is_err());
935    }
936
937    #[test]
938    fn test_file_component_scheme() {
939        let component = FileComponent::new();
940        assert_eq!(component.scheme(), "file");
941    }
942
943    #[test]
944    fn test_file_component_creates_endpoint() {
945        let component = FileComponent::new();
946        let endpoint = component.create_endpoint("file:/tmp/test");
947        assert!(endpoint.is_ok());
948    }
949
950    // -----------------------------------------------------------------------
951    // Consumer tests
952    // -----------------------------------------------------------------------
953
954    #[tokio::test]
955    async fn test_file_consumer_reads_files() {
956        let dir = tempfile::tempdir().unwrap();
957        let dir_path = dir.path().to_str().unwrap();
958
959        std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
960        std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
961
962        let component = FileComponent::new();
963        let endpoint = component
964            .create_endpoint(&format!(
965                "file:{dir_path}?noop=true&initialDelay=0&delay=100"
966            ))
967            .unwrap();
968        let mut consumer = endpoint.create_consumer().unwrap();
969
970        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
971        let token = CancellationToken::new();
972        let ctx = ConsumerContext::new(tx, token.clone());
973
974        tokio::spawn(async move {
975            consumer.start(ctx).await.unwrap();
976        });
977
978        let mut received = Vec::new();
979        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
980            while let Some(envelope) = rx.recv().await {
981                received.push(envelope.exchange);
982                if received.len() == 2 {
983                    break;
984                }
985            }
986        })
987        .await;
988        token.cancel();
989
990        assert!(timeout.is_ok(), "Should have received 2 exchanges");
991        assert_eq!(received.len(), 2);
992
993        for ex in &received {
994            assert!(ex.input.header("CamelFileName").is_some());
995            assert!(ex.input.header("CamelFileNameOnly").is_some());
996            assert!(ex.input.header("CamelFileAbsolutePath").is_some());
997            assert!(ex.input.header("CamelFileLength").is_some());
998            assert!(ex.input.header("CamelFileLastModified").is_some());
999        }
1000    }
1001
1002    #[tokio::test]
1003    async fn noop_second_poll_does_not_re_emit_seen_files() {
1004        let dir = tempfile::tempdir().unwrap();
1005        let file_path = dir.path().join("test.txt");
1006        tokio::fs::write(&file_path, b"hello").await.unwrap();
1007
1008        let uri = format!(
1009            "file:{}?noop=true&initialDelay=0&delay=50",
1010            dir.path().display()
1011        );
1012        let config = FileConfig::from_uri(&uri).unwrap();
1013        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1014        let token = CancellationToken::new();
1015        let ctx = ConsumerContext::new(tx, token);
1016
1017        let include_re = None;
1018        let exclude_re = None;
1019        let mut seen = std::collections::HashSet::new();
1020
1021        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1022            .await
1023            .unwrap();
1024        assert!(rx.try_recv().is_ok(), "first poll should emit file");
1025        assert!(rx.try_recv().is_err(), "should only emit once");
1026
1027        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1028            .await
1029            .unwrap();
1030        assert!(
1031            rx.try_recv().is_err(),
1032            "second poll should not re-emit seen file"
1033        );
1034    }
1035
1036    #[tokio::test]
1037    async fn noop_new_files_picked_up_after_first_poll() {
1038        let dir = tempfile::tempdir().unwrap();
1039        let file1 = dir.path().join("a.txt");
1040        tokio::fs::write(&file1, b"a").await.unwrap();
1041
1042        let uri = format!(
1043            "file:{}?noop=true&initialDelay=0&delay=50",
1044            dir.path().display()
1045        );
1046        let config = FileConfig::from_uri(&uri).unwrap();
1047        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1048        let token = CancellationToken::new();
1049        let ctx = ConsumerContext::new(tx, token);
1050
1051        let include_re = None;
1052        let exclude_re = None;
1053        let mut seen = std::collections::HashSet::new();
1054
1055        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1056            .await
1057            .unwrap();
1058        let _ = rx.try_recv();
1059
1060        let file2 = dir.path().join("b.txt");
1061        tokio::fs::write(&file2, b"b").await.unwrap();
1062
1063        poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1064            .await
1065            .unwrap();
1066        assert!(
1067            rx.try_recv().is_ok(),
1068            "b.txt should be emitted on second poll"
1069        );
1070        assert!(rx.try_recv().is_err(), "a.txt should not be re-emitted");
1071    }
1072
1073    #[tokio::test]
1074    async fn test_file_consumer_include_filter() {
1075        let dir = tempfile::tempdir().unwrap();
1076        let dir_path = dir.path().to_str().unwrap();
1077
1078        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
1079        std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
1080
1081        let component = FileComponent::new();
1082        let endpoint = component
1083            .create_endpoint(&format!(
1084                "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
1085            ))
1086            .unwrap();
1087        let mut consumer = endpoint.create_consumer().unwrap();
1088
1089        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1090        let token = CancellationToken::new();
1091        let ctx = ConsumerContext::new(tx, token.clone());
1092
1093        tokio::spawn(async move {
1094            consumer.start(ctx).await.unwrap();
1095        });
1096
1097        let mut received = Vec::new();
1098        let _ = tokio::time::timeout(Duration::from_millis(500), async {
1099            while let Some(envelope) = rx.recv().await {
1100                received.push(envelope.exchange);
1101                if received.len() == 1 {
1102                    break;
1103                }
1104            }
1105        })
1106        .await;
1107        token.cancel();
1108
1109        assert_eq!(received.len(), 1);
1110        let name = received[0]
1111            .input
1112            .header("CamelFileNameOnly")
1113            .and_then(|v| v.as_str())
1114            .unwrap();
1115        assert_eq!(name, "data.csv");
1116    }
1117
1118    #[tokio::test]
1119    async fn test_file_consumer_delete_mode() {
1120        let dir = tempfile::tempdir().unwrap();
1121        let dir_path = dir.path().to_str().unwrap();
1122
1123        std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
1124
1125        let component = FileComponent::new();
1126        let endpoint = component
1127            .create_endpoint(&format!(
1128                "file:{dir_path}?delete=true&initialDelay=0&delay=100"
1129            ))
1130            .unwrap();
1131        let mut consumer = endpoint.create_consumer().unwrap();
1132
1133        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1134        let token = CancellationToken::new();
1135        let ctx = ConsumerContext::new(tx, token.clone());
1136
1137        tokio::spawn(async move {
1138            consumer.start(ctx).await.unwrap();
1139        });
1140
1141        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1142        token.cancel();
1143
1144        tokio::time::sleep(Duration::from_millis(100)).await;
1145
1146        assert!(
1147            !dir.path().join("deleteme.txt").exists(),
1148            "File should be deleted"
1149        );
1150    }
1151
1152    #[tokio::test]
1153    async fn test_file_consumer_move_mode() {
1154        let dir = tempfile::tempdir().unwrap();
1155        let dir_path = dir.path().to_str().unwrap();
1156
1157        std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
1158
1159        let component = FileComponent::new();
1160        let endpoint = component
1161            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
1162            .unwrap();
1163        let mut consumer = endpoint.create_consumer().unwrap();
1164
1165        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1166        let token = CancellationToken::new();
1167        let ctx = ConsumerContext::new(tx, token.clone());
1168
1169        tokio::spawn(async move {
1170            consumer.start(ctx).await.unwrap();
1171        });
1172
1173        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1174        token.cancel();
1175
1176        tokio::time::sleep(Duration::from_millis(100)).await;
1177
1178        assert!(
1179            !dir.path().join("moveme.txt").exists(),
1180            "Original file should be gone"
1181        );
1182        assert!(
1183            dir.path().join(".camel").join("moveme.txt").exists(),
1184            "File should be in .camel/"
1185        );
1186    }
1187
1188    #[tokio::test]
1189    async fn test_file_consumer_respects_cancellation() {
1190        let dir = tempfile::tempdir().unwrap();
1191        let dir_path = dir.path().to_str().unwrap();
1192
1193        let component = FileComponent::new();
1194        let endpoint = component
1195            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
1196            .unwrap();
1197        let mut consumer = endpoint.create_consumer().unwrap();
1198
1199        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1200        let token = CancellationToken::new();
1201        let ctx = ConsumerContext::new(tx, token.clone());
1202
1203        let handle = tokio::spawn(async move {
1204            consumer.start(ctx).await.unwrap();
1205        });
1206
1207        tokio::time::sleep(Duration::from_millis(150)).await;
1208        token.cancel();
1209
1210        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1211        assert!(
1212            result.is_ok(),
1213            "Consumer should have stopped after cancellation"
1214        );
1215    }
1216
1217    // -----------------------------------------------------------------------
1218    // Producer tests
1219    // -----------------------------------------------------------------------
1220
1221    #[tokio::test]
1222    async fn test_file_producer_writes_file() {
1223        use tower::ServiceExt;
1224
1225        let dir = tempfile::tempdir().unwrap();
1226        let dir_path = dir.path().to_str().unwrap();
1227
1228        let component = FileComponent::new();
1229        let endpoint = component
1230            .create_endpoint(&format!("file:{dir_path}"))
1231            .unwrap();
1232        let ctx = test_producer_ctx();
1233        let producer = endpoint.create_producer(&ctx).unwrap();
1234
1235        let mut exchange = Exchange::new(Message::new("file content"));
1236        exchange.input.set_header(
1237            "CamelFileName",
1238            serde_json::Value::String("output.txt".to_string()),
1239        );
1240
1241        let result = producer.oneshot(exchange).await.unwrap();
1242
1243        let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1244        assert_eq!(content, "file content");
1245
1246        assert!(result.input.header("CamelFileNameProduced").is_some());
1247    }
1248
1249    #[tokio::test]
1250    async fn test_file_producer_auto_create_dirs() {
1251        use tower::ServiceExt;
1252
1253        let dir = tempfile::tempdir().unwrap();
1254        let dir_path = dir.path().to_str().unwrap();
1255
1256        let component = FileComponent::new();
1257        let endpoint = component
1258            .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1259            .unwrap();
1260        let ctx = test_producer_ctx();
1261        let producer = endpoint.create_producer(&ctx).unwrap();
1262
1263        let mut exchange = Exchange::new(Message::new("nested"));
1264        exchange.input.set_header(
1265            "CamelFileName",
1266            serde_json::Value::String("file.txt".to_string()),
1267        );
1268
1269        producer.oneshot(exchange).await.unwrap();
1270
1271        assert!(dir.path().join("sub/dir/file.txt").exists());
1272    }
1273
1274    #[tokio::test]
1275    async fn test_file_producer_file_exist_fail() {
1276        use tower::ServiceExt;
1277
1278        let dir = tempfile::tempdir().unwrap();
1279        let dir_path = dir.path().to_str().unwrap();
1280
1281        std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1282
1283        let component = FileComponent::new();
1284        let endpoint = component
1285            .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1286            .unwrap();
1287        let ctx = test_producer_ctx();
1288        let producer = endpoint.create_producer(&ctx).unwrap();
1289
1290        let mut exchange = Exchange::new(Message::new("new"));
1291        exchange.input.set_header(
1292            "CamelFileName",
1293            serde_json::Value::String("existing.txt".to_string()),
1294        );
1295
1296        let result = producer.oneshot(exchange).await;
1297        assert!(
1298            result.is_err(),
1299            "Should fail when file exists with Fail strategy"
1300        );
1301    }
1302
1303    #[tokio::test]
1304    async fn test_file_producer_file_exist_append() {
1305        use tower::ServiceExt;
1306
1307        let dir = tempfile::tempdir().unwrap();
1308        let dir_path = dir.path().to_str().unwrap();
1309
1310        std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1311
1312        let component = FileComponent::new();
1313        let endpoint = component
1314            .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1315            .unwrap();
1316        let ctx = test_producer_ctx();
1317        let producer = endpoint.create_producer(&ctx).unwrap();
1318
1319        let mut exchange = Exchange::new(Message::new("new"));
1320        exchange.input.set_header(
1321            "CamelFileName",
1322            serde_json::Value::String("append.txt".to_string()),
1323        );
1324
1325        producer.oneshot(exchange).await.unwrap();
1326
1327        let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1328        assert_eq!(content, "oldnew");
1329    }
1330
1331    #[tokio::test]
1332    async fn test_file_producer_temp_prefix() {
1333        use tower::ServiceExt;
1334
1335        let dir = tempfile::tempdir().unwrap();
1336        let dir_path = dir.path().to_str().unwrap();
1337
1338        let component = FileComponent::new();
1339        let endpoint = component
1340            .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1341            .unwrap();
1342        let ctx = test_producer_ctx();
1343        let producer = endpoint.create_producer(&ctx).unwrap();
1344
1345        let mut exchange = Exchange::new(Message::new("atomic write"));
1346        exchange.input.set_header(
1347            "CamelFileName",
1348            serde_json::Value::String("final.txt".to_string()),
1349        );
1350
1351        producer.oneshot(exchange).await.unwrap();
1352
1353        assert!(dir.path().join("final.txt").exists());
1354        assert!(!dir.path().join(".tmpfinal.txt").exists());
1355        let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1356        assert_eq!(content, "atomic write");
1357    }
1358
1359    #[tokio::test]
1360    async fn test_file_producer_uses_filename_option() {
1361        use tower::ServiceExt;
1362
1363        let dir = tempfile::tempdir().unwrap();
1364        let dir_path = dir.path().to_str().unwrap();
1365
1366        let component = FileComponent::new();
1367        let endpoint = component
1368            .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1369            .unwrap();
1370        let ctx = test_producer_ctx();
1371        let producer = endpoint.create_producer(&ctx).unwrap();
1372
1373        let exchange = Exchange::new(Message::new("content"));
1374
1375        producer.oneshot(exchange).await.unwrap();
1376        assert!(dir.path().join("fixed.txt").exists());
1377    }
1378
1379    #[tokio::test]
1380    async fn test_file_producer_no_filename_errors() {
1381        use tower::ServiceExt;
1382
1383        let dir = tempfile::tempdir().unwrap();
1384        let dir_path = dir.path().to_str().unwrap();
1385
1386        let component = FileComponent::new();
1387        let endpoint = component
1388            .create_endpoint(&format!("file:{dir_path}"))
1389            .unwrap();
1390        let ctx = test_producer_ctx();
1391        let producer = endpoint.create_producer(&ctx).unwrap();
1392
1393        let exchange = Exchange::new(Message::new("content"));
1394
1395        let result = producer.oneshot(exchange).await;
1396        assert!(result.is_err(), "Should error when no filename is provided");
1397    }
1398
1399    // -----------------------------------------------------------------------
1400    // Security tests - Path traversal protection
1401    // -----------------------------------------------------------------------
1402
1403    #[tokio::test]
1404    async fn test_file_producer_rejects_path_traversal_parent_directory() {
1405        use tower::ServiceExt;
1406
1407        let dir = tempfile::tempdir().unwrap();
1408        let dir_path = dir.path().to_str().unwrap();
1409
1410        // Create a subdirectory
1411        std::fs::create_dir(dir.path().join("subdir")).unwrap();
1412        std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1413
1414        let component = FileComponent::new();
1415        let endpoint = component
1416            .create_endpoint(&format!("file:{dir_path}/subdir"))
1417            .unwrap();
1418        let ctx = test_producer_ctx();
1419        let producer = endpoint.create_producer(&ctx).unwrap();
1420
1421        let mut exchange = Exchange::new(Message::new("malicious"));
1422        exchange.input.set_header(
1423            "CamelFileName",
1424            serde_json::Value::String("../secret.txt".to_string()),
1425        );
1426
1427        let result = producer.oneshot(exchange).await;
1428        assert!(result.is_err(), "Should reject path traversal attempt");
1429
1430        let err = result.unwrap_err();
1431        assert!(
1432            err.to_string().contains("outside"),
1433            "Error should mention path is outside base directory"
1434        );
1435    }
1436
1437    #[tokio::test]
1438    async fn test_file_producer_rejects_absolute_path_outside_base() {
1439        use tower::ServiceExt;
1440
1441        let dir = tempfile::tempdir().unwrap();
1442        let dir_path = dir.path().to_str().unwrap();
1443
1444        let component = FileComponent::new();
1445        let endpoint = component
1446            .create_endpoint(&format!("file:{dir_path}"))
1447            .unwrap();
1448        let ctx = test_producer_ctx();
1449        let producer = endpoint.create_producer(&ctx).unwrap();
1450
1451        let mut exchange = Exchange::new(Message::new("malicious"));
1452        exchange.input.set_header(
1453            "CamelFileName",
1454            serde_json::Value::String("/etc/passwd".to_string()),
1455        );
1456
1457        let result = producer.oneshot(exchange).await;
1458        assert!(result.is_err(), "Should reject absolute path outside base");
1459    }
1460
1461    // -----------------------------------------------------------------------
1462    // Large file streaming tests
1463    // -----------------------------------------------------------------------
1464
1465    #[tokio::test]
1466    #[ignore] // Slow test - run with --ignored flag
1467    async fn test_large_file_streaming_constant_memory() {
1468        use std::io::Write;
1469        use tempfile::NamedTempFile;
1470
1471        // Create a 150MB file (larger than 100MB limit)
1472        let mut temp_file = NamedTempFile::new().unwrap();
1473        let file_size = 150 * 1024 * 1024; // 150MB
1474        let chunk = vec![b'X'; 1024 * 1024]; // 1MB chunk
1475
1476        for _ in 0..150 {
1477            temp_file.write_all(&chunk).unwrap();
1478        }
1479        temp_file.flush().unwrap();
1480
1481        let dir = temp_file.path().parent().unwrap();
1482        let dir_path = dir.to_str().unwrap();
1483        let file_name = temp_file
1484            .path()
1485            .file_name()
1486            .unwrap()
1487            .to_str()
1488            .unwrap()
1489            .to_string();
1490
1491        // Read file as stream (should succeed with lazy evaluation)
1492        let component = FileComponent::new();
1493        let endpoint = component
1494            .create_endpoint(&format!(
1495                "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1496            ))
1497            .unwrap();
1498        let mut consumer = endpoint.create_consumer().unwrap();
1499
1500        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1501        let token = CancellationToken::new();
1502        let ctx = ConsumerContext::new(tx, token.clone());
1503
1504        tokio::spawn(async move {
1505            let _ = consumer.start(ctx).await;
1506        });
1507
1508        let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1509            rx.recv().await.unwrap().exchange
1510        })
1511        .await
1512        .expect("Should receive exchange");
1513        token.cancel();
1514
1515        // Verify body is a stream (not materialized)
1516        assert!(matches!(exchange.input.body, Body::Stream(_)));
1517
1518        // Verify we can read metadata without consuming
1519        if let Body::Stream(ref stream_body) = exchange.input.body {
1520            assert!(stream_body.metadata.size_hint.is_some());
1521            let size = stream_body.metadata.size_hint.unwrap();
1522            assert_eq!(size, file_size as u64);
1523        }
1524
1525        // Materializing should fail (exceeds 100MB limit)
1526        if let Body::Stream(stream_body) = exchange.input.body {
1527            let body = Body::Stream(stream_body);
1528            let result = body.into_bytes(100 * 1024 * 1024).await;
1529            assert!(result.is_err());
1530        }
1531
1532        // But we CAN read chunks one at a time (simulating line-by-line processing)
1533        // This demonstrates lazy evaluation - we don't need to load entire file
1534        let component2 = FileComponent::new();
1535        let endpoint2 = component2
1536            .create_endpoint(&format!(
1537                "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1538            ))
1539            .unwrap();
1540        let mut consumer2 = endpoint2.create_consumer().unwrap();
1541
1542        let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1543        let token2 = CancellationToken::new();
1544        let ctx2 = ConsumerContext::new(tx2, token2.clone());
1545
1546        tokio::spawn(async move {
1547            let _ = consumer2.start(ctx2).await;
1548        });
1549
1550        let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1551            rx2.recv().await.unwrap().exchange
1552        })
1553        .await
1554        .expect("Should receive exchange");
1555        token2.cancel();
1556
1557        if let Body::Stream(stream_body) = exchange2.input.body {
1558            let mut stream_lock = stream_body.stream.lock().await;
1559            let mut stream = stream_lock.take().unwrap();
1560
1561            // Read first chunk (size varies based on ReaderStream's buffer)
1562            if let Some(chunk_result) = stream.next().await {
1563                let chunk = chunk_result.unwrap();
1564                assert!(!chunk.is_empty());
1565                assert!(chunk.len() < file_size);
1566                // Memory usage is constant - we only have this chunk in memory, not 150MB
1567            }
1568        }
1569    }
1570
1571    // -----------------------------------------------------------------------
1572    // Streaming producer tests
1573    // -----------------------------------------------------------------------
1574
1575    #[tokio::test]
1576    async fn test_producer_writes_stream_body() {
1577        let dir = tempfile::tempdir().unwrap();
1578        let dir_path = dir.path().to_str().unwrap();
1579        let uri = format!("file:{dir_path}?fileName=out.txt");
1580
1581        let component = FileComponent::new();
1582        let endpoint = component.create_endpoint(&uri).unwrap();
1583        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1584
1585        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1586            Ok(Bytes::from("hello ")),
1587            Ok(Bytes::from("streaming ")),
1588            Ok(Bytes::from("world")),
1589        ];
1590        let stream = futures::stream::iter(chunks);
1591        let body = Body::Stream(camel_api::body::StreamBody {
1592            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1593            metadata: camel_api::body::StreamMetadata {
1594                size_hint: None,
1595                content_type: None,
1596                origin: None,
1597            },
1598        });
1599
1600        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1601        tower::ServiceExt::oneshot(producer, exchange)
1602            .await
1603            .unwrap();
1604
1605        let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1606            .await
1607            .unwrap();
1608        assert_eq!(content, "hello streaming world");
1609    }
1610
1611    #[tokio::test]
1612    async fn test_producer_stream_atomic_no_partial_on_error() {
1613        // If the stream errors mid-write, no file should exist at the target path
1614        let dir = tempfile::tempdir().unwrap();
1615        let dir_path = dir.path().to_str().unwrap();
1616        let uri = format!("file:{dir_path}?fileName=out.txt");
1617
1618        let component = FileComponent::new();
1619        let endpoint = component.create_endpoint(&uri).unwrap();
1620        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1621
1622        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1623            Ok(Bytes::from("partial")),
1624            Err(CamelError::ProcessorError(
1625                "simulated stream error".to_string(),
1626            )),
1627        ];
1628        let stream = futures::stream::iter(chunks);
1629        let body = Body::Stream(camel_api::body::StreamBody {
1630            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1631            metadata: camel_api::body::StreamMetadata {
1632                size_hint: None,
1633                content_type: None,
1634                origin: None,
1635            },
1636        });
1637
1638        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1639        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1640        assert!(
1641            result.is_err(),
1642            "expected error when stream fails mid-write"
1643        );
1644
1645        // Target file must NOT exist — write was aborted and temp file cleaned up
1646        assert!(
1647            !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1648            "partial file must not exist after failed write"
1649        );
1650
1651        // Temp file must also be cleaned up
1652        assert!(
1653            !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1654            "temp file must be cleaned up after failed write"
1655        );
1656    }
1657
1658    #[tokio::test]
1659    async fn test_producer_stream_append() {
1660        let dir = tempfile::tempdir().unwrap();
1661        let dir_path = dir.path().to_str().unwrap();
1662        let target = format!("{dir_path}/out.txt");
1663
1664        // Pre-create file with initial content
1665        tokio::fs::write(&target, b"line1\n").await.unwrap();
1666
1667        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1668        let component = FileComponent::new();
1669        let endpoint = component.create_endpoint(&uri).unwrap();
1670        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1671
1672        let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1673        let stream = futures::stream::iter(chunks);
1674        let body = Body::Stream(camel_api::body::StreamBody {
1675            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1676            metadata: camel_api::body::StreamMetadata {
1677                size_hint: None,
1678                content_type: None,
1679                origin: None,
1680            },
1681        });
1682
1683        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1684        tower::ServiceExt::oneshot(producer, exchange)
1685            .await
1686            .unwrap();
1687
1688        let content = tokio::fs::read_to_string(&target).await.unwrap();
1689        assert_eq!(content, "line1\nline2\n");
1690    }
1691
1692    #[tokio::test]
1693    async fn test_producer_stream_append_partial_on_error() {
1694        // Append is inherently non-atomic: if the stream errors mid-write,
1695        // the file will contain partial data. This test documents that behavior.
1696        let dir = tempfile::tempdir().unwrap();
1697        let dir_path = dir.path().to_str().unwrap();
1698        let target = format!("{dir_path}/out.txt");
1699
1700        // Pre-create file with initial content
1701        tokio::fs::write(&target, b"initial\n").await.unwrap();
1702
1703        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1704        let component = FileComponent::new();
1705        let endpoint = component.create_endpoint(&uri).unwrap();
1706        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1707
1708        // Stream with an error in the middle
1709        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1710            Ok(Bytes::from("partial-")), // This will be written
1711            Err(CamelError::ProcessorError("stream error".to_string())), // This causes failure
1712            Ok(Bytes::from("never-written")), // This won't be reached
1713        ];
1714        let stream = futures::stream::iter(chunks);
1715        let body = Body::Stream(camel_api::body::StreamBody {
1716            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1717            metadata: camel_api::body::StreamMetadata {
1718                size_hint: None,
1719                content_type: None,
1720                origin: None,
1721            },
1722        });
1723
1724        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1725        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1726
1727        // 1. Producer must return an error
1728        assert!(
1729            result.is_err(),
1730            "expected error when stream fails during append"
1731        );
1732
1733        // 2. File must contain initial content + partial data written before the error
1734        let content = tokio::fs::read_to_string(&target).await.unwrap();
1735        assert_eq!(
1736            content, "initial\npartial-",
1737            "append leaves partial data on stream error (non-atomic by nature)"
1738        );
1739    }
1740
1741    #[tokio::test]
1742    async fn test_producer_stream_already_consumed_errors() {
1743        let dir = tempfile::tempdir().unwrap();
1744        let dir_path = dir.path().to_str().unwrap();
1745        let uri = format!("file:{dir_path}?fileName=out.txt");
1746
1747        let component = FileComponent::new();
1748        let endpoint = component.create_endpoint(&uri).unwrap();
1749        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1750
1751        // Mutex holds None -> stream already consumed
1752        type MaybeStream = std::sync::Arc<
1753            tokio::sync::Mutex<
1754                Option<
1755                    std::pin::Pin<
1756                        Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1757                    >,
1758                >,
1759            >,
1760        >;
1761        let arc: MaybeStream = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1762        let body = Body::Stream(camel_api::body::StreamBody {
1763            stream: arc,
1764            metadata: camel_api::body::StreamMetadata {
1765                size_hint: None,
1766                content_type: None,
1767                origin: None,
1768            },
1769        });
1770
1771        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1772        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1773        assert!(
1774            result.is_err(),
1775            "expected error for already-consumed stream"
1776        );
1777    }
1778
1779    // -----------------------------------------------------------------------
1780    // GlobalConfig tests - apply_global_defaults behavior
1781    // -----------------------------------------------------------------------
1782
1783    #[test]
1784    fn test_global_config_applied_to_endpoint() {
1785        // Global config with non-default values
1786        let global = FileGlobalConfig::default()
1787            .with_delay_ms(2000)
1788            .with_initial_delay_ms(5000)
1789            .with_read_timeout_ms(60_000)
1790            .with_write_timeout_ms(45_000);
1791        let component = FileComponent::with_config(global);
1792        // URI uses no explicit delay/timeout params → macro defaults apply
1793        let endpoint = component.create_endpoint("file:/tmp/inbox").unwrap();
1794        // We cannot call endpoint.config directly (FileEndpoint is private),
1795        // but we can test apply_global_defaults on FileConfig directly:
1796        let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
1797        let global2 = FileGlobalConfig::default()
1798            .with_delay_ms(2000)
1799            .with_initial_delay_ms(5000)
1800            .with_read_timeout_ms(60_000)
1801            .with_write_timeout_ms(45_000);
1802        config.apply_global_defaults(&global2);
1803        assert_eq!(config.delay, Duration::from_millis(2000));
1804        assert_eq!(config.initial_delay, Duration::from_millis(5000));
1805        assert_eq!(config.read_timeout, Duration::from_millis(60_000));
1806        assert_eq!(config.write_timeout, Duration::from_millis(45_000));
1807        // endpoint creation succeeds too
1808        let _ = endpoint; // just verify create_endpoint didn't fail
1809    }
1810
1811    #[test]
1812    fn test_uri_param_wins_over_global_config() {
1813        // URI explicitly sets delay=1000 (NOT the 500ms macro default)
1814        let mut config =
1815            FileConfig::from_uri("file:/tmp/inbox?delay=1000&initialDelay=2000").unwrap();
1816        // Global config would want 3000ms delay
1817        let global = FileGlobalConfig::default()
1818            .with_delay_ms(3000)
1819            .with_initial_delay_ms(4000);
1820        config.apply_global_defaults(&global);
1821        // URI value of 1000ms must be preserved (not replaced by 3000ms)
1822        assert_eq!(config.delay, Duration::from_millis(1000));
1823        // URI value of 2000ms must be preserved (not replaced by 4000ms)
1824        assert_eq!(config.initial_delay, Duration::from_millis(2000));
1825        // read_timeout was not set by URI → macro default (30000) → global wins if different
1826        // (read_timeout stays at 30000 since global has same default = 30000)
1827        assert_eq!(config.read_timeout, Duration::from_millis(30_000));
1828    }
1829}