Skip to main content

camel_component_file/
lib.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use regex::Regex;
8use tokio::fs;
9use tokio::time;
10use tower::Service;
11use tracing::{debug, warn};
12
13use camel_api::{BoxProcessor, CamelError, Exchange, Message, body::Body};
14use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
15use camel_endpoint::parse_uri;
16
17// ---------------------------------------------------------------------------
18// FileExistStrategy
19// ---------------------------------------------------------------------------
20
/// What the file producer does when the target file already exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileExistStrategy {
    /// Replace the existing file with the new content.
    Override,
    /// Append the new content to the end of the existing file.
    Append,
    /// Refuse to write and report an error.
    Fail,
}

impl FileExistStrategy {
    /// Parses the value of the `fileExist` URI option.
    ///
    /// Matching ignores ASCII case, so `Append`, `append` and `APPEND` all
    /// select [`FileExistStrategy::Append`]. Any value that is not recognized
    /// falls back to [`FileExistStrategy::Override`].
    fn from_str(s: &str) -> Self {
        // Case-insensitive on purpose: a differently-cased "APPEND" must not
        // silently degrade into the destructive `Override` strategy.
        if s.eq_ignore_ascii_case("append") {
            FileExistStrategy::Append
        } else if s.eq_ignore_ascii_case("fail") {
            FileExistStrategy::Fail
        } else {
            FileExistStrategy::Override
        }
    }
}
37
38// ---------------------------------------------------------------------------
39// FileConfig
40// ---------------------------------------------------------------------------
41
/// Options of a `file:` endpoint, shared by the consumer and the producer.
///
/// Built from an endpoint URI by `FileConfig::from_uri`.
#[derive(Debug, Clone)]
pub struct FileConfig {
    /// Base directory the endpoint works in (the path part of the URI).
    pub directory: String,
    /// Consumer: period between two polls of the directory.
    pub delay: Duration,
    /// Consumer: time to wait before the first poll.
    pub initial_delay: Duration,
    /// Consumer: when `true`, files are left untouched after being sent.
    pub noop: bool,
    /// Consumer: when `true` (and `noop` is not set), files are removed after being sent.
    pub delete: bool,
    /// Consumer: directory, joined onto `directory`, that files are moved into
    /// after being sent. `None` when `noop` or `delete` is enabled.
    pub move_to: Option<String>,
    /// Consumer: only the file with exactly this name is picked up.
    /// Producer: file name used when the exchange has no `CamelFileName` header.
    pub file_name: Option<String>,
    /// Consumer: regular expression a file name must match to be picked up.
    pub include: Option<String>,
    /// Consumer: regular expression that excludes matching file names.
    pub exclude: Option<String>,
    /// Consumer: whether sub-directories are scanned as well.
    pub recursive: bool,
    /// Producer: what to do when the target file already exists.
    pub file_exist: FileExistStrategy,
    /// Producer: when set, content is first written to a file whose name starts
    /// with this prefix and then renamed to the target name.
    pub temp_prefix: Option<String>,
    /// Producer: whether missing parent directories of the target are created.
    pub auto_create: bool,
    // Timeout fields for preventing hanging on slow filesystems
    /// Consumer: upper bound for reading a single file.
    pub read_timeout: Duration,
    /// Producer: upper bound applied to each directory-creation, open, write and rename step.
    pub write_timeout: Duration,
}
61
62impl FileConfig {
63    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
64        let parts = parse_uri(uri)?;
65        if parts.scheme != "file" {
66            return Err(CamelError::InvalidUri(format!(
67                "expected scheme 'file', got '{}'",
68                parts.scheme
69            )));
70        }
71
72        let delay = parts
73            .params
74            .get("delay")
75            .and_then(|v| v.parse::<u64>().ok())
76            .unwrap_or(500);
77
78        let initial_delay = parts
79            .params
80            .get("initialDelay")
81            .and_then(|v| v.parse::<u64>().ok())
82            .unwrap_or(1000);
83
84        let noop = parts
85            .params
86            .get("noop")
87            .map(|v| v == "true")
88            .unwrap_or(false);
89
90        let delete = parts
91            .params
92            .get("delete")
93            .map(|v| v == "true")
94            .unwrap_or(false);
95
96        let move_to = if noop || delete {
97            None
98        } else {
99            Some(
100                parts
101                    .params
102                    .get("move")
103                    .cloned()
104                    .unwrap_or_else(|| ".camel".to_string()),
105            )
106        };
107
108        let file_name = parts.params.get("fileName").cloned();
109        let include = parts.params.get("include").cloned();
110        let exclude = parts.params.get("exclude").cloned();
111
112        let recursive = parts
113            .params
114            .get("recursive")
115            .map(|v| v == "true")
116            .unwrap_or(false);
117
118        let file_exist = parts
119            .params
120            .get("fileExist")
121            .map(|v| FileExistStrategy::from_str(v))
122            .unwrap_or(FileExistStrategy::Override);
123
124        let temp_prefix = parts.params.get("tempPrefix").cloned();
125
126        let auto_create = parts
127            .params
128            .get("autoCreate")
129            .map(|v| v != "false")
130            .unwrap_or(true);
131
132        let read_timeout = parts
133            .params
134            .get("readTimeout")
135            .and_then(|v| v.parse::<u64>().ok())
136            .map(Duration::from_millis)
137            .unwrap_or(Duration::from_secs(30));
138
139        let write_timeout = parts
140            .params
141            .get("writeTimeout")
142            .and_then(|v| v.parse::<u64>().ok())
143            .map(Duration::from_millis)
144            .unwrap_or(Duration::from_secs(30));
145
146        Ok(Self {
147            directory: parts.path,
148            delay: Duration::from_millis(delay),
149            initial_delay: Duration::from_millis(initial_delay),
150            noop,
151            delete,
152            move_to,
153            file_name,
154            include,
155            exclude,
156            recursive,
157            file_exist,
158            temp_prefix,
159            auto_create,
160            read_timeout,
161            write_timeout,
162        })
163    }
164}
165
166// ---------------------------------------------------------------------------
167// FileComponent
168// ---------------------------------------------------------------------------
169
/// Component that serves `file:` endpoint URIs.
///
/// The type carries no state, so it is `Copy` and every instance is
/// interchangeable.
#[derive(Debug, Clone, Copy, Default)]
pub struct FileComponent;

