Skip to main content

camel_component_file/
lib.rs

1use std::future::Future;
2use std::path::PathBuf;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use async_trait::async_trait;
9use futures::StreamExt;
10use regex::Regex;
11use tokio::fs;
12use tokio::fs::OpenOptions;
13use tokio::io;
14use tokio::io::AsyncWriteExt;
15use tokio::time;
16use tokio_util::io::ReaderStream;
17use tower::Service;
18use tracing::{debug, warn};
19
20use camel_api::{
21    BoxProcessor, CamelError, Exchange, Message, body::Body, body::StreamBody, body::StreamMetadata,
22};
23use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
24use camel_endpoint::{UriConfig, parse_uri};
25
26// ---------------------------------------------------------------------------
27// TempFileGuard — RAII cleanup for temp files (panic-safe)
28// ---------------------------------------------------------------------------
29
30/// RAII guard that ensures temp file cleanup even on panic.
31///
32/// When dropped, removes the file at `path` unless `disarm` is set to true.
33/// This protects against temp file leaks if `io::copy` panics mid-write.
34struct TempFileGuard {
35    path: PathBuf,
36    disarm: bool,
37}
38
39impl TempFileGuard {
40    fn new(path: PathBuf) -> Self {
41        Self {
42            path,
43            disarm: false,
44        }
45    }
46
47    /// Call after successful rename to prevent cleanup.
48    fn disarm(&mut self) {
49        self.disarm = true;
50    }
51}
52
53impl Drop for TempFileGuard {
54    fn drop(&mut self) {
55        if !self.disarm {
56            // Best-effort cleanup; ignore errors (file may not exist)
57            let _ = std::fs::remove_file(&self.path);
58        }
59    }
60}
61
62// ---------------------------------------------------------------------------
63// FileExistStrategy
64// ---------------------------------------------------------------------------
65
66/// Strategy for handling existing files when writing.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
68pub enum FileExistStrategy {
69    /// Overwrite existing file (default).
70    #[default]
71    Override,
72    /// Append to existing file.
73    Append,
74    /// Fail if file exists.
75    Fail,
76}
77
78impl FromStr for FileExistStrategy {
79    type Err = String;
80
81    fn from_str(s: &str) -> Result<Self, Self::Err> {
82        match s {
83            "Override" | "override" => Ok(FileExistStrategy::Override),
84            "Append" | "append" => Ok(FileExistStrategy::Append),
85            "Fail" | "fail" => Ok(FileExistStrategy::Fail),
86            _ => Ok(FileExistStrategy::Override), // Default for unknown values
87        }
88    }
89}
90
91// ---------------------------------------------------------------------------
92// FileGlobalConfig
93// ---------------------------------------------------------------------------
94
95/// Global configuration for File component.
96/// Plain Rust, no serde, with Default impl and builder methods.
97/// These are the fallback defaults when URI params are not set.
98#[derive(Debug, Clone, PartialEq)]
99pub struct FileGlobalConfig {
100    pub delay_ms: u64,
101    pub initial_delay_ms: u64,
102    pub read_timeout_ms: u64,
103    pub write_timeout_ms: u64,
104}
105
106impl Default for FileGlobalConfig {
107    fn default() -> Self {
108        Self {
109            delay_ms: 500,
110            initial_delay_ms: 1_000,
111            read_timeout_ms: 30_000,
112            write_timeout_ms: 30_000,
113        }
114    }
115}
116
117impl FileGlobalConfig {
118    pub fn new() -> Self {
119        Self::default()
120    }
121    pub fn with_delay_ms(mut self, v: u64) -> Self {
122        self.delay_ms = v;
123        self
124    }
125    pub fn with_initial_delay_ms(mut self, v: u64) -> Self {
126        self.initial_delay_ms = v;
127        self
128    }
129    pub fn with_read_timeout_ms(mut self, v: u64) -> Self {
130        self.read_timeout_ms = v;
131        self
132    }
133    pub fn with_write_timeout_ms(mut self, v: u64) -> Self {
134        self.write_timeout_ms = v;
135        self
136    }
137}
138
139// ---------------------------------------------------------------------------
140// FileConfig
141// ---------------------------------------------------------------------------
142
143/// Configuration for file component endpoints.
144///
145/// # Streaming
146///
147/// Both the file consumer and producer use **native streaming** with no RAM
148/// materialization:
149///
150/// - The **consumer** creates a `Body::Stream` backed by `tokio::fs::File` via
151///   `ReaderStream`. Files of any size are handled without loading them into memory.
152///
153/// - The **producer** writes via `tokio::io::copy` directly to a `tokio::fs::File`
154///   using `Body::into_async_read()`. Writes for the `Override` strategy are
155///   **atomic**: data is written to a temporary file first and renamed only on
156///   success, preventing partial files on failure.
157///
158/// # Write strategies (`fileExist` URI parameter)
159///
160/// | Value | Behavior |
161/// |-------|----------|
162/// | `Override` (default) | Atomic write via temp file + rename |
163/// | `Append` | Appends to existing file; non-atomic by nature |
164/// | `Fail` | Returns error if file already exists |
165#[derive(Debug, Clone, UriConfig)]
166#[uri_scheme = "file"]
167#[uri_config(skip_impl)]
168pub struct FileConfig {
169    /// Directory path to read from or write to.
170    pub directory: String,
171
172    /// Polling delay in milliseconds (companion field for `delay`).
173    #[allow(dead_code)]
174    #[uri_param(name = "delay", default = "500")]
175    delay_ms: u64,
176
177    /// Polling delay as Duration.
178    pub delay: Duration,
179
180    /// Initial delay in milliseconds (companion field for `initial_delay`).
181    #[allow(dead_code)]
182    #[uri_param(name = "initialDelay", default = "1000")]
183    initial_delay_ms: u64,
184
185    /// Initial delay as Duration.
186    pub initial_delay: Duration,
187
188    /// If true, don't delete or move files after processing.
189    #[uri_param(default = "false")]
190    pub noop: bool,
191
192    /// If true, delete files after processing.
193    #[uri_param(default = "false")]
194    pub delete: bool,
195
196    /// Directory to move processed files to (only if not noop/delete).
197    /// Default is ".camel" when not specified and noop/delete are false.
198    #[uri_param(name = "move")]
199    move_to: Option<String>,
200
201    /// Fixed filename for producer (optional).
202    #[uri_param(name = "fileName")]
203    pub file_name: Option<String>,
204
205    /// Regex pattern for including files (consumer).
206    #[uri_param]
207    pub include: Option<String>,
208
209    /// Regex pattern for excluding files (consumer).
210    #[uri_param]
211    pub exclude: Option<String>,
212
213    /// Whether to scan directories recursively.
214    #[uri_param(default = "false")]
215    pub recursive: bool,
216
217    /// Strategy for handling existing files when writing.
218    #[uri_param(name = "fileExist", default = "Override")]
219    pub file_exist: FileExistStrategy,
220
221    /// Prefix for temporary files during atomic writes.
222    #[uri_param(name = "tempPrefix")]
223    pub temp_prefix: Option<String>,
224
225    /// Whether to automatically create directories.
226    #[uri_param(name = "autoCreate", default = "true")]
227    pub auto_create: bool,
228
229    /// Read timeout in milliseconds (companion field for `read_timeout`).
230    #[allow(dead_code)]
231    #[uri_param(name = "readTimeout", default = "30000")]
232    read_timeout_ms: u64,
233
234    /// Read timeout as Duration.
235    pub read_timeout: Duration,
236
237    /// Write timeout in milliseconds (companion field for `write_timeout`).
238    #[allow(dead_code)]
239    #[uri_param(name = "writeTimeout", default = "30000")]
240    write_timeout_ms: u64,
241
242    /// Write timeout as Duration.
243    pub write_timeout: Duration,
244}
245
246impl UriConfig for FileConfig {
247    fn scheme() -> &'static str {
248        "file"
249    }
250
251    fn from_uri(uri: &str) -> Result<Self, CamelError> {
252        let parts = parse_uri(uri)?;
253        Self::from_components(parts)
254    }
255
256    fn from_components(parts: camel_endpoint::UriComponents) -> Result<Self, CamelError> {
257        Self::parse_uri_components(parts)?.validate()
258    }
259
260    fn validate(self) -> Result<Self, CamelError> {
261        // Apply conditional logic for move_to:
262        // - If noop or delete is true, move_to should be None
263        // - Otherwise, if move_to is None, default to ".camel"
264        let move_to = if self.noop || self.delete {
265            None
266        } else {
267            Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
268        };
269
270        Ok(Self { move_to, ..self })
271    }
272}
273
274impl FileConfig {
275    /// Apply global config defaults. Since FileConfig uses a proc macro that bakes in
276    /// defaults, we compare Duration values against the known macro defaults to detect
277    /// "not explicitly set by user". Only overrides when current value == macro default.
278    ///
279    /// **Note**: If a user explicitly sets a URI param to its default value (e.g.,
280    /// `?delay=500`), it is indistinguishable from "not set" and will be overridden
281    /// by global config. This is a known limitation of the Duration comparison approach.
282    pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
283        if self.delay == Duration::from_millis(500) {
284            self.delay = Duration::from_millis(global.delay_ms);
285        }
286        if self.initial_delay == Duration::from_millis(1_000) {
287            self.initial_delay = Duration::from_millis(global.initial_delay_ms);
288        }
289        if self.read_timeout == Duration::from_millis(30_000) {
290            self.read_timeout = Duration::from_millis(global.read_timeout_ms);
291        }
292        if self.write_timeout == Duration::from_millis(30_000) {
293            self.write_timeout = Duration::from_millis(global.write_timeout_ms);
294        }
295    }
296}
297
298// ---------------------------------------------------------------------------
299// FileComponent
300// ---------------------------------------------------------------------------
301
302pub struct FileComponent {
303    config: Option<FileGlobalConfig>,
304}
305
306impl FileComponent {
307    pub fn new() -> Self {
308        Self { config: None }
309    }
310
311    pub fn with_config(config: FileGlobalConfig) -> Self {
312        Self {
313            config: Some(config),
314        }
315    }
316
317    pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
318        Self { config }
319    }
320}
321
322impl Default for FileComponent {
323    fn default() -> Self {
324        Self::new()
325    }
326}
327
328impl Component for FileComponent {
329    fn scheme(&self) -> &str {
330        "file"
331    }
332
333    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
334        let mut config = FileConfig::from_uri(uri)?;
335        if let Some(ref global_config) = self.config {
336            config.apply_global_defaults(global_config);
337        }
338        Ok(Box::new(FileEndpoint {
339            uri: uri.to_string(),
340            config,
341        }))
342    }
343}
344
345// ---------------------------------------------------------------------------
346// FileEndpoint
347// ---------------------------------------------------------------------------
348
349struct FileEndpoint {
350    uri: String,
351    config: FileConfig,
352}
353
354impl Endpoint for FileEndpoint {
355    fn uri(&self) -> &str {
356        &self.uri
357    }
358
359    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
360        Ok(Box::new(FileConsumer {
361            config: self.config.clone(),
362        }))
363    }
364
365    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
366        Ok(BoxProcessor::new(FileProducer {
367            config: self.config.clone(),
368        }))
369    }
370}
371
372// ---------------------------------------------------------------------------
373// FileConsumer
374// ---------------------------------------------------------------------------
375
376struct FileConsumer {
377    config: FileConfig,
378}
379
380#[async_trait]
381impl Consumer for FileConsumer {
382    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
383        let config = self.config.clone();
384
385        let include_re = config
386            .include
387            .as_ref()
388            .map(|p| Regex::new(p))
389            .transpose()
390            .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
391        let exclude_re = config
392            .exclude
393            .as_ref()
394            .map(|p| Regex::new(p))
395            .transpose()
396            .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
397
398        if !config.initial_delay.is_zero() {
399            tokio::select! {
400                _ = time::sleep(config.initial_delay) => {}
401                _ = context.cancelled() => {
402                    debug!(directory = config.directory, "File consumer cancelled during initial delay");
403                    return Ok(());
404                }
405            }
406        }
407
408        let mut interval = time::interval(config.delay);
409
410        loop {
411            tokio::select! {
412                _ = context.cancelled() => {
413                    debug!(directory = config.directory, "File consumer received cancellation, stopping");
414                    break;
415                }
416                _ = interval.tick() => {
417                    if let Err(e) = poll_directory(
418                        &config,
419                        &context,
420                        &include_re,
421                        &exclude_re,
422                    ).await {
423                        warn!(directory = config.directory, error = %e, "Error polling directory");
424                    }
425                }
426            }
427        }
428
429        Ok(())
430    }
431
432    async fn stop(&mut self) -> Result<(), CamelError> {
433        Ok(())
434    }
435}
436
437async fn poll_directory(
438    config: &FileConfig,
439    context: &ConsumerContext,
440    include_re: &Option<Regex>,
441    exclude_re: &Option<Regex>,
442) -> Result<(), CamelError> {
443    let base_path = std::path::Path::new(&config.directory);
444
445    let files = list_files(base_path, config.recursive).await?;
446
447    for file_path in files {
448        let file_name = file_path
449            .file_name()
450            .and_then(|n| n.to_str())
451            .unwrap_or_default()
452            .to_string();
453
454        if let Some(ref target_name) = config.file_name
455            && file_name != *target_name
456        {
457            continue;
458        }
459
460        if let Some(re) = include_re
461            && !re.is_match(&file_name)
462        {
463            continue;
464        }
465
466        if let Some(re) = exclude_re
467            && re.is_match(&file_name)
468        {
469            continue;
470        }
471
472        if let Some(ref move_dir) = config.move_to
473            && file_path.starts_with(base_path.join(move_dir))
474        {
475            continue;
476        }
477
478        let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
479            let f = fs::File::open(&file_path).await?;
480            let m = f.metadata().await?;
481            Ok::<_, std::io::Error>((f, m))
482        })
483        .await
484        {
485            Ok(Ok((f, m))) => (f, Some(m)),
486            Ok(Err(e)) => {
487                warn!(
488                    file = %file_path.display(),
489                    error = %e,
490                    "Failed to open file"
491                );
492                continue;
493            }
494            Err(_) => {
495                warn!(
496                    file = %file_path.display(),
497                    timeout_ms = config.read_timeout.as_millis(),
498                    "Timeout opening file"
499                );
500                continue;
501            }
502        };
503
504        let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
505        let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
506
507        let last_modified = metadata
508            .as_ref()
509            .and_then(|m| m.modified().ok())
510            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
511            .map(|d| d.as_millis() as u64)
512            .unwrap_or(0);
513
514        let relative_path = file_path
515            .strip_prefix(base_path)
516            .unwrap_or(&file_path)
517            .to_string_lossy()
518            .to_string();
519
520        let absolute_path = file_path
521            .canonicalize()
522            .unwrap_or_else(|_| file_path.clone())
523            .to_string_lossy()
524            .to_string();
525
526        let body = Body::Stream(StreamBody {
527            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
528            metadata: StreamMetadata {
529                size_hint: Some(file_len),
530                content_type: None,
531                origin: Some(absolute_path.clone()),
532            },
533        });
534
535        let mut exchange = Exchange::new(Message::new(body));
536        exchange
537            .input
538            .set_header("CamelFileName", serde_json::Value::String(relative_path));
539        exchange.input.set_header(
540            "CamelFileNameOnly",
541            serde_json::Value::String(file_name.clone()),
542        );
543        exchange.input.set_header(
544            "CamelFileAbsolutePath",
545            serde_json::Value::String(absolute_path),
546        );
547        exchange.input.set_header(
548            "CamelFileLength",
549            serde_json::Value::Number(file_len.into()),
550        );
551        exchange.input.set_header(
552            "CamelFileLastModified",
553            serde_json::Value::Number(last_modified.into()),
554        );
555
556        debug!(
557            file = %file_path.display(),
558            correlation_id = %exchange.correlation_id(),
559            "Processing file"
560        );
561
562        if context.send(exchange).await.is_err() {
563            break;
564        }
565
566        if config.noop {
567            // Do nothing
568        } else if config.delete {
569            if let Err(e) = fs::remove_file(&file_path).await {
570                warn!(file = %file_path.display(), error = %e, "Failed to delete file");
571            }
572        } else if let Some(ref move_dir) = config.move_to {
573            let target_dir = base_path.join(move_dir);
574            if let Err(e) = fs::create_dir_all(&target_dir).await {
575                warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
576                continue;
577            }
578            let target_path = target_dir.join(&file_name);
579            if let Err(e) = fs::rename(&file_path, &target_path).await {
580                warn!(
581                    from = %file_path.display(),
582                    to = %target_path.display(),
583                    error = %e,
584                    "Failed to move file"
585                );
586            }
587        }
588    }
589
590    Ok(())
591}
592
593async fn list_files(
594    dir: &std::path::Path,
595    recursive: bool,
596) -> Result<Vec<std::path::PathBuf>, CamelError> {
597    let mut files = Vec::new();
598    let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
599
600    while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
601        let path = entry.path();
602        if path.is_file() {
603            files.push(path);
604        } else if path.is_dir() && recursive {
605            let mut sub_files = Box::pin(list_files(&path, true)).await?;
606            files.append(&mut sub_files);
607        }
608    }
609
610    files.sort();
611    Ok(files)
612}
613
614// ---------------------------------------------------------------------------
615// Path validation for security
616// ---------------------------------------------------------------------------
617
618fn validate_path_is_within_base(
619    base_dir: &std::path::Path,
620    target_path: &std::path::Path,
621) -> Result<(), CamelError> {
622    let canonical_base = base_dir.canonicalize().map_err(|e| {
623        CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
624    })?;
625
626    // For non-existent paths, canonicalize the parent and construct the full path
627    let canonical_target = if target_path.exists() {
628        target_path.canonicalize().map_err(|e| {
629            CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
630        })?
631    } else if let Some(parent) = target_path.parent() {
632        // Ensure parent exists (should have been created by auto_create)
633        if !parent.exists() {
634            return Err(CamelError::ProcessorError(format!(
635                "Parent directory '{}' does not exist",
636                parent.display()
637            )));
638        }
639        let canonical_parent = parent.canonicalize().map_err(|e| {
640            CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
641        })?;
642        // Reconstruct the full path with the filename
643        if let Some(filename) = target_path.file_name() {
644            canonical_parent.join(filename)
645        } else {
646            return Err(CamelError::ProcessorError(
647                "Invalid target path: no filename".to_string(),
648            ));
649        }
650    } else {
651        return Err(CamelError::ProcessorError(
652            "Invalid target path: no parent directory".to_string(),
653        ));
654    };
655
656    if !canonical_target.starts_with(&canonical_base) {
657        return Err(CamelError::ProcessorError(format!(
658            "Path '{}' is outside base directory '{}'",
659            canonical_target.display(),
660            canonical_base.display()
661        )));
662    }
663
664    Ok(())
665}
666
667// ---------------------------------------------------------------------------
668// FileProducer
669// ---------------------------------------------------------------------------
670
671#[derive(Clone)]
672struct FileProducer {
673    config: FileConfig,
674}
675
676impl FileProducer {
677    fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
678        if let Some(name) = exchange
679            .input
680            .header("CamelFileName")
681            .and_then(|v| v.as_str())
682        {
683            return Ok(name.to_string());
684        }
685        if let Some(ref name) = config.file_name {
686            return Ok(name.clone());
687        }
688        Err(CamelError::ProcessorError(
689            "No filename specified: set CamelFileName header or fileName option".to_string(),
690        ))
691    }
692}
693
694impl Service<Exchange> for FileProducer {
695    type Response = Exchange;
696    type Error = CamelError;
697    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
698
699    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
700        Poll::Ready(Ok(()))
701    }
702
703    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
704        let config = self.config.clone();
705
706        Box::pin(async move {
707            let file_name = FileProducer::resolve_filename(&exchange, &config)?;
708            let body = exchange.input.body.clone();
709
710            let dir_path = std::path::Path::new(&config.directory);
711            let target_path = dir_path.join(&file_name);
712
713            // 1. Auto-create directories
714            if config.auto_create
715                && let Some(parent) = target_path.parent()
716            {
717                tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
718                    .await
719                    .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
720                    .map_err(CamelError::from)?;
721            }
722
723            // 2. Security: validate path is within base directory
724            validate_path_is_within_base(dir_path, &target_path)?;
725
726            // 3. Handle file-exist strategy
727            match config.file_exist {
728                FileExistStrategy::Fail if target_path.exists() => {
729                    return Err(CamelError::ProcessorError(format!(
730                        "File already exists: {}",
731                        target_path.display()
732                    )));
733                }
734                FileExistStrategy::Append => {
735                    // Append: write directly without temp file (append is inherently non-atomic)
736                    let mut file = tokio::time::timeout(
737                        config.write_timeout,
738                        OpenOptions::new()
739                            .append(true)
740                            .create(true)
741                            .open(&target_path),
742                    )
743                    .await
744                    .map_err(|_| {
745                        CamelError::ProcessorError("Timeout opening file for append".into())
746                    })?
747                    .map_err(CamelError::from)?;
748
749                    tokio::time::timeout(
750                        config.write_timeout,
751                        io::copy(&mut body.into_async_read(), &mut file),
752                    )
753                    .await
754                    .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
755                    .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
756
757                    file.flush().await.map_err(CamelError::from)?;
758                }
759                _ => {
760                    // Override (or Fail when file doesn't exist): always atomic via temp file
761                    let temp_name = if let Some(ref prefix) = config.temp_prefix {
762                        format!("{prefix}{file_name}")
763                    } else {
764                        format!(".tmp.{file_name}")
765                    };
766                    let temp_path = dir_path.join(&temp_name);
767
768                    // RAII guard ensures cleanup even on panic
769                    let mut guard = TempFileGuard::new(temp_path.clone());
770
771                    // Write to temp file
772                    let mut file =
773                        tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
774                            .await
775                            .map_err(|_| {
776                                CamelError::ProcessorError("Timeout creating temp file".into())
777                            })?
778                            .map_err(CamelError::from)?;
779
780                    let copy_result = tokio::time::timeout(
781                        config.write_timeout,
782                        io::copy(&mut body.into_async_read(), &mut file),
783                    )
784                    .await;
785
786                    // Flush any kernel buffers (best-effort; actual write errors come from io::copy above)
787                    let _ = file.flush().await;
788
789                    match copy_result {
790                        Ok(Ok(_)) => {}
791                        Ok(Err(e)) => {
792                            // Guard will clean up temp file on drop
793                            return Err(CamelError::ProcessorError(e.to_string()));
794                        }
795                        Err(_) => {
796                            // Guard will clean up temp file on drop
797                            return Err(CamelError::ProcessorError("Timeout writing file".into()));
798                        }
799                    }
800
801                    // Atomic rename: temp → target
802                    let rename_result = tokio::time::timeout(
803                        config.write_timeout,
804                        fs::rename(&temp_path, &target_path),
805                    )
806                    .await;
807
808                    match rename_result {
809                        Ok(Ok(_)) => {
810                            // Success — disarm guard so it doesn't delete the renamed file
811                            guard.disarm();
812                        }
813                        Ok(Err(e)) => {
814                            // Guard will clean up temp file on drop
815                            return Err(CamelError::from(e));
816                        }
817                        Err(_) => {
818                            // Guard will clean up temp file on drop
819                            return Err(CamelError::ProcessorError("Timeout renaming file".into()));
820                        }
821                    }
822                }
823            }
824
825            // 4. Set output header
826            let abs_path = target_path
827                .canonicalize()
828                .unwrap_or_else(|_| target_path.clone())
829                .to_string_lossy()
830                .to_string();
831            exchange
832                .input
833                .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
834
835            debug!(
836                file = %target_path.display(),
837                correlation_id = %exchange.correlation_id(),
838                "File written"
839            );
840
841            Ok(exchange)
842        })
843    }
844}
845
846#[cfg(test)]
847mod tests {
848    use super::*;
849    use bytes::Bytes;
850    use std::time::Duration;
851    use tokio_util::sync::CancellationToken;
852
853    fn test_producer_ctx() -> ProducerContext {
854        ProducerContext::new()
855    }
856
857    #[test]
858    fn test_file_config_defaults() {
859        let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
860        assert_eq!(config.directory, "/tmp/inbox");
861        assert_eq!(config.delay, Duration::from_millis(500));
862        assert_eq!(config.initial_delay, Duration::from_millis(1000));
863        assert!(!config.noop);
864        assert!(!config.delete);
865        assert_eq!(config.move_to, Some(".camel".to_string()));
866        assert!(config.file_name.is_none());
867        assert!(config.include.is_none());
868        assert!(config.exclude.is_none());
869        assert!(!config.recursive);
870        assert_eq!(config.file_exist, FileExistStrategy::Override);
871        assert!(config.temp_prefix.is_none());
872        assert!(config.auto_create);
873        // New timeout defaults
874        assert_eq!(config.read_timeout, Duration::from_secs(30));
875        assert_eq!(config.write_timeout, Duration::from_secs(30));
876    }
877
878    #[test]
879    fn test_file_config_consumer_options() {
880        let config = FileConfig::from_uri(
881            "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
882        ).unwrap();
883        assert_eq!(config.directory, "/data/input");
884        assert_eq!(config.delay, Duration::from_millis(1000));
885        assert_eq!(config.initial_delay, Duration::from_millis(2000));
886        assert!(config.noop);
887        assert!(config.recursive);
888        assert_eq!(config.include, Some(".*\\.csv".to_string()));
889    }
890
891    #[test]
892    fn test_file_config_producer_options() {
893        let config = FileConfig::from_uri(
894            "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
895        )
896        .unwrap();
897        assert_eq!(config.file_exist, FileExistStrategy::Append);
898        assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
899        assert!(!config.auto_create);
900        assert_eq!(config.file_name, Some("out.txt".to_string()));
901    }
902
903    #[test]
904    fn test_file_config_delete_mode() {
905        let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
906        assert!(config.delete);
907        assert!(config.move_to.is_none());
908    }
909
910    #[test]
911    fn test_file_config_noop_mode() {
912        let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
913        assert!(config.noop);
914        assert!(config.move_to.is_none());
915    }
916
917    #[test]
918    fn test_file_config_wrong_scheme() {
919        let result = FileConfig::from_uri("timer:tick");
920        assert!(result.is_err());
921    }
922
923    #[test]
924    fn test_file_component_scheme() {
925        let component = FileComponent::new();
926        assert_eq!(component.scheme(), "file");
927    }
928
929    #[test]
930    fn test_file_component_creates_endpoint() {
931        let component = FileComponent::new();
932        let endpoint = component.create_endpoint("file:/tmp/test");
933        assert!(endpoint.is_ok());
934    }
935
936    // -----------------------------------------------------------------------
937    // Consumer tests
938    // -----------------------------------------------------------------------
939
940    #[tokio::test]
941    async fn test_file_consumer_reads_files() {
942        let dir = tempfile::tempdir().unwrap();
943        let dir_path = dir.path().to_str().unwrap();
944
945        std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
946        std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
947
948        let component = FileComponent::new();
949        let endpoint = component
950            .create_endpoint(&format!(
951                "file:{dir_path}?noop=true&initialDelay=0&delay=100"
952            ))
953            .unwrap();
954        let mut consumer = endpoint.create_consumer().unwrap();
955
956        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
957        let token = CancellationToken::new();
958        let ctx = ConsumerContext::new(tx, token.clone());
959
960        tokio::spawn(async move {
961            consumer.start(ctx).await.unwrap();
962        });
963
964        let mut received = Vec::new();
965        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
966            while let Some(envelope) = rx.recv().await {
967                received.push(envelope.exchange);
968                if received.len() == 2 {
969                    break;
970                }
971            }
972        })
973        .await;
974        token.cancel();
975
976        assert!(timeout.is_ok(), "Should have received 2 exchanges");
977        assert_eq!(received.len(), 2);
978
979        for ex in &received {
980            assert!(ex.input.header("CamelFileName").is_some());
981            assert!(ex.input.header("CamelFileNameOnly").is_some());
982            assert!(ex.input.header("CamelFileAbsolutePath").is_some());
983            assert!(ex.input.header("CamelFileLength").is_some());
984            assert!(ex.input.header("CamelFileLastModified").is_some());
985        }
986    }
987
988    #[tokio::test]
989    async fn test_file_consumer_include_filter() {
990        let dir = tempfile::tempdir().unwrap();
991        let dir_path = dir.path().to_str().unwrap();
992
993        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
994        std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
995
996        let component = FileComponent::new();
997        let endpoint = component
998            .create_endpoint(&format!(
999                "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
1000            ))
1001            .unwrap();
1002        let mut consumer = endpoint.create_consumer().unwrap();
1003
1004        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1005        let token = CancellationToken::new();
1006        let ctx = ConsumerContext::new(tx, token.clone());
1007
1008        tokio::spawn(async move {
1009            consumer.start(ctx).await.unwrap();
1010        });
1011
1012        let mut received = Vec::new();
1013        let _ = tokio::time::timeout(Duration::from_millis(500), async {
1014            while let Some(envelope) = rx.recv().await {
1015                received.push(envelope.exchange);
1016                if received.len() == 1 {
1017                    break;
1018                }
1019            }
1020        })
1021        .await;
1022        token.cancel();
1023
1024        assert_eq!(received.len(), 1);
1025        let name = received[0]
1026            .input
1027            .header("CamelFileNameOnly")
1028            .and_then(|v| v.as_str())
1029            .unwrap();
1030        assert_eq!(name, "data.csv");
1031    }
1032
1033    #[tokio::test]
1034    async fn test_file_consumer_delete_mode() {
1035        let dir = tempfile::tempdir().unwrap();
1036        let dir_path = dir.path().to_str().unwrap();
1037
1038        std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
1039
1040        let component = FileComponent::new();
1041        let endpoint = component
1042            .create_endpoint(&format!(
1043                "file:{dir_path}?delete=true&initialDelay=0&delay=100"
1044            ))
1045            .unwrap();
1046        let mut consumer = endpoint.create_consumer().unwrap();
1047
1048        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1049        let token = CancellationToken::new();
1050        let ctx = ConsumerContext::new(tx, token.clone());
1051
1052        tokio::spawn(async move {
1053            consumer.start(ctx).await.unwrap();
1054        });
1055
1056        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1057        token.cancel();
1058
1059        tokio::time::sleep(Duration::from_millis(100)).await;
1060
1061        assert!(
1062            !dir.path().join("deleteme.txt").exists(),
1063            "File should be deleted"
1064        );
1065    }
1066
1067    #[tokio::test]
1068    async fn test_file_consumer_move_mode() {
1069        let dir = tempfile::tempdir().unwrap();
1070        let dir_path = dir.path().to_str().unwrap();
1071
1072        std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
1073
1074        let component = FileComponent::new();
1075        let endpoint = component
1076            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
1077            .unwrap();
1078        let mut consumer = endpoint.create_consumer().unwrap();
1079
1080        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1081        let token = CancellationToken::new();
1082        let ctx = ConsumerContext::new(tx, token.clone());
1083
1084        tokio::spawn(async move {
1085            consumer.start(ctx).await.unwrap();
1086        });
1087
1088        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1089        token.cancel();
1090
1091        tokio::time::sleep(Duration::from_millis(100)).await;
1092
1093        assert!(
1094            !dir.path().join("moveme.txt").exists(),
1095            "Original file should be gone"
1096        );
1097        assert!(
1098            dir.path().join(".camel").join("moveme.txt").exists(),
1099            "File should be in .camel/"
1100        );
1101    }
1102
1103    #[tokio::test]
1104    async fn test_file_consumer_respects_cancellation() {
1105        let dir = tempfile::tempdir().unwrap();
1106        let dir_path = dir.path().to_str().unwrap();
1107
1108        let component = FileComponent::new();
1109        let endpoint = component
1110            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
1111            .unwrap();
1112        let mut consumer = endpoint.create_consumer().unwrap();
1113
1114        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1115        let token = CancellationToken::new();
1116        let ctx = ConsumerContext::new(tx, token.clone());
1117
1118        let handle = tokio::spawn(async move {
1119            consumer.start(ctx).await.unwrap();
1120        });
1121
1122        tokio::time::sleep(Duration::from_millis(150)).await;
1123        token.cancel();
1124
1125        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1126        assert!(
1127            result.is_ok(),
1128            "Consumer should have stopped after cancellation"
1129        );
1130    }
1131
1132    // -----------------------------------------------------------------------
1133    // Producer tests
1134    // -----------------------------------------------------------------------
1135
1136    #[tokio::test]
1137    async fn test_file_producer_writes_file() {
1138        use tower::ServiceExt;
1139
1140        let dir = tempfile::tempdir().unwrap();
1141        let dir_path = dir.path().to_str().unwrap();
1142
1143        let component = FileComponent::new();
1144        let endpoint = component
1145            .create_endpoint(&format!("file:{dir_path}"))
1146            .unwrap();
1147        let ctx = test_producer_ctx();
1148        let producer = endpoint.create_producer(&ctx).unwrap();
1149
1150        let mut exchange = Exchange::new(Message::new("file content"));
1151        exchange.input.set_header(
1152            "CamelFileName",
1153            serde_json::Value::String("output.txt".to_string()),
1154        );
1155
1156        let result = producer.oneshot(exchange).await.unwrap();
1157
1158        let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1159        assert_eq!(content, "file content");
1160
1161        assert!(result.input.header("CamelFileNameProduced").is_some());
1162    }
1163
1164    #[tokio::test]
1165    async fn test_file_producer_auto_create_dirs() {
1166        use tower::ServiceExt;
1167
1168        let dir = tempfile::tempdir().unwrap();
1169        let dir_path = dir.path().to_str().unwrap();
1170
1171        let component = FileComponent::new();
1172        let endpoint = component
1173            .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1174            .unwrap();
1175        let ctx = test_producer_ctx();
1176        let producer = endpoint.create_producer(&ctx).unwrap();
1177
1178        let mut exchange = Exchange::new(Message::new("nested"));
1179        exchange.input.set_header(
1180            "CamelFileName",
1181            serde_json::Value::String("file.txt".to_string()),
1182        );
1183
1184        producer.oneshot(exchange).await.unwrap();
1185
1186        assert!(dir.path().join("sub/dir/file.txt").exists());
1187    }
1188
1189    #[tokio::test]
1190    async fn test_file_producer_file_exist_fail() {
1191        use tower::ServiceExt;
1192
1193        let dir = tempfile::tempdir().unwrap();
1194        let dir_path = dir.path().to_str().unwrap();
1195
1196        std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1197
1198        let component = FileComponent::new();
1199        let endpoint = component
1200            .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1201            .unwrap();
1202        let ctx = test_producer_ctx();
1203        let producer = endpoint.create_producer(&ctx).unwrap();
1204
1205        let mut exchange = Exchange::new(Message::new("new"));
1206        exchange.input.set_header(
1207            "CamelFileName",
1208            serde_json::Value::String("existing.txt".to_string()),
1209        );
1210
1211        let result = producer.oneshot(exchange).await;
1212        assert!(
1213            result.is_err(),
1214            "Should fail when file exists with Fail strategy"
1215        );
1216    }
1217
1218    #[tokio::test]
1219    async fn test_file_producer_file_exist_append() {
1220        use tower::ServiceExt;
1221
1222        let dir = tempfile::tempdir().unwrap();
1223        let dir_path = dir.path().to_str().unwrap();
1224
1225        std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1226
1227        let component = FileComponent::new();
1228        let endpoint = component
1229            .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1230            .unwrap();
1231        let ctx = test_producer_ctx();
1232        let producer = endpoint.create_producer(&ctx).unwrap();
1233
1234        let mut exchange = Exchange::new(Message::new("new"));
1235        exchange.input.set_header(
1236            "CamelFileName",
1237            serde_json::Value::String("append.txt".to_string()),
1238        );
1239
1240        producer.oneshot(exchange).await.unwrap();
1241
1242        let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1243        assert_eq!(content, "oldnew");
1244    }
1245
1246    #[tokio::test]
1247    async fn test_file_producer_temp_prefix() {
1248        use tower::ServiceExt;
1249
1250        let dir = tempfile::tempdir().unwrap();
1251        let dir_path = dir.path().to_str().unwrap();
1252
1253        let component = FileComponent::new();
1254        let endpoint = component
1255            .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1256            .unwrap();
1257        let ctx = test_producer_ctx();
1258        let producer = endpoint.create_producer(&ctx).unwrap();
1259
1260        let mut exchange = Exchange::new(Message::new("atomic write"));
1261        exchange.input.set_header(
1262            "CamelFileName",
1263            serde_json::Value::String("final.txt".to_string()),
1264        );
1265
1266        producer.oneshot(exchange).await.unwrap();
1267
1268        assert!(dir.path().join("final.txt").exists());
1269        assert!(!dir.path().join(".tmpfinal.txt").exists());
1270        let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1271        assert_eq!(content, "atomic write");
1272    }
1273
1274    #[tokio::test]
1275    async fn test_file_producer_uses_filename_option() {
1276        use tower::ServiceExt;
1277
1278        let dir = tempfile::tempdir().unwrap();
1279        let dir_path = dir.path().to_str().unwrap();
1280
1281        let component = FileComponent::new();
1282        let endpoint = component
1283            .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1284            .unwrap();
1285        let ctx = test_producer_ctx();
1286        let producer = endpoint.create_producer(&ctx).unwrap();
1287
1288        let exchange = Exchange::new(Message::new("content"));
1289
1290        producer.oneshot(exchange).await.unwrap();
1291        assert!(dir.path().join("fixed.txt").exists());
1292    }
1293
1294    #[tokio::test]
1295    async fn test_file_producer_no_filename_errors() {
1296        use tower::ServiceExt;
1297
1298        let dir = tempfile::tempdir().unwrap();
1299        let dir_path = dir.path().to_str().unwrap();
1300
1301        let component = FileComponent::new();
1302        let endpoint = component
1303            .create_endpoint(&format!("file:{dir_path}"))
1304            .unwrap();
1305        let ctx = test_producer_ctx();
1306        let producer = endpoint.create_producer(&ctx).unwrap();
1307
1308        let exchange = Exchange::new(Message::new("content"));
1309
1310        let result = producer.oneshot(exchange).await;
1311        assert!(result.is_err(), "Should error when no filename is provided");
1312    }
1313
1314    // -----------------------------------------------------------------------
1315    // Security tests - Path traversal protection
1316    // -----------------------------------------------------------------------
1317
1318    #[tokio::test]
1319    async fn test_file_producer_rejects_path_traversal_parent_directory() {
1320        use tower::ServiceExt;
1321
1322        let dir = tempfile::tempdir().unwrap();
1323        let dir_path = dir.path().to_str().unwrap();
1324
1325        // Create a subdirectory
1326        std::fs::create_dir(dir.path().join("subdir")).unwrap();
1327        std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1328
1329        let component = FileComponent::new();
1330        let endpoint = component
1331            .create_endpoint(&format!("file:{dir_path}/subdir"))
1332            .unwrap();
1333        let ctx = test_producer_ctx();
1334        let producer = endpoint.create_producer(&ctx).unwrap();
1335
1336        let mut exchange = Exchange::new(Message::new("malicious"));
1337        exchange.input.set_header(
1338            "CamelFileName",
1339            serde_json::Value::String("../secret.txt".to_string()),
1340        );
1341
1342        let result = producer.oneshot(exchange).await;
1343        assert!(result.is_err(), "Should reject path traversal attempt");
1344
1345        let err = result.unwrap_err();
1346        assert!(
1347            err.to_string().contains("outside"),
1348            "Error should mention path is outside base directory"
1349        );
1350    }
1351
1352    #[tokio::test]
1353    async fn test_file_producer_rejects_absolute_path_outside_base() {
1354        use tower::ServiceExt;
1355
1356        let dir = tempfile::tempdir().unwrap();
1357        let dir_path = dir.path().to_str().unwrap();
1358
1359        let component = FileComponent::new();
1360        let endpoint = component
1361            .create_endpoint(&format!("file:{dir_path}"))
1362            .unwrap();
1363        let ctx = test_producer_ctx();
1364        let producer = endpoint.create_producer(&ctx).unwrap();
1365
1366        let mut exchange = Exchange::new(Message::new("malicious"));
1367        exchange.input.set_header(
1368            "CamelFileName",
1369            serde_json::Value::String("/etc/passwd".to_string()),
1370        );
1371
1372        let result = producer.oneshot(exchange).await;
1373        assert!(result.is_err(), "Should reject absolute path outside base");
1374    }
1375
1376    // -----------------------------------------------------------------------
1377    // Large file streaming tests
1378    // -----------------------------------------------------------------------
1379
1380    #[tokio::test]
1381    #[ignore] // Slow test - run with --ignored flag
1382    async fn test_large_file_streaming_constant_memory() {
1383        use std::io::Write;
1384        use tempfile::NamedTempFile;
1385
1386        // Create a 150MB file (larger than 100MB limit)
1387        let mut temp_file = NamedTempFile::new().unwrap();
1388        let file_size = 150 * 1024 * 1024; // 150MB
1389        let chunk = vec![b'X'; 1024 * 1024]; // 1MB chunk
1390
1391        for _ in 0..150 {
1392            temp_file.write_all(&chunk).unwrap();
1393        }
1394        temp_file.flush().unwrap();
1395
1396        let dir = temp_file.path().parent().unwrap();
1397        let dir_path = dir.to_str().unwrap();
1398        let file_name = temp_file
1399            .path()
1400            .file_name()
1401            .unwrap()
1402            .to_str()
1403            .unwrap()
1404            .to_string();
1405
1406        // Read file as stream (should succeed with lazy evaluation)
1407        let component = FileComponent::new();
1408        let endpoint = component
1409            .create_endpoint(&format!(
1410                "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1411            ))
1412            .unwrap();
1413        let mut consumer = endpoint.create_consumer().unwrap();
1414
1415        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1416        let token = CancellationToken::new();
1417        let ctx = ConsumerContext::new(tx, token.clone());
1418
1419        tokio::spawn(async move {
1420            let _ = consumer.start(ctx).await;
1421        });
1422
1423        let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1424            rx.recv().await.unwrap().exchange
1425        })
1426        .await
1427        .expect("Should receive exchange");
1428        token.cancel();
1429
1430        // Verify body is a stream (not materialized)
1431        assert!(matches!(exchange.input.body, Body::Stream(_)));
1432
1433        // Verify we can read metadata without consuming
1434        if let Body::Stream(ref stream_body) = exchange.input.body {
1435            assert!(stream_body.metadata.size_hint.is_some());
1436            let size = stream_body.metadata.size_hint.unwrap();
1437            assert_eq!(size, file_size as u64);
1438        }
1439
1440        // Materializing should fail (exceeds 100MB limit)
1441        if let Body::Stream(stream_body) = exchange.input.body {
1442            let body = Body::Stream(stream_body);
1443            let result = body.into_bytes(100 * 1024 * 1024).await;
1444            assert!(result.is_err());
1445        }
1446
1447        // But we CAN read chunks one at a time (simulating line-by-line processing)
1448        // This demonstrates lazy evaluation - we don't need to load entire file
1449        let component2 = FileComponent::new();
1450        let endpoint2 = component2
1451            .create_endpoint(&format!(
1452                "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1453            ))
1454            .unwrap();
1455        let mut consumer2 = endpoint2.create_consumer().unwrap();
1456
1457        let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1458        let token2 = CancellationToken::new();
1459        let ctx2 = ConsumerContext::new(tx2, token2.clone());
1460
1461        tokio::spawn(async move {
1462            let _ = consumer2.start(ctx2).await;
1463        });
1464
1465        let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1466            rx2.recv().await.unwrap().exchange
1467        })
1468        .await
1469        .expect("Should receive exchange");
1470        token2.cancel();
1471
1472        if let Body::Stream(stream_body) = exchange2.input.body {
1473            let mut stream_lock = stream_body.stream.lock().await;
1474            let mut stream = stream_lock.take().unwrap();
1475
1476            // Read first chunk (size varies based on ReaderStream's buffer)
1477            if let Some(chunk_result) = stream.next().await {
1478                let chunk = chunk_result.unwrap();
1479                assert!(!chunk.is_empty());
1480                assert!(chunk.len() < file_size);
1481                // Memory usage is constant - we only have this chunk in memory, not 150MB
1482            }
1483        }
1484    }
1485
1486    // -----------------------------------------------------------------------
1487    // Streaming producer tests
1488    // -----------------------------------------------------------------------
1489
1490    #[tokio::test]
1491    async fn test_producer_writes_stream_body() {
1492        let dir = tempfile::tempdir().unwrap();
1493        let dir_path = dir.path().to_str().unwrap();
1494        let uri = format!("file:{dir_path}?fileName=out.txt");
1495
1496        let component = FileComponent::new();
1497        let endpoint = component.create_endpoint(&uri).unwrap();
1498        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1499
1500        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1501            Ok(Bytes::from("hello ")),
1502            Ok(Bytes::from("streaming ")),
1503            Ok(Bytes::from("world")),
1504        ];
1505        let stream = futures::stream::iter(chunks);
1506        let body = Body::Stream(camel_api::body::StreamBody {
1507            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1508            metadata: camel_api::body::StreamMetadata {
1509                size_hint: None,
1510                content_type: None,
1511                origin: None,
1512            },
1513        });
1514
1515        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1516        tower::ServiceExt::oneshot(producer, exchange)
1517            .await
1518            .unwrap();
1519
1520        let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1521            .await
1522            .unwrap();
1523        assert_eq!(content, "hello streaming world");
1524    }
1525
1526    #[tokio::test]
1527    async fn test_producer_stream_atomic_no_partial_on_error() {
1528        // If the stream errors mid-write, no file should exist at the target path
1529        let dir = tempfile::tempdir().unwrap();
1530        let dir_path = dir.path().to_str().unwrap();
1531        let uri = format!("file:{dir_path}?fileName=out.txt");
1532
1533        let component = FileComponent::new();
1534        let endpoint = component.create_endpoint(&uri).unwrap();
1535        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1536
1537        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1538            Ok(Bytes::from("partial")),
1539            Err(CamelError::ProcessorError(
1540                "simulated stream error".to_string(),
1541            )),
1542        ];
1543        let stream = futures::stream::iter(chunks);
1544        let body = Body::Stream(camel_api::body::StreamBody {
1545            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1546            metadata: camel_api::body::StreamMetadata {
1547                size_hint: None,
1548                content_type: None,
1549                origin: None,
1550            },
1551        });
1552
1553        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1554        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1555        assert!(
1556            result.is_err(),
1557            "expected error when stream fails mid-write"
1558        );
1559
1560        // Target file must NOT exist — write was aborted and temp file cleaned up
1561        assert!(
1562            !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1563            "partial file must not exist after failed write"
1564        );
1565
1566        // Temp file must also be cleaned up
1567        assert!(
1568            !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1569            "temp file must be cleaned up after failed write"
1570        );
1571    }
1572
1573    #[tokio::test]
1574    async fn test_producer_stream_append() {
1575        let dir = tempfile::tempdir().unwrap();
1576        let dir_path = dir.path().to_str().unwrap();
1577        let target = format!("{dir_path}/out.txt");
1578
1579        // Pre-create file with initial content
1580        tokio::fs::write(&target, b"line1\n").await.unwrap();
1581
1582        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1583        let component = FileComponent::new();
1584        let endpoint = component.create_endpoint(&uri).unwrap();
1585        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1586
1587        let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1588        let stream = futures::stream::iter(chunks);
1589        let body = Body::Stream(camel_api::body::StreamBody {
1590            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1591            metadata: camel_api::body::StreamMetadata {
1592                size_hint: None,
1593                content_type: None,
1594                origin: None,
1595            },
1596        });
1597
1598        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1599        tower::ServiceExt::oneshot(producer, exchange)
1600            .await
1601            .unwrap();
1602
1603        let content = tokio::fs::read_to_string(&target).await.unwrap();
1604        assert_eq!(content, "line1\nline2\n");
1605    }
1606
1607    #[tokio::test]
1608    async fn test_producer_stream_append_partial_on_error() {
1609        // Append is inherently non-atomic: if the stream errors mid-write,
1610        // the file will contain partial data. This test documents that behavior.
1611        let dir = tempfile::tempdir().unwrap();
1612        let dir_path = dir.path().to_str().unwrap();
1613        let target = format!("{dir_path}/out.txt");
1614
1615        // Pre-create file with initial content
1616        tokio::fs::write(&target, b"initial\n").await.unwrap();
1617
1618        let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1619        let component = FileComponent::new();
1620        let endpoint = component.create_endpoint(&uri).unwrap();
1621        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1622
1623        // Stream with an error in the middle
1624        let chunks: Vec<Result<Bytes, CamelError>> = vec![
1625            Ok(Bytes::from("partial-")), // This will be written
1626            Err(CamelError::ProcessorError("stream error".to_string())), // This causes failure
1627            Ok(Bytes::from("never-written")), // This won't be reached
1628        ];
1629        let stream = futures::stream::iter(chunks);
1630        let body = Body::Stream(camel_api::body::StreamBody {
1631            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1632            metadata: camel_api::body::StreamMetadata {
1633                size_hint: None,
1634                content_type: None,
1635                origin: None,
1636            },
1637        });
1638
1639        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1640        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1641
1642        // 1. Producer must return an error
1643        assert!(
1644            result.is_err(),
1645            "expected error when stream fails during append"
1646        );
1647
1648        // 2. File must contain initial content + partial data written before the error
1649        let content = tokio::fs::read_to_string(&target).await.unwrap();
1650        assert_eq!(
1651            content, "initial\npartial-",
1652            "append leaves partial data on stream error (non-atomic by nature)"
1653        );
1654    }
1655
1656    #[tokio::test]
1657    async fn test_producer_stream_already_consumed_errors() {
1658        let dir = tempfile::tempdir().unwrap();
1659        let dir_path = dir.path().to_str().unwrap();
1660        let uri = format!("file:{dir_path}?fileName=out.txt");
1661
1662        let component = FileComponent::new();
1663        let endpoint = component.create_endpoint(&uri).unwrap();
1664        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1665
1666        // Mutex holds None → stream already consumed
1667        let arc: std::sync::Arc<
1668            tokio::sync::Mutex<
1669                Option<
1670                    std::pin::Pin<
1671                        Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1672                    >,
1673                >,
1674            >,
1675        > = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1676        let body = Body::Stream(camel_api::body::StreamBody {
1677            stream: arc,
1678            metadata: camel_api::body::StreamMetadata {
1679                size_hint: None,
1680                content_type: None,
1681                origin: None,
1682            },
1683        });
1684
1685        let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1686        let result = tower::ServiceExt::oneshot(producer, exchange).await;
1687        assert!(
1688            result.is_err(),
1689            "expected error for already-consumed stream"
1690        );
1691    }
1692
1693    // -----------------------------------------------------------------------
1694    // GlobalConfig tests - apply_global_defaults behavior
1695    // -----------------------------------------------------------------------
1696
1697    #[test]
1698    fn test_global_config_applied_to_endpoint() {
1699        // Global config with non-default values
1700        let global = FileGlobalConfig::default()
1701            .with_delay_ms(2000)
1702            .with_initial_delay_ms(5000)
1703            .with_read_timeout_ms(60_000)
1704            .with_write_timeout_ms(45_000);
1705        let component = FileComponent::with_config(global);
1706        // URI uses no explicit delay/timeout params → macro defaults apply
1707        let endpoint = component.create_endpoint("file:/tmp/inbox").unwrap();
1708        // We cannot call endpoint.config directly (FileEndpoint is private),
1709        // but we can test apply_global_defaults on FileConfig directly:
1710        let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
1711        let global2 = FileGlobalConfig::default()
1712            .with_delay_ms(2000)
1713            .with_initial_delay_ms(5000)
1714            .with_read_timeout_ms(60_000)
1715            .with_write_timeout_ms(45_000);
1716        config.apply_global_defaults(&global2);
1717        assert_eq!(config.delay, Duration::from_millis(2000));
1718        assert_eq!(config.initial_delay, Duration::from_millis(5000));
1719        assert_eq!(config.read_timeout, Duration::from_millis(60_000));
1720        assert_eq!(config.write_timeout, Duration::from_millis(45_000));
1721        // endpoint creation succeeds too
1722        let _ = endpoint; // just verify create_endpoint didn't fail
1723    }
1724
1725    #[test]
1726    fn test_uri_param_wins_over_global_config() {
1727        // URI explicitly sets delay=1000 (NOT the 500ms macro default)
1728        let mut config =
1729            FileConfig::from_uri("file:/tmp/inbox?delay=1000&initialDelay=2000").unwrap();
1730        // Global config would want 3000ms delay
1731        let global = FileGlobalConfig::default()
1732            .with_delay_ms(3000)
1733            .with_initial_delay_ms(4000);
1734        config.apply_global_defaults(&global);
1735        // URI value of 1000ms must be preserved (not replaced by 3000ms)
1736        assert_eq!(config.delay, Duration::from_millis(1000));
1737        // URI value of 2000ms must be preserved (not replaced by 4000ms)
1738        assert_eq!(config.initial_delay, Duration::from_millis(2000));
1739        // read_timeout was not set by URI → macro default (30000) → global wins if different
1740        // (read_timeout stays at 30000 since global has same default = 30000)
1741        assert_eq!(config.read_timeout, Duration::from_millis(30_000));
1742    }
1743}