1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use regex::Regex;
8use tokio::fs;
9use tokio::time;
10use tower::Service;
11use tracing::{debug, warn};
12
13use camel_api::{BoxProcessor, CamelError, Exchange, Message, body::Body};
14use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
15use camel_endpoint::parse_uri;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum FileExistStrategy {
23 Override,
24 Append,
25 Fail,
26}
27
28impl FileExistStrategy {
29 fn from_str(s: &str) -> Self {
30 match s {
31 "Append" | "append" => FileExistStrategy::Append,
32 "Fail" | "fail" => FileExistStrategy::Fail,
33 _ => FileExistStrategy::Override,
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
43pub struct FileConfig {
44 pub directory: String,
45 pub delay: Duration,
46 pub initial_delay: Duration,
47 pub noop: bool,
48 pub delete: bool,
49 pub move_to: Option<String>,
50 pub file_name: Option<String>,
51 pub include: Option<String>,
52 pub exclude: Option<String>,
53 pub recursive: bool,
54 pub file_exist: FileExistStrategy,
55 pub temp_prefix: Option<String>,
56 pub auto_create: bool,
57 pub read_timeout: Duration,
59 pub write_timeout: Duration,
60}
61
62impl FileConfig {
63 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
64 let parts = parse_uri(uri)?;
65 if parts.scheme != "file" {
66 return Err(CamelError::InvalidUri(format!(
67 "expected scheme 'file', got '{}'",
68 parts.scheme
69 )));
70 }
71
72 let delay = parts
73 .params
74 .get("delay")
75 .and_then(|v| v.parse::<u64>().ok())
76 .unwrap_or(500);
77
78 let initial_delay = parts
79 .params
80 .get("initialDelay")
81 .and_then(|v| v.parse::<u64>().ok())
82 .unwrap_or(1000);
83
84 let noop = parts
85 .params
86 .get("noop")
87 .map(|v| v == "true")
88 .unwrap_or(false);
89
90 let delete = parts
91 .params
92 .get("delete")
93 .map(|v| v == "true")
94 .unwrap_or(false);
95
96 let move_to = if noop || delete {
97 None
98 } else {
99 Some(
100 parts
101 .params
102 .get("move")
103 .cloned()
104 .unwrap_or_else(|| ".camel".to_string()),
105 )
106 };
107
108 let file_name = parts.params.get("fileName").cloned();
109 let include = parts.params.get("include").cloned();
110 let exclude = parts.params.get("exclude").cloned();
111
112 let recursive = parts
113 .params
114 .get("recursive")
115 .map(|v| v == "true")
116 .unwrap_or(false);
117
118 let file_exist = parts
119 .params
120 .get("fileExist")
121 .map(|v| FileExistStrategy::from_str(v))
122 .unwrap_or(FileExistStrategy::Override);
123
124 let temp_prefix = parts.params.get("tempPrefix").cloned();
125
126 let auto_create = parts
127 .params
128 .get("autoCreate")
129 .map(|v| v != "false")
130 .unwrap_or(true);
131
132 let read_timeout = parts
133 .params
134 .get("readTimeout")
135 .and_then(|v| v.parse::<u64>().ok())
136 .map(Duration::from_millis)
137 .unwrap_or(Duration::from_secs(30));
138
139 let write_timeout = parts
140 .params
141 .get("writeTimeout")
142 .and_then(|v| v.parse::<u64>().ok())
143 .map(Duration::from_millis)
144 .unwrap_or(Duration::from_secs(30));
145
146 Ok(Self {
147 directory: parts.path,
148 delay: Duration::from_millis(delay),
149 initial_delay: Duration::from_millis(initial_delay),
150 noop,
151 delete,
152 move_to,
153 file_name,
154 include,
155 exclude,
156 recursive,
157 file_exist,
158 temp_prefix,
159 auto_create,
160 read_timeout,
161 write_timeout,
162 })
163 }
164}
165
166pub struct FileComponent;
171
172impl FileComponent {
173 pub fn new() -> Self {
174 Self
175 }
176}
177
178impl Default for FileComponent {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184impl Component for FileComponent {
185 fn scheme(&self) -> &str {
186 "file"
187 }
188
189 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
190 let config = FileConfig::from_uri(uri)?;
191 Ok(Box::new(FileEndpoint {
192 uri: uri.to_string(),
193 config,
194 }))
195 }
196}
197
198struct FileEndpoint {
203 uri: String,
204 config: FileConfig,
205}
206
207impl Endpoint for FileEndpoint {
208 fn uri(&self) -> &str {
209 &self.uri
210 }
211
212 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
213 Ok(Box::new(FileConsumer {
214 config: self.config.clone(),
215 }))
216 }
217
218 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
219 Ok(BoxProcessor::new(FileProducer {
220 config: self.config.clone(),
221 }))
222 }
223}
224
225struct FileConsumer {
230 config: FileConfig,
231}
232
233#[async_trait]
234impl Consumer for FileConsumer {
235 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
236 let config = self.config.clone();
237
238 let include_re = config
239 .include
240 .as_ref()
241 .map(|p| Regex::new(p))
242 .transpose()
243 .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
244 let exclude_re = config
245 .exclude
246 .as_ref()
247 .map(|p| Regex::new(p))
248 .transpose()
249 .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
250
251 if !config.initial_delay.is_zero() {
252 tokio::select! {
253 _ = time::sleep(config.initial_delay) => {}
254 _ = context.cancelled() => {
255 debug!(directory = config.directory, "File consumer cancelled during initial delay");
256 return Ok(());
257 }
258 }
259 }
260
261 let mut interval = time::interval(config.delay);
262
263 loop {
264 tokio::select! {
265 _ = context.cancelled() => {
266 debug!(directory = config.directory, "File consumer received cancellation, stopping");
267 break;
268 }
269 _ = interval.tick() => {
270 if let Err(e) = poll_directory(
271 &config,
272 &context,
273 &include_re,
274 &exclude_re,
275 ).await {
276 warn!(directory = config.directory, error = %e, "Error polling directory");
277 }
278 }
279 }
280 }
281
282 Ok(())
283 }
284
285 async fn stop(&mut self) -> Result<(), CamelError> {
286 Ok(())
287 }
288}
289
290async fn poll_directory(
291 config: &FileConfig,
292 context: &ConsumerContext,
293 include_re: &Option<Regex>,
294 exclude_re: &Option<Regex>,
295) -> Result<(), CamelError> {
296 let base_path = std::path::Path::new(&config.directory);
297
298 let files = list_files(base_path, config.recursive).await?;
299
300 for file_path in files {
301 let file_name = file_path
302 .file_name()
303 .and_then(|n| n.to_str())
304 .unwrap_or_default()
305 .to_string();
306
307 if let Some(ref target_name) = config.file_name
308 && file_name != *target_name
309 {
310 continue;
311 }
312
313 if let Some(re) = include_re
314 && !re.is_match(&file_name)
315 {
316 continue;
317 }
318
319 if let Some(re) = exclude_re
320 && re.is_match(&file_name)
321 {
322 continue;
323 }
324
325 if let Some(ref move_dir) = config.move_to
326 && file_path.starts_with(base_path.join(move_dir))
327 {
328 continue;
329 }
330
331 let content = match tokio::time::timeout(config.read_timeout, fs::read(&file_path)).await {
332 Ok(Ok(c)) => c,
333 Ok(Err(e)) => {
334 warn!(
335 file = %file_path.display(),
336 error = %e,
337 "Failed to read file"
338 );
339 continue;
340 }
341 Err(_) => {
342 warn!(
343 file = %file_path.display(),
344 timeout_ms = config.read_timeout.as_millis(),
345 "Timeout reading file"
346 );
347 continue;
348 }
349 };
350
351 let metadata = fs::metadata(&file_path).await.ok();
352 let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
353 let last_modified = metadata
354 .as_ref()
355 .and_then(|m| m.modified().ok())
356 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
357 .map(|d| d.as_millis() as u64)
358 .unwrap_or(0);
359
360 let relative_path = file_path
361 .strip_prefix(base_path)
362 .unwrap_or(&file_path)
363 .to_string_lossy()
364 .to_string();
365
366 let absolute_path = file_path
367 .canonicalize()
368 .unwrap_or_else(|_| file_path.clone())
369 .to_string_lossy()
370 .to_string();
371
372 let mut exchange = Exchange::new(Message::new(Body::Bytes(bytes::Bytes::from(content))));
373 exchange
374 .input
375 .set_header("CamelFileName", serde_json::Value::String(relative_path));
376 exchange.input.set_header(
377 "CamelFileNameOnly",
378 serde_json::Value::String(file_name.clone()),
379 );
380 exchange.input.set_header(
381 "CamelFileAbsolutePath",
382 serde_json::Value::String(absolute_path),
383 );
384 exchange.input.set_header(
385 "CamelFileLength",
386 serde_json::Value::Number(file_len.into()),
387 );
388 exchange.input.set_header(
389 "CamelFileLastModified",
390 serde_json::Value::Number(last_modified.into()),
391 );
392
393 debug!(
394 file = %file_path.display(),
395 correlation_id = %exchange.correlation_id(),
396 "Processing file"
397 );
398
399 if context.send(exchange).await.is_err() {
400 break;
401 }
402
403 if config.noop {
404 } else if config.delete {
406 if let Err(e) = fs::remove_file(&file_path).await {
407 warn!(file = %file_path.display(), error = %e, "Failed to delete file");
408 }
409 } else if let Some(ref move_dir) = config.move_to {
410 let target_dir = base_path.join(move_dir);
411 if let Err(e) = fs::create_dir_all(&target_dir).await {
412 warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
413 continue;
414 }
415 let target_path = target_dir.join(&file_name);
416 if let Err(e) = fs::rename(&file_path, &target_path).await {
417 warn!(
418 from = %file_path.display(),
419 to = %target_path.display(),
420 error = %e,
421 "Failed to move file"
422 );
423 }
424 }
425 }
426
427 Ok(())
428}
429
430async fn list_files(
431 dir: &std::path::Path,
432 recursive: bool,
433) -> Result<Vec<std::path::PathBuf>, CamelError> {
434 let mut files = Vec::new();
435 let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
436
437 while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
438 let path = entry.path();
439 if path.is_file() {
440 files.push(path);
441 } else if path.is_dir() && recursive {
442 let mut sub_files = Box::pin(list_files(&path, true)).await?;
443 files.append(&mut sub_files);
444 }
445 }
446
447 files.sort();
448 Ok(files)
449}
450
451fn validate_path_is_within_base(
456 base_dir: &std::path::Path,
457 target_path: &std::path::Path,
458) -> Result<(), CamelError> {
459 let canonical_base = base_dir.canonicalize().map_err(|e| {
460 CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
461 })?;
462
463 let canonical_target = if target_path.exists() {
465 target_path.canonicalize().map_err(|e| {
466 CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
467 })?
468 } else if let Some(parent) = target_path.parent() {
469 if !parent.exists() {
471 return Err(CamelError::ProcessorError(format!(
472 "Parent directory '{}' does not exist",
473 parent.display()
474 )));
475 }
476 let canonical_parent = parent.canonicalize().map_err(|e| {
477 CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
478 })?;
479 if let Some(filename) = target_path.file_name() {
481 canonical_parent.join(filename)
482 } else {
483 return Err(CamelError::ProcessorError(
484 "Invalid target path: no filename".to_string(),
485 ));
486 }
487 } else {
488 return Err(CamelError::ProcessorError(
489 "Invalid target path: no parent directory".to_string(),
490 ));
491 };
492
493 if !canonical_target.starts_with(&canonical_base) {
494 return Err(CamelError::ProcessorError(format!(
495 "Path '{}' is outside base directory '{}'",
496 canonical_target.display(),
497 canonical_base.display()
498 )));
499 }
500
501 Ok(())
502}
503
504#[derive(Clone)]
509struct FileProducer {
510 config: FileConfig,
511}
512
513impl FileProducer {
514 fn body_to_bytes(body: &Body) -> Result<Vec<u8>, CamelError> {
515 match body {
516 Body::Empty => Err(CamelError::ProcessorError(
517 "Cannot write empty body to file".to_string(),
518 )),
519 Body::Bytes(b) => Ok(b.to_vec()),
520 Body::Text(s) => Ok(s.as_bytes().to_vec()),
521 Body::Json(v) => Ok(v.to_string().into_bytes()),
522 }
523 }
524
525 fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
526 if let Some(name) = exchange
527 .input
528 .header("CamelFileName")
529 .and_then(|v| v.as_str())
530 {
531 return Ok(name.to_string());
532 }
533 if let Some(ref name) = config.file_name {
534 return Ok(name.clone());
535 }
536 Err(CamelError::ProcessorError(
537 "No filename specified: set CamelFileName header or fileName option".to_string(),
538 ))
539 }
540}
541
542impl Service<Exchange> for FileProducer {
543 type Response = Exchange;
544 type Error = CamelError;
545 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
546
547 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
548 Poll::Ready(Ok(()))
549 }
550
551 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
552 let config = self.config.clone();
553
554 Box::pin(async move {
555 let file_name = FileProducer::resolve_filename(&exchange, &config)?;
556 let data = FileProducer::body_to_bytes(&exchange.input.body)?;
557
558 let dir_path = std::path::Path::new(&config.directory);
559 let target_path = dir_path.join(&file_name);
560
561 if config.auto_create
562 && let Some(parent) = target_path.parent()
563 {
564 tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
565 .await
566 .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
567 .map_err(CamelError::from)?;
568 }
569
570 validate_path_is_within_base(dir_path, &target_path)?;
572
573 if target_path.exists() {
574 match config.file_exist {
575 FileExistStrategy::Fail => {
576 return Err(CamelError::ProcessorError(format!(
577 "File already exists: {}",
578 target_path.display()
579 )));
580 }
581 FileExistStrategy::Append => {
582 use tokio::io::AsyncWriteExt;
583 let mut file = tokio::time::timeout(
584 config.write_timeout,
585 fs::OpenOptions::new().append(true).open(&target_path),
586 )
587 .await
588 .map_err(|_| {
589 CamelError::ProcessorError("Timeout opening file for append".into())
590 })?
591 .map_err(CamelError::from)?;
592
593 tokio::time::timeout(config.write_timeout, async {
594 file.write_all(&data).await?;
595 file.flush().await?;
596 Ok::<_, std::io::Error>(())
597 })
598 .await
599 .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
600 .map_err(CamelError::from)?;
601
602 let abs_path = target_path
603 .canonicalize()
604 .unwrap_or_else(|_| target_path.clone())
605 .to_string_lossy()
606 .to_string();
607 exchange.input.set_header(
608 "CamelFileNameProduced",
609 serde_json::Value::String(abs_path),
610 );
611 return Ok(exchange);
612 }
613 FileExistStrategy::Override => {}
614 }
615 }
616
617 if let Some(ref prefix) = config.temp_prefix {
618 let temp_name = format!("{prefix}{file_name}");
619 let temp_path = dir_path.join(&temp_name);
620
621 tokio::time::timeout(config.write_timeout, fs::write(&temp_path, &data))
622 .await
623 .map_err(|_| CamelError::ProcessorError("Timeout writing temp file".into()))?
624 .map_err(CamelError::from)?;
625
626 tokio::time::timeout(config.write_timeout, fs::rename(&temp_path, &target_path))
627 .await
628 .map_err(|_| CamelError::ProcessorError("Timeout renaming file".into()))?
629 .map_err(CamelError::from)?;
630 } else {
631 tokio::time::timeout(config.write_timeout, fs::write(&target_path, &data))
632 .await
633 .map_err(|_| CamelError::ProcessorError("Timeout writing file".into()))?
634 .map_err(CamelError::from)?;
635 }
636
637 let abs_path = target_path
638 .canonicalize()
639 .unwrap_or_else(|_| target_path.clone())
640 .to_string_lossy()
641 .to_string();
642 exchange
643 .input
644 .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
645
646 debug!(
647 file = %target_path.display(),
648 correlation_id = %exchange.correlation_id(),
649 "File written"
650 );
651 Ok(exchange)
652 })
653 }
654}
655
656#[cfg(test)]
657mod tests {
658 use super::*;
659 use std::sync::Arc;
660 use std::time::Duration;
661 use tokio::sync::Mutex;
662 use tokio_util::sync::CancellationToken;
663
664 struct NullRouteController;
666
667 #[async_trait::async_trait]
668 impl camel_api::RouteController for NullRouteController {
669 async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
670 Ok(())
671 }
672 async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
673 Ok(())
674 }
675 async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
676 Ok(())
677 }
678 async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
679 Ok(())
680 }
681 async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
682 Ok(())
683 }
684 fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
685 None
686 }
687 async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
688 Ok(())
689 }
690 async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
691 Ok(())
692 }
693 }
694
695 fn test_producer_ctx() -> ProducerContext {
696 ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
697 }
698
699 #[test]
700 fn test_file_config_defaults() {
701 let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
702 assert_eq!(config.directory, "/tmp/inbox");
703 assert_eq!(config.delay, Duration::from_millis(500));
704 assert_eq!(config.initial_delay, Duration::from_millis(1000));
705 assert!(!config.noop);
706 assert!(!config.delete);
707 assert_eq!(config.move_to, Some(".camel".to_string()));
708 assert!(config.file_name.is_none());
709 assert!(config.include.is_none());
710 assert!(config.exclude.is_none());
711 assert!(!config.recursive);
712 assert_eq!(config.file_exist, FileExistStrategy::Override);
713 assert!(config.temp_prefix.is_none());
714 assert!(config.auto_create);
715 assert_eq!(config.read_timeout, Duration::from_secs(30));
717 assert_eq!(config.write_timeout, Duration::from_secs(30));
718 }
719
720 #[test]
721 fn test_file_config_consumer_options() {
722 let config = FileConfig::from_uri(
723 "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
724 ).unwrap();
725 assert_eq!(config.directory, "/data/input");
726 assert_eq!(config.delay, Duration::from_millis(1000));
727 assert_eq!(config.initial_delay, Duration::from_millis(2000));
728 assert!(config.noop);
729 assert!(config.recursive);
730 assert_eq!(config.include, Some(".*\\.csv".to_string()));
731 }
732
733 #[test]
734 fn test_file_config_producer_options() {
735 let config = FileConfig::from_uri(
736 "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
737 )
738 .unwrap();
739 assert_eq!(config.file_exist, FileExistStrategy::Append);
740 assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
741 assert!(!config.auto_create);
742 assert_eq!(config.file_name, Some("out.txt".to_string()));
743 }
744
745 #[test]
746 fn test_file_config_delete_mode() {
747 let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
748 assert!(config.delete);
749 assert!(config.move_to.is_none());
750 }
751
752 #[test]
753 fn test_file_config_noop_mode() {
754 let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
755 assert!(config.noop);
756 assert!(config.move_to.is_none());
757 }
758
759 #[test]
760 fn test_file_config_wrong_scheme() {
761 let result = FileConfig::from_uri("timer:tick");
762 assert!(result.is_err());
763 }
764
765 #[test]
766 fn test_file_component_scheme() {
767 let component = FileComponent::new();
768 assert_eq!(component.scheme(), "file");
769 }
770
771 #[test]
772 fn test_file_component_creates_endpoint() {
773 let component = FileComponent::new();
774 let endpoint = component.create_endpoint("file:/tmp/test");
775 assert!(endpoint.is_ok());
776 }
777
778 #[tokio::test]
783 async fn test_file_consumer_reads_files() {
784 let dir = tempfile::tempdir().unwrap();
785 let dir_path = dir.path().to_str().unwrap();
786
787 std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
788 std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
789
790 let component = FileComponent::new();
791 let endpoint = component
792 .create_endpoint(&format!(
793 "file:{dir_path}?noop=true&initialDelay=0&delay=100"
794 ))
795 .unwrap();
796 let mut consumer = endpoint.create_consumer().unwrap();
797
798 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
799 let token = CancellationToken::new();
800 let ctx = ConsumerContext::new(tx, token.clone());
801
802 tokio::spawn(async move {
803 consumer.start(ctx).await.unwrap();
804 });
805
806 let mut received = Vec::new();
807 let timeout = tokio::time::timeout(Duration::from_secs(2), async {
808 while let Some(envelope) = rx.recv().await {
809 received.push(envelope.exchange);
810 if received.len() == 2 {
811 break;
812 }
813 }
814 })
815 .await;
816 token.cancel();
817
818 assert!(timeout.is_ok(), "Should have received 2 exchanges");
819 assert_eq!(received.len(), 2);
820
821 for ex in &received {
822 assert!(ex.input.header("CamelFileName").is_some());
823 assert!(ex.input.header("CamelFileNameOnly").is_some());
824 assert!(ex.input.header("CamelFileAbsolutePath").is_some());
825 assert!(ex.input.header("CamelFileLength").is_some());
826 assert!(ex.input.header("CamelFileLastModified").is_some());
827 }
828 }
829
830 #[tokio::test]
831 async fn test_file_consumer_include_filter() {
832 let dir = tempfile::tempdir().unwrap();
833 let dir_path = dir.path().to_str().unwrap();
834
835 std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
836 std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
837
838 let component = FileComponent::new();
839 let endpoint = component
840 .create_endpoint(&format!(
841 "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
842 ))
843 .unwrap();
844 let mut consumer = endpoint.create_consumer().unwrap();
845
846 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
847 let token = CancellationToken::new();
848 let ctx = ConsumerContext::new(tx, token.clone());
849
850 tokio::spawn(async move {
851 consumer.start(ctx).await.unwrap();
852 });
853
854 let mut received = Vec::new();
855 let _ = tokio::time::timeout(Duration::from_millis(500), async {
856 while let Some(envelope) = rx.recv().await {
857 received.push(envelope.exchange);
858 if received.len() == 1 {
859 break;
860 }
861 }
862 })
863 .await;
864 token.cancel();
865
866 assert_eq!(received.len(), 1);
867 let name = received[0]
868 .input
869 .header("CamelFileNameOnly")
870 .and_then(|v| v.as_str())
871 .unwrap();
872 assert_eq!(name, "data.csv");
873 }
874
875 #[tokio::test]
876 async fn test_file_consumer_delete_mode() {
877 let dir = tempfile::tempdir().unwrap();
878 let dir_path = dir.path().to_str().unwrap();
879
880 std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
881
882 let component = FileComponent::new();
883 let endpoint = component
884 .create_endpoint(&format!(
885 "file:{dir_path}?delete=true&initialDelay=0&delay=100"
886 ))
887 .unwrap();
888 let mut consumer = endpoint.create_consumer().unwrap();
889
890 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
891 let token = CancellationToken::new();
892 let ctx = ConsumerContext::new(tx, token.clone());
893
894 tokio::spawn(async move {
895 consumer.start(ctx).await.unwrap();
896 });
897
898 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
899 token.cancel();
900
901 tokio::time::sleep(Duration::from_millis(100)).await;
902
903 assert!(
904 !dir.path().join("deleteme.txt").exists(),
905 "File should be deleted"
906 );
907 }
908
909 #[tokio::test]
910 async fn test_file_consumer_move_mode() {
911 let dir = tempfile::tempdir().unwrap();
912 let dir_path = dir.path().to_str().unwrap();
913
914 std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
915
916 let component = FileComponent::new();
917 let endpoint = component
918 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
919 .unwrap();
920 let mut consumer = endpoint.create_consumer().unwrap();
921
922 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
923 let token = CancellationToken::new();
924 let ctx = ConsumerContext::new(tx, token.clone());
925
926 tokio::spawn(async move {
927 consumer.start(ctx).await.unwrap();
928 });
929
930 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
931 token.cancel();
932
933 tokio::time::sleep(Duration::from_millis(100)).await;
934
935 assert!(
936 !dir.path().join("moveme.txt").exists(),
937 "Original file should be gone"
938 );
939 assert!(
940 dir.path().join(".camel").join("moveme.txt").exists(),
941 "File should be in .camel/"
942 );
943 }
944
945 #[tokio::test]
946 async fn test_file_consumer_respects_cancellation() {
947 let dir = tempfile::tempdir().unwrap();
948 let dir_path = dir.path().to_str().unwrap();
949
950 let component = FileComponent::new();
951 let endpoint = component
952 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
953 .unwrap();
954 let mut consumer = endpoint.create_consumer().unwrap();
955
956 let (tx, _rx) = tokio::sync::mpsc::channel(16);
957 let token = CancellationToken::new();
958 let ctx = ConsumerContext::new(tx, token.clone());
959
960 let handle = tokio::spawn(async move {
961 consumer.start(ctx).await.unwrap();
962 });
963
964 tokio::time::sleep(Duration::from_millis(150)).await;
965 token.cancel();
966
967 let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
968 assert!(
969 result.is_ok(),
970 "Consumer should have stopped after cancellation"
971 );
972 }
973
974 #[tokio::test]
979 async fn test_file_producer_writes_file() {
980 use tower::ServiceExt;
981
982 let dir = tempfile::tempdir().unwrap();
983 let dir_path = dir.path().to_str().unwrap();
984
985 let component = FileComponent::new();
986 let endpoint = component
987 .create_endpoint(&format!("file:{dir_path}"))
988 .unwrap();
989 let ctx = test_producer_ctx();
990 let producer = endpoint.create_producer(&ctx).unwrap();
991
992 let mut exchange = Exchange::new(Message::new("file content"));
993 exchange.input.set_header(
994 "CamelFileName",
995 serde_json::Value::String("output.txt".to_string()),
996 );
997
998 let result = producer.oneshot(exchange).await.unwrap();
999
1000 let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1001 assert_eq!(content, "file content");
1002
1003 assert!(result.input.header("CamelFileNameProduced").is_some());
1004 }
1005
1006 #[tokio::test]
1007 async fn test_file_producer_auto_create_dirs() {
1008 use tower::ServiceExt;
1009
1010 let dir = tempfile::tempdir().unwrap();
1011 let dir_path = dir.path().to_str().unwrap();
1012
1013 let component = FileComponent::new();
1014 let endpoint = component
1015 .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1016 .unwrap();
1017 let ctx = test_producer_ctx();
1018 let producer = endpoint.create_producer(&ctx).unwrap();
1019
1020 let mut exchange = Exchange::new(Message::new("nested"));
1021 exchange.input.set_header(
1022 "CamelFileName",
1023 serde_json::Value::String("file.txt".to_string()),
1024 );
1025
1026 producer.oneshot(exchange).await.unwrap();
1027
1028 assert!(dir.path().join("sub/dir/file.txt").exists());
1029 }
1030
1031 #[tokio::test]
1032 async fn test_file_producer_file_exist_fail() {
1033 use tower::ServiceExt;
1034
1035 let dir = tempfile::tempdir().unwrap();
1036 let dir_path = dir.path().to_str().unwrap();
1037
1038 std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1039
1040 let component = FileComponent::new();
1041 let endpoint = component
1042 .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1043 .unwrap();
1044 let ctx = test_producer_ctx();
1045 let producer = endpoint.create_producer(&ctx).unwrap();
1046
1047 let mut exchange = Exchange::new(Message::new("new"));
1048 exchange.input.set_header(
1049 "CamelFileName",
1050 serde_json::Value::String("existing.txt".to_string()),
1051 );
1052
1053 let result = producer.oneshot(exchange).await;
1054 assert!(
1055 result.is_err(),
1056 "Should fail when file exists with Fail strategy"
1057 );
1058 }
1059
1060 #[tokio::test]
1061 async fn test_file_producer_file_exist_append() {
1062 use tower::ServiceExt;
1063
1064 let dir = tempfile::tempdir().unwrap();
1065 let dir_path = dir.path().to_str().unwrap();
1066
1067 std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1068
1069 let component = FileComponent::new();
1070 let endpoint = component
1071 .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1072 .unwrap();
1073 let ctx = test_producer_ctx();
1074 let producer = endpoint.create_producer(&ctx).unwrap();
1075
1076 let mut exchange = Exchange::new(Message::new("new"));
1077 exchange.input.set_header(
1078 "CamelFileName",
1079 serde_json::Value::String("append.txt".to_string()),
1080 );
1081
1082 producer.oneshot(exchange).await.unwrap();
1083
1084 let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1085 assert_eq!(content, "oldnew");
1086 }
1087
1088 #[tokio::test]
1089 async fn test_file_producer_temp_prefix() {
1090 use tower::ServiceExt;
1091
1092 let dir = tempfile::tempdir().unwrap();
1093 let dir_path = dir.path().to_str().unwrap();
1094
1095 let component = FileComponent::new();
1096 let endpoint = component
1097 .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1098 .unwrap();
1099 let ctx = test_producer_ctx();
1100 let producer = endpoint.create_producer(&ctx).unwrap();
1101
1102 let mut exchange = Exchange::new(Message::new("atomic write"));
1103 exchange.input.set_header(
1104 "CamelFileName",
1105 serde_json::Value::String("final.txt".to_string()),
1106 );
1107
1108 producer.oneshot(exchange).await.unwrap();
1109
1110 assert!(dir.path().join("final.txt").exists());
1111 assert!(!dir.path().join(".tmpfinal.txt").exists());
1112 let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1113 assert_eq!(content, "atomic write");
1114 }
1115
1116 #[tokio::test]
1117 async fn test_file_producer_uses_filename_option() {
1118 use tower::ServiceExt;
1119
1120 let dir = tempfile::tempdir().unwrap();
1121 let dir_path = dir.path().to_str().unwrap();
1122
1123 let component = FileComponent::new();
1124 let endpoint = component
1125 .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1126 .unwrap();
1127 let ctx = test_producer_ctx();
1128 let producer = endpoint.create_producer(&ctx).unwrap();
1129
1130 let exchange = Exchange::new(Message::new("content"));
1131
1132 producer.oneshot(exchange).await.unwrap();
1133 assert!(dir.path().join("fixed.txt").exists());
1134 }
1135
1136 #[tokio::test]
1137 async fn test_file_producer_no_filename_errors() {
1138 use tower::ServiceExt;
1139
1140 let dir = tempfile::tempdir().unwrap();
1141 let dir_path = dir.path().to_str().unwrap();
1142
1143 let component = FileComponent::new();
1144 let endpoint = component
1145 .create_endpoint(&format!("file:{dir_path}"))
1146 .unwrap();
1147 let ctx = test_producer_ctx();
1148 let producer = endpoint.create_producer(&ctx).unwrap();
1149
1150 let exchange = Exchange::new(Message::new("content"));
1151
1152 let result = producer.oneshot(exchange).await;
1153 assert!(result.is_err(), "Should error when no filename is provided");
1154 }
1155
1156 #[tokio::test]
1161 async fn test_file_producer_rejects_path_traversal_parent_directory() {
1162 use tower::ServiceExt;
1163
1164 let dir = tempfile::tempdir().unwrap();
1165 let dir_path = dir.path().to_str().unwrap();
1166
1167 std::fs::create_dir(dir.path().join("subdir")).unwrap();
1169 std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1170
1171 let component = FileComponent::new();
1172 let endpoint = component
1173 .create_endpoint(&format!("file:{dir_path}/subdir"))
1174 .unwrap();
1175 let ctx = test_producer_ctx();
1176 let producer = endpoint.create_producer(&ctx).unwrap();
1177
1178 let mut exchange = Exchange::new(Message::new("malicious"));
1179 exchange.input.set_header(
1180 "CamelFileName",
1181 serde_json::Value::String("../secret.txt".to_string()),
1182 );
1183
1184 let result = producer.oneshot(exchange).await;
1185 assert!(result.is_err(), "Should reject path traversal attempt");
1186
1187 let err = result.unwrap_err();
1188 assert!(
1189 err.to_string().contains("outside"),
1190 "Error should mention path is outside base directory"
1191 );
1192 }
1193
1194 #[tokio::test]
1195 async fn test_file_producer_rejects_absolute_path_outside_base() {
1196 use tower::ServiceExt;
1197
1198 let dir = tempfile::tempdir().unwrap();
1199 let dir_path = dir.path().to_str().unwrap();
1200
1201 let component = FileComponent::new();
1202 let endpoint = component
1203 .create_endpoint(&format!("file:{dir_path}"))
1204 .unwrap();
1205 let ctx = test_producer_ctx();
1206 let producer = endpoint.create_producer(&ctx).unwrap();
1207
1208 let mut exchange = Exchange::new(Message::new("malicious"));
1209 exchange.input.set_header(
1210 "CamelFileName",
1211 serde_json::Value::String("/etc/passwd".to_string()),
1212 );
1213
1214 let result = producer.oneshot(exchange).await;
1215 assert!(result.is_err(), "Should reject absolute path outside base");
1216 }
1217}