impl FileComponent {
    /// Creates the component; equivalent to `FileComponent::default()`.
    pub fn new() -> Self {
        Self
    }
}
183
184impl Component for FileComponent {
185    fn scheme(&self) -> &str {
186        "file"
187    }
188
189    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
190        let config = FileConfig::from_uri(uri)?;
191        Ok(Box::new(FileEndpoint {
192            uri: uri.to_string(),
193            config,
194        }))
195    }
196}
197
198// ---------------------------------------------------------------------------
199// FileEndpoint
200// ---------------------------------------------------------------------------
201
/// A `file:` endpoint: the original URI plus the options parsed from it.
struct FileEndpoint {
    /// The endpoint URI exactly as it was handed to `create_endpoint`.
    uri: String,
    /// Options parsed from `uri`; cloned into every consumer and producer.
    config: FileConfig,
}
206
207impl Endpoint for FileEndpoint {
208    fn uri(&self) -> &str {
209        &self.uri
210    }
211
212    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
213        Ok(Box::new(FileConsumer {
214            config: self.config.clone(),
215        }))
216    }
217
218    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
219        Ok(BoxProcessor::new(FileProducer {
220            config: self.config.clone(),
221        }))
222    }
223}
224
225// ---------------------------------------------------------------------------
226// FileConsumer
227// ---------------------------------------------------------------------------
228
/// Consumer that periodically polls a directory and sends one exchange per file.
struct FileConsumer {
    /// Endpoint options; the consumer reads the polling, filtering and
    /// post-processing settings from here.
    config: FileConfig,
}
232
233#[async_trait]
234impl Consumer for FileConsumer {
235    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
236        let config = self.config.clone();
237
238        let include_re = config
239            .include
240            .as_ref()
241            .map(|p| Regex::new(p))
242            .transpose()
243            .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
244        let exclude_re = config
245            .exclude
246            .as_ref()
247            .map(|p| Regex::new(p))
248            .transpose()
249            .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
250
251        if !config.initial_delay.is_zero() {
252            tokio::select! {
253                _ = time::sleep(config.initial_delay) => {}
254                _ = context.cancelled() => {
255                    debug!(directory = config.directory, "File consumer cancelled during initial delay");
256                    return Ok(());
257                }
258            }
259        }
260
261        let mut interval = time::interval(config.delay);
262
263        loop {
264            tokio::select! {
265                _ = context.cancelled() => {
266                    debug!(directory = config.directory, "File consumer received cancellation, stopping");
267                    break;
268                }
269                _ = interval.tick() => {
270                    if let Err(e) = poll_directory(
271                        &config,
272                        &context,
273                        &include_re,
274                        &exclude_re,
275                    ).await {
276                        warn!(directory = config.directory, error = %e, "Error polling directory");
277                    }
278                }
279            }
280        }
281
282        Ok(())
283    }
284
285    async fn stop(&mut self) -> Result<(), CamelError> {
286        Ok(())
287    }
288}
289
290async fn poll_directory(
291    config: &FileConfig,
292    context: &ConsumerContext,
293    include_re: &Option<Regex>,
294    exclude_re: &Option<Regex>,
295) -> Result<(), CamelError> {
296    let base_path = std::path::Path::new(&config.directory);
297
298    let files = list_files(base_path, config.recursive).await?;
299
300    for file_path in files {
301        let file_name = file_path
302            .file_name()
303            .and_then(|n| n.to_str())
304            .unwrap_or_default()
305            .to_string();
306
307        if let Some(ref target_name) = config.file_name
308            && file_name != *target_name
309        {
310            continue;
311        }
312
313        if let Some(re) = include_re
314            && !re.is_match(&file_name)
315        {
316            continue;
317        }
318
319        if let Some(re) = exclude_re
320            && re.is_match(&file_name)
321        {
322            continue;
323        }
324
325        if let Some(ref move_dir) = config.move_to
326            && file_path.starts_with(base_path.join(move_dir))
327        {
328            continue;
329        }
330
331        let content = match tokio::time::timeout(config.read_timeout, fs::read(&file_path)).await {
332            Ok(Ok(c)) => c,
333            Ok(Err(e)) => {
334                warn!(
335                    file = %file_path.display(),
336                    error = %e,
337                    "Failed to read file"
338                );
339                continue;
340            }
341            Err(_) => {
342                warn!(
343                    file = %file_path.display(),
344                    timeout_ms = config.read_timeout.as_millis(),
345                    "Timeout reading file"
346                );
347                continue;
348            }
349        };
350
351        let metadata = fs::metadata(&file_path).await.ok();
352        let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
353        let last_modified = metadata
354            .as_ref()
355            .and_then(|m| m.modified().ok())
356            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
357            .map(|d| d.as_millis() as u64)
358            .unwrap_or(0);
359
360        let relative_path = file_path
361            .strip_prefix(base_path)
362            .unwrap_or(&file_path)
363            .to_string_lossy()
364            .to_string();
365
366        let absolute_path = file_path
367            .canonicalize()
368            .unwrap_or_else(|_| file_path.clone())
369            .to_string_lossy()
370            .to_string();
371
372        let mut exchange = Exchange::new(Message::new(Body::Bytes(bytes::Bytes::from(content))));
373        exchange
374            .input
375            .set_header("CamelFileName", serde_json::Value::String(relative_path));
376        exchange.input.set_header(
377            "CamelFileNameOnly",
378            serde_json::Value::String(file_name.clone()),
379        );
380        exchange.input.set_header(
381            "CamelFileAbsolutePath",
382            serde_json::Value::String(absolute_path),
383        );
384        exchange.input.set_header(
385            "CamelFileLength",
386            serde_json::Value::Number(file_len.into()),
387        );
388        exchange.input.set_header(
389            "CamelFileLastModified",
390            serde_json::Value::Number(last_modified.into()),
391        );
392
393        debug!(
394            file = %file_path.display(),
395            correlation_id = %exchange.correlation_id(),
396            "Processing file"
397        );
398
399        if context.send(exchange).await.is_err() {
400            break;
401        }
402
403        if config.noop {
404            // Do nothing
405        } else if config.delete {
406            if let Err(e) = fs::remove_file(&file_path).await {
407                warn!(file = %file_path.display(), error = %e, "Failed to delete file");
408            }
409        } else if let Some(ref move_dir) = config.move_to {
410            let target_dir = base_path.join(move_dir);
411            if let Err(e) = fs::create_dir_all(&target_dir).await {
412                warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
413                continue;
414            }
415            let target_path = target_dir.join(&file_name);
416            if let Err(e) = fs::rename(&file_path, &target_path).await {
417                warn!(
418                    from = %file_path.display(),
419                    to = %target_path.display(),
420                    error = %e,
421                    "Failed to move file"
422                );
423            }
424        }
425    }
426
427    Ok(())
428}
429
430async fn list_files(
431    dir: &std::path::Path,
432    recursive: bool,
433) -> Result<Vec<std::path::PathBuf>, CamelError> {
434    let mut files = Vec::new();
435    let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
436
437    while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
438        let path = entry.path();
439        if path.is_file() {
440            files.push(path);
441        } else if path.is_dir() && recursive {
442            let mut sub_files = Box::pin(list_files(&path, true)).await?;
443            files.append(&mut sub_files);
444        }
445    }
446
447    files.sort();
448    Ok(files)
449}
450
451// ---------------------------------------------------------------------------
452// Path validation for security
453// ---------------------------------------------------------------------------
454
455fn validate_path_is_within_base(
456    base_dir: &std::path::Path,
457    target_path: &std::path::Path,
458) -> Result<(), CamelError> {
459    let canonical_base = base_dir.canonicalize().map_err(|e| {
460        CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
461    })?;
462
463    // For non-existent paths, canonicalize the parent and construct the full path
464    let canonical_target = if target_path.exists() {
465        target_path.canonicalize().map_err(|e| {
466            CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
467        })?
468    } else if let Some(parent) = target_path.parent() {
469        // Ensure parent exists (should have been created by auto_create)
470        if !parent.exists() {
471            return Err(CamelError::ProcessorError(format!(
472                "Parent directory '{}' does not exist",
473                parent.display()
474            )));
475        }
476        let canonical_parent = parent.canonicalize().map_err(|e| {
477            CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
478        })?;
479        // Reconstruct the full path with the filename
480        if let Some(filename) = target_path.file_name() {
481            canonical_parent.join(filename)
482        } else {
483            return Err(CamelError::ProcessorError(
484                "Invalid target path: no filename".to_string(),
485            ));
486        }
487    } else {
488        return Err(CamelError::ProcessorError(
489            "Invalid target path: no parent directory".to_string(),
490        ));
491    };
492
493    if !canonical_target.starts_with(&canonical_base) {
494        return Err(CamelError::ProcessorError(format!(
495            "Path '{}' is outside base directory '{}'",
496            canonical_target.display(),
497            canonical_base.display()
498        )));
499    }
500
501    Ok(())
502}
503
504// ---------------------------------------------------------------------------
505// FileProducer
506// ---------------------------------------------------------------------------
507
/// Producer that writes the body of an exchange to a file below the
/// configured directory.
#[derive(Clone)]
struct FileProducer {
    /// Endpoint options; the producer reads the directory, file name,
    /// exist-strategy, temp-prefix, auto-create and write-timeout settings.
    config: FileConfig,
}
512
513impl FileProducer {
514    fn body_to_bytes(body: &Body) -> Result<Vec<u8>, CamelError> {
515        match body {
516            Body::Empty => Err(CamelError::ProcessorError(
517                "Cannot write empty body to file".to_string(),
518            )),
519            Body::Bytes(b) => Ok(b.to_vec()),
520            Body::Text(s) => Ok(s.as_bytes().to_vec()),
521            Body::Json(v) => Ok(v.to_string().into_bytes()),
522        }
523    }
524
525    fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
526        if let Some(name) = exchange
527            .input
528            .header("CamelFileName")
529            .and_then(|v| v.as_str())
530        {
531            return Ok(name.to_string());
532        }
533        if let Some(ref name) = config.file_name {
534            return Ok(name.clone());
535        }
536        Err(CamelError::ProcessorError(
537            "No filename specified: set CamelFileName header or fileName option".to_string(),
538        ))
539    }
540}
541
542impl Service<Exchange> for FileProducer {
543    type Response = Exchange;
544    type Error = CamelError;
545    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
546
547    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
548        Poll::Ready(Ok(()))
549    }
550
551    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
552        let config = self.config.clone();
553
554        Box::pin(async move {
555            let file_name = FileProducer::resolve_filename(&exchange, &config)?;
556            let data = FileProducer::body_to_bytes(&exchange.input.body)?;
557
558            let dir_path = std::path::Path::new(&config.directory);
559            let target_path = dir_path.join(&file_name);
560
561            if config.auto_create
562                && let Some(parent) = target_path.parent()
563            {
564                tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
565                    .await
566                    .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
567                    .map_err(CamelError::from)?;
568            }
569
570            // SECURITY: Validate path is within base directory
571            validate_path_is_within_base(dir_path, &target_path)?;
572
573            if target_path.exists() {
574                match config.file_exist {
575                    FileExistStrategy::Fail => {
576                        return Err(CamelError::ProcessorError(format!(
577                            "File already exists: {}",
578                            target_path.display()
579                        )));
580                    }
581                    FileExistStrategy::Append => {
582                        use tokio::io::AsyncWriteExt;
583                        let mut file = tokio::time::timeout(
584                            config.write_timeout,
585                            fs::OpenOptions::new().append(true).open(&target_path),
586                        )
587                        .await
588                        .map_err(|_| {
589                            CamelError::ProcessorError("Timeout opening file for append".into())
590                        })?
591                        .map_err(CamelError::from)?;
592
593                        tokio::time::timeout(config.write_timeout, async {
594                            file.write_all(&data).await?;
595                            file.flush().await?;
596                            Ok::<_, std::io::Error>(())
597                        })
598                        .await
599                        .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
600                        .map_err(CamelError::from)?;
601
602                        let abs_path = target_path
603                            .canonicalize()
604                            .unwrap_or_else(|_| target_path.clone())
605                            .to_string_lossy()
606                            .to_string();
607                        exchange.input.set_header(
608                            "CamelFileNameProduced",
609                            serde_json::Value::String(abs_path),
610                        );
611                        return Ok(exchange);
612                    }
613                    FileExistStrategy::Override => {}
614                }
615            }
616
617            if let Some(ref prefix) = config.temp_prefix {
618                let temp_name = format!("{prefix}{file_name}");
619                let temp_path = dir_path.join(&temp_name);
620
621                tokio::time::timeout(config.write_timeout, fs::write(&temp_path, &data))
622                    .await
623                    .map_err(|_| CamelError::ProcessorError("Timeout writing temp file".into()))?
624                    .map_err(CamelError::from)?;
625
626                tokio::time::timeout(config.write_timeout, fs::rename(&temp_path, &target_path))
627                    .await
628                    .map_err(|_| CamelError::ProcessorError("Timeout renaming file".into()))?
629                    .map_err(CamelError::from)?;
630            } else {
631                tokio::time::timeout(config.write_timeout, fs::write(&target_path, &data))
632                    .await
633                    .map_err(|_| CamelError::ProcessorError("Timeout writing file".into()))?
634                    .map_err(CamelError::from)?;
635            }
636
637            let abs_path = target_path
638                .canonicalize()
639                .unwrap_or_else(|_| target_path.clone())
640                .to_string_lossy()
641                .to_string();
642            exchange
643                .input
644                .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
645
646            debug!(
647                file = %target_path.display(),
648                correlation_id = %exchange.correlation_id(),
649                "File written"
650            );
651            Ok(exchange)
652        })
653    }
654}
655
656#[cfg(test)]
657mod tests {
658    use super::*;
659    use std::sync::Arc;
660    use std::time::Duration;
661    use tokio::sync::Mutex;
662    use tokio_util::sync::CancellationToken;
663
664    // NullRouteController for testing
665    struct NullRouteController;
666
667    #[async_trait::async_trait]
668    impl camel_api::RouteController for NullRouteController {
669        async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
670            Ok(())
671        }
672        async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
673            Ok(())
674        }
675        async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
676            Ok(())
677        }
678        async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
679            Ok(())
680        }
681        async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
682            Ok(())
683        }
684        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
685            None
686        }
687        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
688            Ok(())
689        }
690        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
691            Ok(())
692        }
693    }
694
695    fn test_producer_ctx() -> ProducerContext {
696        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
697    }
698
699    #[test]
700    fn test_file_config_defaults() {
701        let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
702        assert_eq!(config.directory, "/tmp/inbox");
703        assert_eq!(config.delay, Duration::from_millis(500));
704        assert_eq!(config.initial_delay, Duration::from_millis(1000));
705        assert!(!config.noop);
706        assert!(!config.delete);
707        assert_eq!(config.move_to, Some(".camel".to_string()));
708        assert!(config.file_name.is_none());
709        assert!(config.include.is_none());
710        assert!(config.exclude.is_none());
711        assert!(!config.recursive);
712        assert_eq!(config.file_exist, FileExistStrategy::Override);
713        assert!(config.temp_prefix.is_none());
714        assert!(config.auto_create);
715        // New timeout defaults
716        assert_eq!(config.read_timeout, Duration::from_secs(30));
717        assert_eq!(config.write_timeout, Duration::from_secs(30));
718    }
719
720    #[test]
721    fn test_file_config_consumer_options() {
722        let config = FileConfig::from_uri(
723            "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
724        ).unwrap();
725        assert_eq!(config.directory, "/data/input");
726        assert_eq!(config.delay, Duration::from_millis(1000));
727        assert_eq!(config.initial_delay, Duration::from_millis(2000));
728        assert!(config.noop);
729        assert!(config.recursive);
730        assert_eq!(config.include, Some(".*\\.csv".to_string()));
731    }
732
733    #[test]
734    fn test_file_config_producer_options() {
735        let config = FileConfig::from_uri(
736            "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
737        )
738        .unwrap();
739        assert_eq!(config.file_exist, FileExistStrategy::Append);
740        assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
741        assert!(!config.auto_create);
742        assert_eq!(config.file_name, Some("out.txt".to_string()));
743    }
744
745    #[test]
746    fn test_file_config_delete_mode() {
747        let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
748        assert!(config.delete);
749        assert!(config.move_to.is_none());
750    }
751
752    #[test]
753    fn test_file_config_noop_mode() {
754        let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
755        assert!(config.noop);
756        assert!(config.move_to.is_none());
757    }
758
759    #[test]
760    fn test_file_config_wrong_scheme() {
761        let result = FileConfig::from_uri("timer:tick");
762        assert!(result.is_err());
763    }
764
765    #[test]
766    fn test_file_component_scheme() {
767        let component = FileComponent::new();
768        assert_eq!(component.scheme(), "file");
769    }
770
771    #[test]
772    fn test_file_component_creates_endpoint() {
773        let component = FileComponent::new();
774        let endpoint = component.create_endpoint("file:/tmp/test");
775        assert!(endpoint.is_ok());
776    }
777
778    // -----------------------------------------------------------------------
779    // Consumer tests
780    // -----------------------------------------------------------------------
781
782    #[tokio::test]
783    async fn test_file_consumer_reads_files() {
784        let dir = tempfile::tempdir().unwrap();
785        let dir_path = dir.path().to_str().unwrap();
786
787        std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
788        std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
789
790        let component = FileComponent::new();
791        let endpoint = component
792            .create_endpoint(&format!(
793                "file:{dir_path}?noop=true&initialDelay=0&delay=100"
794            ))
795            .unwrap();
796        let mut consumer = endpoint.create_consumer().unwrap();
797
798        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
799        let token = CancellationToken::new();
800        let ctx = ConsumerContext::new(tx, token.clone());
801
802        tokio::spawn(async move {
803            consumer.start(ctx).await.unwrap();
804        });
805
806        let mut received = Vec::new();
807        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
808            while let Some(envelope) = rx.recv().await {
809                received.push(envelope.exchange);
810                if received.len() == 2 {
811                    break;
812                }
813            }
814        })
815        .await;
816        token.cancel();
817
818        assert!(timeout.is_ok(), "Should have received 2 exchanges");
819        assert_eq!(received.len(), 2);
820
821        for ex in &received {
822            assert!(ex.input.header("CamelFileName").is_some());
823            assert!(ex.input.header("CamelFileNameOnly").is_some());
824            assert!(ex.input.header("CamelFileAbsolutePath").is_some());
825            assert!(ex.input.header("CamelFileLength").is_some());
826            assert!(ex.input.header("CamelFileLastModified").is_some());
827        }
828    }
829
830    #[tokio::test]
831    async fn test_file_consumer_include_filter() {
832        let dir = tempfile::tempdir().unwrap();
833        let dir_path = dir.path().to_str().unwrap();
834
835        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
836        std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
837
838        let component = FileComponent::new();
839        let endpoint = component
840            .create_endpoint(&format!(
841                "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
842            ))
843            .unwrap();
844        let mut consumer = endpoint.create_consumer().unwrap();
845
846        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
847        let token = CancellationToken::new();
848        let ctx = ConsumerContext::new(tx, token.clone());
849
850        tokio::spawn(async move {
851            consumer.start(ctx).await.unwrap();
852        });
853
854        let mut received = Vec::new();
855        let _ = tokio::time::timeout(Duration::from_millis(500), async {
856            while let Some(envelope) = rx.recv().await {
857                received.push(envelope.exchange);
858                if received.len() == 1 {
859                    break;
860                }
861            }
862        })
863        .await;
864        token.cancel();
865
866        assert_eq!(received.len(), 1);
867        let name = received[0]
868            .input
869            .header("CamelFileNameOnly")
870            .and_then(|v| v.as_str())
871            .unwrap();
872        assert_eq!(name, "data.csv");
873    }
874
875    #[tokio::test]
876    async fn test_file_consumer_delete_mode() {
877        let dir = tempfile::tempdir().unwrap();
878        let dir_path = dir.path().to_str().unwrap();
879
880        std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
881
882        let component = FileComponent::new();
883        let endpoint = component
884            .create_endpoint(&format!(
885                "file:{dir_path}?delete=true&initialDelay=0&delay=100"
886            ))
887            .unwrap();
888        let mut consumer = endpoint.create_consumer().unwrap();
889
890        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
891        let token = CancellationToken::new();
892        let ctx = ConsumerContext::new(tx, token.clone());
893
894        tokio::spawn(async move {
895            consumer.start(ctx).await.unwrap();
896        });
897
898        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
899        token.cancel();
900
901        tokio::time::sleep(Duration::from_millis(100)).await;
902
903        assert!(
904            !dir.path().join("deleteme.txt").exists(),
905            "File should be deleted"
906        );
907    }
908
909    #[tokio::test]
910    async fn test_file_consumer_move_mode() {
911        let dir = tempfile::tempdir().unwrap();
912        let dir_path = dir.path().to_str().unwrap();
913
914        std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
915
916        let component = FileComponent::new();
917        let endpoint = component
918            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
919            .unwrap();
920        let mut consumer = endpoint.create_consumer().unwrap();
921
922        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
923        let token = CancellationToken::new();
924        let ctx = ConsumerContext::new(tx, token.clone());
925
926        tokio::spawn(async move {
927            consumer.start(ctx).await.unwrap();
928        });
929
930        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
931        token.cancel();
932
933        tokio::time::sleep(Duration::from_millis(100)).await;
934
935        assert!(
936            !dir.path().join("moveme.txt").exists(),
937            "Original file should be gone"
938        );
939        assert!(
940            dir.path().join(".camel").join("moveme.txt").exists(),
941            "File should be in .camel/"
942        );
943    }
944
945    #[tokio::test]
946    async fn test_file_consumer_respects_cancellation() {
947        let dir = tempfile::tempdir().unwrap();
948        let dir_path = dir.path().to_str().unwrap();
949
950        let component = FileComponent::new();
951        let endpoint = component
952            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
953            .unwrap();
954        let mut consumer = endpoint.create_consumer().unwrap();
955
956        let (tx, _rx) = tokio::sync::mpsc::channel(16);
957        let token = CancellationToken::new();
958        let ctx = ConsumerContext::new(tx, token.clone());
959
960        let handle = tokio::spawn(async move {
961            consumer.start(ctx).await.unwrap();
962        });
963
964        tokio::time::sleep(Duration::from_millis(150)).await;
965        token.cancel();
966
967        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
968        assert!(
969            result.is_ok(),
970            "Consumer should have stopped after cancellation"
971        );
972    }
973
974    // -----------------------------------------------------------------------
975    // Producer tests
976    // -----------------------------------------------------------------------
977
978    #[tokio::test]
979    async fn test_file_producer_writes_file() {
980        use tower::ServiceExt;
981
982        let dir = tempfile::tempdir().unwrap();
983        let dir_path = dir.path().to_str().unwrap();
984
985        let component = FileComponent::new();
986        let endpoint = component
987            .create_endpoint(&format!("file:{dir_path}"))
988            .unwrap();
989        let ctx = test_producer_ctx();
990        let producer = endpoint.create_producer(&ctx).unwrap();
991
992        let mut exchange = Exchange::new(Message::new("file content"));
993        exchange.input.set_header(
994            "CamelFileName",
995            serde_json::Value::String("output.txt".to_string()),
996        );
997
998        let result = producer.oneshot(exchange).await.unwrap();
999
1000        let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1001        assert_eq!(content, "file content");
1002
1003        assert!(result.input.header("CamelFileNameProduced").is_some());
1004    }
1005
1006    #[tokio::test]
1007    async fn test_file_producer_auto_create_dirs() {
1008        use tower::ServiceExt;
1009
1010        let dir = tempfile::tempdir().unwrap();
1011        let dir_path = dir.path().to_str().unwrap();
1012
1013        let component = FileComponent::new();
1014        let endpoint = component
1015            .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1016            .unwrap();
1017        let ctx = test_producer_ctx();
1018        let producer = endpoint.create_producer(&ctx).unwrap();
1019
1020        let mut exchange = Exchange::new(Message::new("nested"));
1021        exchange.input.set_header(
1022            "CamelFileName",
1023            serde_json::Value::String("file.txt".to_string()),
1024        );
1025
1026        producer.oneshot(exchange).await.unwrap();
1027
1028        assert!(dir.path().join("sub/dir/file.txt").exists());
1029    }
1030
1031    #[tokio::test]
1032    async fn test_file_producer_file_exist_fail() {
1033        use tower::ServiceExt;
1034
1035        let dir = tempfile::tempdir().unwrap();
1036        let dir_path = dir.path().to_str().unwrap();
1037
1038        std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1039
1040        let component = FileComponent::new();
1041        let endpoint = component
1042            .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1043            .unwrap();
1044        let ctx = test_producer_ctx();
1045        let producer = endpoint.create_producer(&ctx).unwrap();
1046
1047        let mut exchange = Exchange::new(Message::new("new"));
1048        exchange.input.set_header(
1049            "CamelFileName",
1050            serde_json::Value::String("existing.txt".to_string()),
1051        );
1052
1053        let result = producer.oneshot(exchange).await;
1054        assert!(
1055            result.is_err(),
1056            "Should fail when file exists with Fail strategy"
1057        );
1058    }
1059
1060    #[tokio::test]
1061    async fn test_file_producer_file_exist_append() {
1062        use tower::ServiceExt;
1063
1064        let dir = tempfile::tempdir().unwrap();
1065        let dir_path = dir.path().to_str().unwrap();
1066
1067        std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1068
1069        let component = FileComponent::new();
1070        let endpoint = component
1071            .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1072            .unwrap();
1073        let ctx = test_producer_ctx();
1074        let producer = endpoint.create_producer(&ctx).unwrap();
1075
1076        let mut exchange = Exchange::new(Message::new("new"));
1077        exchange.input.set_header(
1078            "CamelFileName",
1079            serde_json::Value::String("append.txt".to_string()),
1080        );
1081
1082        producer.oneshot(exchange).await.unwrap();
1083
1084        let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1085        assert_eq!(content, "oldnew");
1086    }
1087
1088    #[tokio::test]
1089    async fn test_file_producer_temp_prefix() {
1090        use tower::ServiceExt;
1091
1092        let dir = tempfile::tempdir().unwrap();
1093        let dir_path = dir.path().to_str().unwrap();
1094
1095        let component = FileComponent::new();
1096        let endpoint = component
1097            .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1098            .unwrap();
1099        let ctx = test_producer_ctx();
1100        let producer = endpoint.create_producer(&ctx).unwrap();
1101
1102        let mut exchange = Exchange::new(Message::new("atomic write"));
1103        exchange.input.set_header(
1104            "CamelFileName",
1105            serde_json::Value::String("final.txt".to_string()),
1106        );
1107
1108        producer.oneshot(exchange).await.unwrap();
1109
1110        assert!(dir.path().join("final.txt").exists());
1111        assert!(!dir.path().join(".tmpfinal.txt").exists());
1112        let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1113        assert_eq!(content, "atomic write");
1114    }
1115
1116    #[tokio::test]
1117    async fn test_file_producer_uses_filename_option() {
1118        use tower::ServiceExt;
1119
1120        let dir = tempfile::tempdir().unwrap();
1121        let dir_path = dir.path().to_str().unwrap();
1122
1123        let component = FileComponent::new();
1124        let endpoint = component
1125            .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1126            .unwrap();
1127        let ctx = test_producer_ctx();
1128        let producer = endpoint.create_producer(&ctx).unwrap();
1129
1130        let exchange = Exchange::new(Message::new("content"));
1131
1132        producer.oneshot(exchange).await.unwrap();
1133        assert!(dir.path().join("fixed.txt").exists());
1134    }
1135
1136    #[tokio::test]
1137    async fn test_file_producer_no_filename_errors() {
1138        use tower::ServiceExt;
1139
1140        let dir = tempfile::tempdir().unwrap();
1141        let dir_path = dir.path().to_str().unwrap();
1142
1143        let component = FileComponent::new();
1144        let endpoint = component
1145            .create_endpoint(&format!("file:{dir_path}"))
1146            .unwrap();
1147        let ctx = test_producer_ctx();
1148        let producer = endpoint.create_producer(&ctx).unwrap();
1149
1150        let exchange = Exchange::new(Message::new("content"));
1151
1152        let result = producer.oneshot(exchange).await;
1153        assert!(result.is_err(), "Should error when no filename is provided");
1154    }
1155
1156    // -----------------------------------------------------------------------
1157    // Security tests - Path traversal protection
1158    // -----------------------------------------------------------------------
1159
1160    #[tokio::test]
1161    async fn test_file_producer_rejects_path_traversal_parent_directory() {
1162        use tower::ServiceExt;
1163
1164        let dir = tempfile::tempdir().unwrap();
1165        let dir_path = dir.path().to_str().unwrap();
1166
1167        // Create a subdirectory
1168        std::fs::create_dir(dir.path().join("subdir")).unwrap();
1169        std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1170
1171        let component = FileComponent::new();
1172        let endpoint = component
1173            .create_endpoint(&format!("file:{dir_path}/subdir"))
1174            .unwrap();
1175        let ctx = test_producer_ctx();
1176        let producer = endpoint.create_producer(&ctx).unwrap();
1177
1178        let mut exchange = Exchange::new(Message::new("malicious"));
1179        exchange.input.set_header(
1180            "CamelFileName",
1181            serde_json::Value::String("../secret.txt".to_string()),
1182        );
1183
1184        let result = producer.oneshot(exchange).await;
1185        assert!(result.is_err(), "Should reject path traversal attempt");
1186
1187        let err = result.unwrap_err();
1188        assert!(
1189            err.to_string().contains("outside"),
1190            "Error should mention path is outside base directory"
1191        );
1192    }
1193
1194    #[tokio::test]
1195    async fn test_file_producer_rejects_absolute_path_outside_base() {
1196        use tower::ServiceExt;
1197
1198        let dir = tempfile::tempdir().unwrap();
1199        let dir_path = dir.path().to_str().unwrap();
1200
1201        let component = FileComponent::new();
1202        let endpoint = component
1203            .create_endpoint(&format!("file:{dir_path}"))
1204            .unwrap();
1205        let ctx = test_producer_ctx();
1206        let producer = endpoint.create_producer(&ctx).unwrap();
1207
1208        let mut exchange = Exchange::new(Message::new("malicious"));
1209        exchange.input.set_header(
1210            "CamelFileName",
1211            serde_json::Value::String("/etc/passwd".to_string()),
1212        );
1213
1214        let result = producer.oneshot(exchange).await;
1215        assert!(result.is_err(), "Should reject absolute path outside base");
1216    }
1217}