1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Duration;
5
6use async_trait::async_trait;
7use futures::StreamExt;
8use regex::Regex;
9use tokio::fs;
10use tokio::time;
11use tokio_util::io::ReaderStream;
12use tower::Service;
13use tracing::{debug, warn};
14
15use camel_api::{
16 BoxProcessor, CamelError, Exchange, Message, body::Body, body::StreamBody, body::StreamMetadata,
17};
18use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
19use camel_endpoint::parse_uri;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum FileExistStrategy {
27 Override,
28 Append,
29 Fail,
30}
31
32impl FileExistStrategy {
33 fn from_str(s: &str) -> Self {
34 match s {
35 "Append" | "append" => FileExistStrategy::Append,
36 "Fail" | "fail" => FileExistStrategy::Fail,
37 _ => FileExistStrategy::Override,
38 }
39 }
40}
41
42#[derive(Debug, Clone)]
96pub struct FileConfig {
97 pub directory: String,
98 pub delay: Duration,
99 pub initial_delay: Duration,
100 pub noop: bool,
101 pub delete: bool,
102 pub move_to: Option<String>,
103 pub file_name: Option<String>,
104 pub include: Option<String>,
105 pub exclude: Option<String>,
106 pub recursive: bool,
107 pub file_exist: FileExistStrategy,
108 pub temp_prefix: Option<String>,
109 pub auto_create: bool,
110 pub read_timeout: Duration,
112 pub write_timeout: Duration,
113 pub max_body_size: usize,
115}
116
117impl FileConfig {
118 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
119 let parts = parse_uri(uri)?;
120 if parts.scheme != "file" {
121 return Err(CamelError::InvalidUri(format!(
122 "expected scheme 'file', got '{}'",
123 parts.scheme
124 )));
125 }
126
127 let delay = parts
128 .params
129 .get("delay")
130 .and_then(|v| v.parse::<u64>().ok())
131 .unwrap_or(500);
132
133 let initial_delay = parts
134 .params
135 .get("initialDelay")
136 .and_then(|v| v.parse::<u64>().ok())
137 .unwrap_or(1000);
138
139 let noop = parts
140 .params
141 .get("noop")
142 .map(|v| v == "true")
143 .unwrap_or(false);
144
145 let delete = parts
146 .params
147 .get("delete")
148 .map(|v| v == "true")
149 .unwrap_or(false);
150
151 let move_to = if noop || delete {
152 None
153 } else {
154 Some(
155 parts
156 .params
157 .get("move")
158 .cloned()
159 .unwrap_or_else(|| ".camel".to_string()),
160 )
161 };
162
163 let file_name = parts.params.get("fileName").cloned();
164 let include = parts.params.get("include").cloned();
165 let exclude = parts.params.get("exclude").cloned();
166
167 let recursive = parts
168 .params
169 .get("recursive")
170 .map(|v| v == "true")
171 .unwrap_or(false);
172
173 let file_exist = parts
174 .params
175 .get("fileExist")
176 .map(|v| FileExistStrategy::from_str(v))
177 .unwrap_or(FileExistStrategy::Override);
178
179 let temp_prefix = parts.params.get("tempPrefix").cloned();
180
181 let auto_create = parts
182 .params
183 .get("autoCreate")
184 .map(|v| v != "false")
185 .unwrap_or(true);
186
187 let read_timeout = parts
188 .params
189 .get("readTimeout")
190 .and_then(|v| v.parse::<u64>().ok())
191 .map(Duration::from_millis)
192 .unwrap_or(Duration::from_secs(30));
193
194 let write_timeout = parts
195 .params
196 .get("writeTimeout")
197 .and_then(|v| v.parse::<u64>().ok())
198 .map(Duration::from_millis)
199 .unwrap_or(Duration::from_secs(30));
200
201 let max_body_size = parts
202 .params
203 .get("maxBodySize")
204 .and_then(|v| v.parse::<usize>().ok())
205 .unwrap_or(100 * 1024 * 1024); Ok(Self {
208 directory: parts.path,
209 delay: Duration::from_millis(delay),
210 initial_delay: Duration::from_millis(initial_delay),
211 noop,
212 delete,
213 move_to,
214 file_name,
215 include,
216 exclude,
217 recursive,
218 file_exist,
219 temp_prefix,
220 auto_create,
221 read_timeout,
222 write_timeout,
223 max_body_size,
224 })
225 }
226}
227
228pub struct FileComponent;
233
234impl FileComponent {
235 pub fn new() -> Self {
236 Self
237 }
238}
239
240impl Default for FileComponent {
241 fn default() -> Self {
242 Self::new()
243 }
244}
245
246impl Component for FileComponent {
247 fn scheme(&self) -> &str {
248 "file"
249 }
250
251 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
252 let config = FileConfig::from_uri(uri)?;
253 Ok(Box::new(FileEndpoint {
254 uri: uri.to_string(),
255 config,
256 }))
257 }
258}
259
260struct FileEndpoint {
265 uri: String,
266 config: FileConfig,
267}
268
269impl Endpoint for FileEndpoint {
270 fn uri(&self) -> &str {
271 &self.uri
272 }
273
274 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
275 Ok(Box::new(FileConsumer {
276 config: self.config.clone(),
277 }))
278 }
279
280 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
281 Ok(BoxProcessor::new(FileProducer {
282 config: self.config.clone(),
283 }))
284 }
285}
286
287struct FileConsumer {
292 config: FileConfig,
293}
294
295#[async_trait]
296impl Consumer for FileConsumer {
297 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
298 let config = self.config.clone();
299
300 let include_re = config
301 .include
302 .as_ref()
303 .map(|p| Regex::new(p))
304 .transpose()
305 .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
306 let exclude_re = config
307 .exclude
308 .as_ref()
309 .map(|p| Regex::new(p))
310 .transpose()
311 .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
312
313 if !config.initial_delay.is_zero() {
314 tokio::select! {
315 _ = time::sleep(config.initial_delay) => {}
316 _ = context.cancelled() => {
317 debug!(directory = config.directory, "File consumer cancelled during initial delay");
318 return Ok(());
319 }
320 }
321 }
322
323 let mut interval = time::interval(config.delay);
324
325 loop {
326 tokio::select! {
327 _ = context.cancelled() => {
328 debug!(directory = config.directory, "File consumer received cancellation, stopping");
329 break;
330 }
331 _ = interval.tick() => {
332 if let Err(e) = poll_directory(
333 &config,
334 &context,
335 &include_re,
336 &exclude_re,
337 ).await {
338 warn!(directory = config.directory, error = %e, "Error polling directory");
339 }
340 }
341 }
342 }
343
344 Ok(())
345 }
346
347 async fn stop(&mut self) -> Result<(), CamelError> {
348 Ok(())
349 }
350}
351
352async fn poll_directory(
353 config: &FileConfig,
354 context: &ConsumerContext,
355 include_re: &Option<Regex>,
356 exclude_re: &Option<Regex>,
357) -> Result<(), CamelError> {
358 let base_path = std::path::Path::new(&config.directory);
359
360 let files = list_files(base_path, config.recursive).await?;
361
362 for file_path in files {
363 let file_name = file_path
364 .file_name()
365 .and_then(|n| n.to_str())
366 .unwrap_or_default()
367 .to_string();
368
369 if let Some(ref target_name) = config.file_name
370 && file_name != *target_name
371 {
372 continue;
373 }
374
375 if let Some(re) = include_re
376 && !re.is_match(&file_name)
377 {
378 continue;
379 }
380
381 if let Some(re) = exclude_re
382 && re.is_match(&file_name)
383 {
384 continue;
385 }
386
387 if let Some(ref move_dir) = config.move_to
388 && file_path.starts_with(base_path.join(move_dir))
389 {
390 continue;
391 }
392
393 let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
394 let f = fs::File::open(&file_path).await?;
395 let m = f.metadata().await?;
396 Ok::<_, std::io::Error>((f, m))
397 })
398 .await
399 {
400 Ok(Ok((f, m))) => (f, Some(m)),
401 Ok(Err(e)) => {
402 warn!(
403 file = %file_path.display(),
404 error = %e,
405 "Failed to open file"
406 );
407 continue;
408 }
409 Err(_) => {
410 warn!(
411 file = %file_path.display(),
412 timeout_ms = config.read_timeout.as_millis(),
413 "Timeout opening file"
414 );
415 continue;
416 }
417 };
418
419 let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
420 let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
421
422 let last_modified = metadata
423 .as_ref()
424 .and_then(|m| m.modified().ok())
425 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
426 .map(|d| d.as_millis() as u64)
427 .unwrap_or(0);
428
429 let relative_path = file_path
430 .strip_prefix(base_path)
431 .unwrap_or(&file_path)
432 .to_string_lossy()
433 .to_string();
434
435 let absolute_path = file_path
436 .canonicalize()
437 .unwrap_or_else(|_| file_path.clone())
438 .to_string_lossy()
439 .to_string();
440
441 let body = Body::Stream(StreamBody {
442 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
443 metadata: StreamMetadata {
444 size_hint: Some(file_len),
445 content_type: None,
446 origin: Some(absolute_path.clone()),
447 },
448 });
449
450 let mut exchange = Exchange::new(Message::new(body));
451 exchange
452 .input
453 .set_header("CamelFileName", serde_json::Value::String(relative_path));
454 exchange.input.set_header(
455 "CamelFileNameOnly",
456 serde_json::Value::String(file_name.clone()),
457 );
458 exchange.input.set_header(
459 "CamelFileAbsolutePath",
460 serde_json::Value::String(absolute_path),
461 );
462 exchange.input.set_header(
463 "CamelFileLength",
464 serde_json::Value::Number(file_len.into()),
465 );
466 exchange.input.set_header(
467 "CamelFileLastModified",
468 serde_json::Value::Number(last_modified.into()),
469 );
470
471 debug!(
472 file = %file_path.display(),
473 correlation_id = %exchange.correlation_id(),
474 "Processing file"
475 );
476
477 if context.send(exchange).await.is_err() {
478 break;
479 }
480
481 if config.noop {
482 } else if config.delete {
484 if let Err(e) = fs::remove_file(&file_path).await {
485 warn!(file = %file_path.display(), error = %e, "Failed to delete file");
486 }
487 } else if let Some(ref move_dir) = config.move_to {
488 let target_dir = base_path.join(move_dir);
489 if let Err(e) = fs::create_dir_all(&target_dir).await {
490 warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
491 continue;
492 }
493 let target_path = target_dir.join(&file_name);
494 if let Err(e) = fs::rename(&file_path, &target_path).await {
495 warn!(
496 from = %file_path.display(),
497 to = %target_path.display(),
498 error = %e,
499 "Failed to move file"
500 );
501 }
502 }
503 }
504
505 Ok(())
506}
507
508async fn list_files(
509 dir: &std::path::Path,
510 recursive: bool,
511) -> Result<Vec<std::path::PathBuf>, CamelError> {
512 let mut files = Vec::new();
513 let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
514
515 while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
516 let path = entry.path();
517 if path.is_file() {
518 files.push(path);
519 } else if path.is_dir() && recursive {
520 let mut sub_files = Box::pin(list_files(&path, true)).await?;
521 files.append(&mut sub_files);
522 }
523 }
524
525 files.sort();
526 Ok(files)
527}
528
529fn validate_path_is_within_base(
534 base_dir: &std::path::Path,
535 target_path: &std::path::Path,
536) -> Result<(), CamelError> {
537 let canonical_base = base_dir.canonicalize().map_err(|e| {
538 CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
539 })?;
540
541 let canonical_target = if target_path.exists() {
543 target_path.canonicalize().map_err(|e| {
544 CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
545 })?
546 } else if let Some(parent) = target_path.parent() {
547 if !parent.exists() {
549 return Err(CamelError::ProcessorError(format!(
550 "Parent directory '{}' does not exist",
551 parent.display()
552 )));
553 }
554 let canonical_parent = parent.canonicalize().map_err(|e| {
555 CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
556 })?;
557 if let Some(filename) = target_path.file_name() {
559 canonical_parent.join(filename)
560 } else {
561 return Err(CamelError::ProcessorError(
562 "Invalid target path: no filename".to_string(),
563 ));
564 }
565 } else {
566 return Err(CamelError::ProcessorError(
567 "Invalid target path: no parent directory".to_string(),
568 ));
569 };
570
571 if !canonical_target.starts_with(&canonical_base) {
572 return Err(CamelError::ProcessorError(format!(
573 "Path '{}' is outside base directory '{}'",
574 canonical_target.display(),
575 canonical_base.display()
576 )));
577 }
578
579 Ok(())
580}
581
582#[derive(Clone)]
587struct FileProducer {
588 config: FileConfig,
589}
590
591impl FileProducer {
592 async fn body_to_bytes(body: Body, max_size: usize) -> Result<Vec<u8>, CamelError> {
593 let bytes = body.into_bytes(max_size).await?;
594 Ok(bytes.to_vec())
595 }
596
597 fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
598 if let Some(name) = exchange
599 .input
600 .header("CamelFileName")
601 .and_then(|v| v.as_str())
602 {
603 return Ok(name.to_string());
604 }
605 if let Some(ref name) = config.file_name {
606 return Ok(name.clone());
607 }
608 Err(CamelError::ProcessorError(
609 "No filename specified: set CamelFileName header or fileName option".to_string(),
610 ))
611 }
612}
613
614impl Service<Exchange> for FileProducer {
615 type Response = Exchange;
616 type Error = CamelError;
617 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
618
619 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
620 Poll::Ready(Ok(()))
621 }
622
623 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
624 let config = self.config.clone();
625
626 Box::pin(async move {
627 let file_name = FileProducer::resolve_filename(&exchange, &config)?;
628 let data =
629 FileProducer::body_to_bytes(exchange.input.body.clone(), config.max_body_size)
630 .await?;
631
632 let dir_path = std::path::Path::new(&config.directory);
633 let target_path = dir_path.join(&file_name);
634
635 if config.auto_create
636 && let Some(parent) = target_path.parent()
637 {
638 tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
639 .await
640 .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
641 .map_err(CamelError::from)?;
642 }
643
644 validate_path_is_within_base(dir_path, &target_path)?;
646
647 if target_path.exists() {
648 match config.file_exist {
649 FileExistStrategy::Fail => {
650 return Err(CamelError::ProcessorError(format!(
651 "File already exists: {}",
652 target_path.display()
653 )));
654 }
655 FileExistStrategy::Append => {
656 use tokio::io::AsyncWriteExt;
657 let mut file = tokio::time::timeout(
658 config.write_timeout,
659 fs::OpenOptions::new().append(true).open(&target_path),
660 )
661 .await
662 .map_err(|_| {
663 CamelError::ProcessorError("Timeout opening file for append".into())
664 })?
665 .map_err(CamelError::from)?;
666
667 tokio::time::timeout(config.write_timeout, async {
668 file.write_all(&data).await?;
669 file.flush().await?;
670 Ok::<_, std::io::Error>(())
671 })
672 .await
673 .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
674 .map_err(CamelError::from)?;
675
676 let abs_path = target_path
677 .canonicalize()
678 .unwrap_or_else(|_| target_path.clone())
679 .to_string_lossy()
680 .to_string();
681 exchange.input.set_header(
682 "CamelFileNameProduced",
683 serde_json::Value::String(abs_path),
684 );
685 return Ok(exchange);
686 }
687 FileExistStrategy::Override => {}
688 }
689 }
690
691 if let Some(ref prefix) = config.temp_prefix {
692 let temp_name = format!("{prefix}{file_name}");
693 let temp_path = dir_path.join(&temp_name);
694
695 tokio::time::timeout(config.write_timeout, fs::write(&temp_path, &data))
696 .await
697 .map_err(|_| CamelError::ProcessorError("Timeout writing temp file".into()))?
698 .map_err(CamelError::from)?;
699
700 tokio::time::timeout(config.write_timeout, fs::rename(&temp_path, &target_path))
701 .await
702 .map_err(|_| CamelError::ProcessorError("Timeout renaming file".into()))?
703 .map_err(CamelError::from)?;
704 } else {
705 tokio::time::timeout(config.write_timeout, fs::write(&target_path, &data))
706 .await
707 .map_err(|_| CamelError::ProcessorError("Timeout writing file".into()))?
708 .map_err(CamelError::from)?;
709 }
710
711 let abs_path = target_path
712 .canonicalize()
713 .unwrap_or_else(|_| target_path.clone())
714 .to_string_lossy()
715 .to_string();
716 exchange
717 .input
718 .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
719
720 debug!(
721 file = %target_path.display(),
722 correlation_id = %exchange.correlation_id(),
723 "File written"
724 );
725 Ok(exchange)
726 })
727 }
728}
729
730#[cfg(test)]
731mod tests {
732 use super::*;
733 use std::sync::Arc;
734 use std::time::Duration;
735 use tokio::sync::Mutex;
736 use tokio_util::sync::CancellationToken;
737
738 struct NullRouteController;
740
741 #[async_trait::async_trait]
742 impl camel_api::RouteController for NullRouteController {
743 async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
744 Ok(())
745 }
746 async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
747 Ok(())
748 }
749 async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
750 Ok(())
751 }
752 async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
753 Ok(())
754 }
755 async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
756 Ok(())
757 }
758 fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
759 None
760 }
761 async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
762 Ok(())
763 }
764 async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
765 Ok(())
766 }
767 }
768
769 fn test_producer_ctx() -> ProducerContext {
770 ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
771 }
772
773 #[test]
774 fn test_file_config_defaults() {
775 let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
776 assert_eq!(config.directory, "/tmp/inbox");
777 assert_eq!(config.delay, Duration::from_millis(500));
778 assert_eq!(config.initial_delay, Duration::from_millis(1000));
779 assert!(!config.noop);
780 assert!(!config.delete);
781 assert_eq!(config.move_to, Some(".camel".to_string()));
782 assert!(config.file_name.is_none());
783 assert!(config.include.is_none());
784 assert!(config.exclude.is_none());
785 assert!(!config.recursive);
786 assert_eq!(config.file_exist, FileExistStrategy::Override);
787 assert!(config.temp_prefix.is_none());
788 assert!(config.auto_create);
789 assert_eq!(config.read_timeout, Duration::from_secs(30));
791 assert_eq!(config.write_timeout, Duration::from_secs(30));
792 }
793
794 #[test]
795 fn test_file_config_consumer_options() {
796 let config = FileConfig::from_uri(
797 "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
798 ).unwrap();
799 assert_eq!(config.directory, "/data/input");
800 assert_eq!(config.delay, Duration::from_millis(1000));
801 assert_eq!(config.initial_delay, Duration::from_millis(2000));
802 assert!(config.noop);
803 assert!(config.recursive);
804 assert_eq!(config.include, Some(".*\\.csv".to_string()));
805 }
806
807 #[test]
808 fn test_file_config_producer_options() {
809 let config = FileConfig::from_uri(
810 "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
811 )
812 .unwrap();
813 assert_eq!(config.file_exist, FileExistStrategy::Append);
814 assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
815 assert!(!config.auto_create);
816 assert_eq!(config.file_name, Some("out.txt".to_string()));
817 }
818
819 #[test]
820 fn test_file_config_delete_mode() {
821 let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
822 assert!(config.delete);
823 assert!(config.move_to.is_none());
824 }
825
826 #[test]
827 fn test_file_config_noop_mode() {
828 let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
829 assert!(config.noop);
830 assert!(config.move_to.is_none());
831 }
832
833 #[test]
834 fn test_file_config_wrong_scheme() {
835 let result = FileConfig::from_uri("timer:tick");
836 assert!(result.is_err());
837 }
838
839 #[test]
840 fn test_file_component_scheme() {
841 let component = FileComponent::new();
842 assert_eq!(component.scheme(), "file");
843 }
844
845 #[test]
846 fn test_file_component_creates_endpoint() {
847 let component = FileComponent::new();
848 let endpoint = component.create_endpoint("file:/tmp/test");
849 assert!(endpoint.is_ok());
850 }
851
852 #[tokio::test]
857 async fn test_file_consumer_reads_files() {
858 let dir = tempfile::tempdir().unwrap();
859 let dir_path = dir.path().to_str().unwrap();
860
861 std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
862 std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
863
864 let component = FileComponent::new();
865 let endpoint = component
866 .create_endpoint(&format!(
867 "file:{dir_path}?noop=true&initialDelay=0&delay=100"
868 ))
869 .unwrap();
870 let mut consumer = endpoint.create_consumer().unwrap();
871
872 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
873 let token = CancellationToken::new();
874 let ctx = ConsumerContext::new(tx, token.clone());
875
876 tokio::spawn(async move {
877 consumer.start(ctx).await.unwrap();
878 });
879
880 let mut received = Vec::new();
881 let timeout = tokio::time::timeout(Duration::from_secs(2), async {
882 while let Some(envelope) = rx.recv().await {
883 received.push(envelope.exchange);
884 if received.len() == 2 {
885 break;
886 }
887 }
888 })
889 .await;
890 token.cancel();
891
892 assert!(timeout.is_ok(), "Should have received 2 exchanges");
893 assert_eq!(received.len(), 2);
894
895 for ex in &received {
896 assert!(ex.input.header("CamelFileName").is_some());
897 assert!(ex.input.header("CamelFileNameOnly").is_some());
898 assert!(ex.input.header("CamelFileAbsolutePath").is_some());
899 assert!(ex.input.header("CamelFileLength").is_some());
900 assert!(ex.input.header("CamelFileLastModified").is_some());
901 }
902 }
903
904 #[tokio::test]
905 async fn test_file_consumer_include_filter() {
906 let dir = tempfile::tempdir().unwrap();
907 let dir_path = dir.path().to_str().unwrap();
908
909 std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
910 std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
911
912 let component = FileComponent::new();
913 let endpoint = component
914 .create_endpoint(&format!(
915 "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
916 ))
917 .unwrap();
918 let mut consumer = endpoint.create_consumer().unwrap();
919
920 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
921 let token = CancellationToken::new();
922 let ctx = ConsumerContext::new(tx, token.clone());
923
924 tokio::spawn(async move {
925 consumer.start(ctx).await.unwrap();
926 });
927
928 let mut received = Vec::new();
929 let _ = tokio::time::timeout(Duration::from_millis(500), async {
930 while let Some(envelope) = rx.recv().await {
931 received.push(envelope.exchange);
932 if received.len() == 1 {
933 break;
934 }
935 }
936 })
937 .await;
938 token.cancel();
939
940 assert_eq!(received.len(), 1);
941 let name = received[0]
942 .input
943 .header("CamelFileNameOnly")
944 .and_then(|v| v.as_str())
945 .unwrap();
946 assert_eq!(name, "data.csv");
947 }
948
949 #[tokio::test]
950 async fn test_file_consumer_delete_mode() {
951 let dir = tempfile::tempdir().unwrap();
952 let dir_path = dir.path().to_str().unwrap();
953
954 std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
955
956 let component = FileComponent::new();
957 let endpoint = component
958 .create_endpoint(&format!(
959 "file:{dir_path}?delete=true&initialDelay=0&delay=100"
960 ))
961 .unwrap();
962 let mut consumer = endpoint.create_consumer().unwrap();
963
964 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
965 let token = CancellationToken::new();
966 let ctx = ConsumerContext::new(tx, token.clone());
967
968 tokio::spawn(async move {
969 consumer.start(ctx).await.unwrap();
970 });
971
972 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
973 token.cancel();
974
975 tokio::time::sleep(Duration::from_millis(100)).await;
976
977 assert!(
978 !dir.path().join("deleteme.txt").exists(),
979 "File should be deleted"
980 );
981 }
982
983 #[tokio::test]
984 async fn test_file_consumer_move_mode() {
985 let dir = tempfile::tempdir().unwrap();
986 let dir_path = dir.path().to_str().unwrap();
987
988 std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
989
990 let component = FileComponent::new();
991 let endpoint = component
992 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
993 .unwrap();
994 let mut consumer = endpoint.create_consumer().unwrap();
995
996 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
997 let token = CancellationToken::new();
998 let ctx = ConsumerContext::new(tx, token.clone());
999
1000 tokio::spawn(async move {
1001 consumer.start(ctx).await.unwrap();
1002 });
1003
1004 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1005 token.cancel();
1006
1007 tokio::time::sleep(Duration::from_millis(100)).await;
1008
1009 assert!(
1010 !dir.path().join("moveme.txt").exists(),
1011 "Original file should be gone"
1012 );
1013 assert!(
1014 dir.path().join(".camel").join("moveme.txt").exists(),
1015 "File should be in .camel/"
1016 );
1017 }
1018
1019 #[tokio::test]
1020 async fn test_file_consumer_respects_cancellation() {
1021 let dir = tempfile::tempdir().unwrap();
1022 let dir_path = dir.path().to_str().unwrap();
1023
1024 let component = FileComponent::new();
1025 let endpoint = component
1026 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
1027 .unwrap();
1028 let mut consumer = endpoint.create_consumer().unwrap();
1029
1030 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1031 let token = CancellationToken::new();
1032 let ctx = ConsumerContext::new(tx, token.clone());
1033
1034 let handle = tokio::spawn(async move {
1035 consumer.start(ctx).await.unwrap();
1036 });
1037
1038 tokio::time::sleep(Duration::from_millis(150)).await;
1039 token.cancel();
1040
1041 let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1042 assert!(
1043 result.is_ok(),
1044 "Consumer should have stopped after cancellation"
1045 );
1046 }
1047
1048 #[tokio::test]
1053 async fn test_file_producer_writes_file() {
1054 use tower::ServiceExt;
1055
1056 let dir = tempfile::tempdir().unwrap();
1057 let dir_path = dir.path().to_str().unwrap();
1058
1059 let component = FileComponent::new();
1060 let endpoint = component
1061 .create_endpoint(&format!("file:{dir_path}"))
1062 .unwrap();
1063 let ctx = test_producer_ctx();
1064 let producer = endpoint.create_producer(&ctx).unwrap();
1065
1066 let mut exchange = Exchange::new(Message::new("file content"));
1067 exchange.input.set_header(
1068 "CamelFileName",
1069 serde_json::Value::String("output.txt".to_string()),
1070 );
1071
1072 let result = producer.oneshot(exchange).await.unwrap();
1073
1074 let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1075 assert_eq!(content, "file content");
1076
1077 assert!(result.input.header("CamelFileNameProduced").is_some());
1078 }
1079
1080 #[tokio::test]
1081 async fn test_file_producer_auto_create_dirs() {
1082 use tower::ServiceExt;
1083
1084 let dir = tempfile::tempdir().unwrap();
1085 let dir_path = dir.path().to_str().unwrap();
1086
1087 let component = FileComponent::new();
1088 let endpoint = component
1089 .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1090 .unwrap();
1091 let ctx = test_producer_ctx();
1092 let producer = endpoint.create_producer(&ctx).unwrap();
1093
1094 let mut exchange = Exchange::new(Message::new("nested"));
1095 exchange.input.set_header(
1096 "CamelFileName",
1097 serde_json::Value::String("file.txt".to_string()),
1098 );
1099
1100 producer.oneshot(exchange).await.unwrap();
1101
1102 assert!(dir.path().join("sub/dir/file.txt").exists());
1103 }
1104
1105 #[tokio::test]
1106 async fn test_file_producer_file_exist_fail() {
1107 use tower::ServiceExt;
1108
1109 let dir = tempfile::tempdir().unwrap();
1110 let dir_path = dir.path().to_str().unwrap();
1111
1112 std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1113
1114 let component = FileComponent::new();
1115 let endpoint = component
1116 .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1117 .unwrap();
1118 let ctx = test_producer_ctx();
1119 let producer = endpoint.create_producer(&ctx).unwrap();
1120
1121 let mut exchange = Exchange::new(Message::new("new"));
1122 exchange.input.set_header(
1123 "CamelFileName",
1124 serde_json::Value::String("existing.txt".to_string()),
1125 );
1126
1127 let result = producer.oneshot(exchange).await;
1128 assert!(
1129 result.is_err(),
1130 "Should fail when file exists with Fail strategy"
1131 );
1132 }
1133
1134 #[tokio::test]
1135 async fn test_file_producer_file_exist_append() {
1136 use tower::ServiceExt;
1137
1138 let dir = tempfile::tempdir().unwrap();
1139 let dir_path = dir.path().to_str().unwrap();
1140
1141 std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1142
1143 let component = FileComponent::new();
1144 let endpoint = component
1145 .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1146 .unwrap();
1147 let ctx = test_producer_ctx();
1148 let producer = endpoint.create_producer(&ctx).unwrap();
1149
1150 let mut exchange = Exchange::new(Message::new("new"));
1151 exchange.input.set_header(
1152 "CamelFileName",
1153 serde_json::Value::String("append.txt".to_string()),
1154 );
1155
1156 producer.oneshot(exchange).await.unwrap();
1157
1158 let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1159 assert_eq!(content, "oldnew");
1160 }
1161
1162 #[tokio::test]
1163 async fn test_file_producer_temp_prefix() {
1164 use tower::ServiceExt;
1165
1166 let dir = tempfile::tempdir().unwrap();
1167 let dir_path = dir.path().to_str().unwrap();
1168
1169 let component = FileComponent::new();
1170 let endpoint = component
1171 .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1172 .unwrap();
1173 let ctx = test_producer_ctx();
1174 let producer = endpoint.create_producer(&ctx).unwrap();
1175
1176 let mut exchange = Exchange::new(Message::new("atomic write"));
1177 exchange.input.set_header(
1178 "CamelFileName",
1179 serde_json::Value::String("final.txt".to_string()),
1180 );
1181
1182 producer.oneshot(exchange).await.unwrap();
1183
1184 assert!(dir.path().join("final.txt").exists());
1185 assert!(!dir.path().join(".tmpfinal.txt").exists());
1186 let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1187 assert_eq!(content, "atomic write");
1188 }
1189
1190 #[tokio::test]
1191 async fn test_file_producer_uses_filename_option() {
1192 use tower::ServiceExt;
1193
1194 let dir = tempfile::tempdir().unwrap();
1195 let dir_path = dir.path().to_str().unwrap();
1196
1197 let component = FileComponent::new();
1198 let endpoint = component
1199 .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1200 .unwrap();
1201 let ctx = test_producer_ctx();
1202 let producer = endpoint.create_producer(&ctx).unwrap();
1203
1204 let exchange = Exchange::new(Message::new("content"));
1205
1206 producer.oneshot(exchange).await.unwrap();
1207 assert!(dir.path().join("fixed.txt").exists());
1208 }
1209
1210 #[tokio::test]
1211 async fn test_file_producer_no_filename_errors() {
1212 use tower::ServiceExt;
1213
1214 let dir = tempfile::tempdir().unwrap();
1215 let dir_path = dir.path().to_str().unwrap();
1216
1217 let component = FileComponent::new();
1218 let endpoint = component
1219 .create_endpoint(&format!("file:{dir_path}"))
1220 .unwrap();
1221 let ctx = test_producer_ctx();
1222 let producer = endpoint.create_producer(&ctx).unwrap();
1223
1224 let exchange = Exchange::new(Message::new("content"));
1225
1226 let result = producer.oneshot(exchange).await;
1227 assert!(result.is_err(), "Should error when no filename is provided");
1228 }
1229
1230 #[tokio::test]
1235 async fn test_file_producer_rejects_path_traversal_parent_directory() {
1236 use tower::ServiceExt;
1237
1238 let dir = tempfile::tempdir().unwrap();
1239 let dir_path = dir.path().to_str().unwrap();
1240
1241 std::fs::create_dir(dir.path().join("subdir")).unwrap();
1243 std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1244
1245 let component = FileComponent::new();
1246 let endpoint = component
1247 .create_endpoint(&format!("file:{dir_path}/subdir"))
1248 .unwrap();
1249 let ctx = test_producer_ctx();
1250 let producer = endpoint.create_producer(&ctx).unwrap();
1251
1252 let mut exchange = Exchange::new(Message::new("malicious"));
1253 exchange.input.set_header(
1254 "CamelFileName",
1255 serde_json::Value::String("../secret.txt".to_string()),
1256 );
1257
1258 let result = producer.oneshot(exchange).await;
1259 assert!(result.is_err(), "Should reject path traversal attempt");
1260
1261 let err = result.unwrap_err();
1262 assert!(
1263 err.to_string().contains("outside"),
1264 "Error should mention path is outside base directory"
1265 );
1266 }
1267
1268 #[tokio::test]
1269 async fn test_file_producer_rejects_absolute_path_outside_base() {
1270 use tower::ServiceExt;
1271
1272 let dir = tempfile::tempdir().unwrap();
1273 let dir_path = dir.path().to_str().unwrap();
1274
1275 let component = FileComponent::new();
1276 let endpoint = component
1277 .create_endpoint(&format!("file:{dir_path}"))
1278 .unwrap();
1279 let ctx = test_producer_ctx();
1280 let producer = endpoint.create_producer(&ctx).unwrap();
1281
1282 let mut exchange = Exchange::new(Message::new("malicious"));
1283 exchange.input.set_header(
1284 "CamelFileName",
1285 serde_json::Value::String("/etc/passwd".to_string()),
1286 );
1287
1288 let result = producer.oneshot(exchange).await;
1289 assert!(result.is_err(), "Should reject absolute path outside base");
1290 }
1291
1292 #[tokio::test]
1297 #[ignore] async fn test_large_file_streaming_constant_memory() {
1299 use std::io::Write;
1300 use tempfile::NamedTempFile;
1301
1302 let mut temp_file = NamedTempFile::new().unwrap();
1304 let file_size = 150 * 1024 * 1024; let chunk = vec![b'X'; 1024 * 1024]; for _ in 0..150 {
1308 temp_file.write_all(&chunk).unwrap();
1309 }
1310 temp_file.flush().unwrap();
1311
1312 let dir = temp_file.path().parent().unwrap();
1313 let dir_path = dir.to_str().unwrap();
1314 let file_name = temp_file
1315 .path()
1316 .file_name()
1317 .unwrap()
1318 .to_str()
1319 .unwrap()
1320 .to_string();
1321
1322 let component = FileComponent::new();
1324 let endpoint = component
1325 .create_endpoint(&format!(
1326 "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1327 ))
1328 .unwrap();
1329 let mut consumer = endpoint.create_consumer().unwrap();
1330
1331 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1332 let token = CancellationToken::new();
1333 let ctx = ConsumerContext::new(tx, token.clone());
1334
1335 tokio::spawn(async move {
1336 let _ = consumer.start(ctx).await;
1337 });
1338
1339 let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1340 rx.recv().await.unwrap().exchange
1341 })
1342 .await
1343 .expect("Should receive exchange");
1344 token.cancel();
1345
1346 assert!(matches!(exchange.input.body, Body::Stream(_)));
1348
1349 if let Body::Stream(ref stream_body) = exchange.input.body {
1351 assert!(stream_body.metadata.size_hint.is_some());
1352 let size = stream_body.metadata.size_hint.unwrap();
1353 assert_eq!(size, file_size as u64);
1354 }
1355
1356 if let Body::Stream(stream_body) = exchange.input.body {
1358 let body = Body::Stream(stream_body);
1359 let result = body.into_bytes(100 * 1024 * 1024).await;
1360 assert!(result.is_err());
1361 }
1362
1363 let component2 = FileComponent::new();
1366 let endpoint2 = component2
1367 .create_endpoint(&format!(
1368 "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1369 ))
1370 .unwrap();
1371 let mut consumer2 = endpoint2.create_consumer().unwrap();
1372
1373 let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1374 let token2 = CancellationToken::new();
1375 let ctx2 = ConsumerContext::new(tx2, token2.clone());
1376
1377 tokio::spawn(async move {
1378 let _ = consumer2.start(ctx2).await;
1379 });
1380
1381 let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1382 rx2.recv().await.unwrap().exchange
1383 })
1384 .await
1385 .expect("Should receive exchange");
1386 token2.cancel();
1387
1388 if let Body::Stream(stream_body) = exchange2.input.body {
1389 let mut stream_lock = stream_body.stream.lock().await;
1390 let mut stream = stream_lock.take().unwrap();
1391
1392 if let Some(chunk_result) = stream.next().await {
1394 let chunk = chunk_result.unwrap();
1395 assert!(!chunk.is_empty());
1396 assert!(chunk.len() < file_size);
1397 }
1399 }
1400 }
1401}