1pub mod bundle;
2
3pub use bundle::FileBundle;
4
5use std::collections::HashSet;
6use std::future::Future;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use futures::StreamExt;
15use regex::Regex;
16use tokio::fs;
17use tokio::fs::OpenOptions;
18use tokio::io;
19use tokio::io::AsyncWriteExt;
20use tokio::time;
21use tokio_util::io::ReaderStream;
22use tower::Service;
23use tracing::{debug, warn};
24
25use camel_component_api::{
26 Body, BoxProcessor, CamelError, Exchange, Message, StreamBody, StreamMetadata,
27};
28use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
29use camel_component_api::{UriConfig, parse_uri};
30use camel_language_api::Language;
31use camel_language_simple::SimpleLanguage;
32
33struct TempFileGuard {
42 path: PathBuf,
43 disarm: bool,
44}
45
46impl TempFileGuard {
47 fn new(path: PathBuf) -> Self {
48 Self {
49 path,
50 disarm: false,
51 }
52 }
53
54 fn disarm(&mut self) {
56 self.disarm = true;
57 }
58}
59
60impl Drop for TempFileGuard {
61 fn drop(&mut self) {
62 if !self.disarm {
63 let _ = std::fs::remove_file(&self.path);
65 }
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
75pub enum FileExistStrategy {
76 #[default]
78 Override,
79 Append,
81 Fail,
83}
84
85impl FromStr for FileExistStrategy {
86 type Err = String;
87
88 fn from_str(s: &str) -> Result<Self, Self::Err> {
89 match s {
90 "Override" | "override" => Ok(FileExistStrategy::Override),
91 "Append" | "append" => Ok(FileExistStrategy::Append),
92 "Fail" | "fail" => Ok(FileExistStrategy::Fail),
93 _ => Ok(FileExistStrategy::Override), }
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
106#[serde(default)]
107pub struct FileGlobalConfig {
108 pub delay_ms: u64,
109 pub initial_delay_ms: u64,
110 pub read_timeout_ms: u64,
111 pub write_timeout_ms: u64,
112}
113
114impl Default for FileGlobalConfig {
115 fn default() -> Self {
116 Self {
117 delay_ms: 500,
118 initial_delay_ms: 1_000,
119 read_timeout_ms: 30_000,
120 write_timeout_ms: 30_000,
121 }
122 }
123}
124
125impl FileGlobalConfig {
126 pub fn new() -> Self {
127 Self::default()
128 }
129 pub fn with_delay_ms(mut self, v: u64) -> Self {
130 self.delay_ms = v;
131 self
132 }
133 pub fn with_initial_delay_ms(mut self, v: u64) -> Self {
134 self.initial_delay_ms = v;
135 self
136 }
137 pub fn with_read_timeout_ms(mut self, v: u64) -> Self {
138 self.read_timeout_ms = v;
139 self
140 }
141 pub fn with_write_timeout_ms(mut self, v: u64) -> Self {
142 self.write_timeout_ms = v;
143 self
144 }
145}
146
147#[derive(Debug, Clone, UriConfig)]
174#[uri_scheme = "file"]
175#[uri_config(skip_impl, crate = "camel_component_api")]
176pub struct FileConfig {
177 pub directory: String,
179
180 #[allow(dead_code)]
182 #[uri_param(name = "delay", default = "500")]
183 delay_ms: u64,
184
185 pub delay: Duration,
187
188 #[allow(dead_code)]
190 #[uri_param(name = "initialDelay", default = "1000")]
191 initial_delay_ms: u64,
192
193 pub initial_delay: Duration,
195
196 #[uri_param(default = "false")]
198 pub noop: bool,
199
200 #[uri_param(default = "false")]
202 pub delete: bool,
203
204 #[uri_param(name = "move")]
207 move_to: Option<String>,
208
209 #[uri_param(name = "fileName")]
211 pub file_name: Option<String>,
212
213 #[uri_param]
215 pub include: Option<String>,
216
217 #[uri_param]
219 pub exclude: Option<String>,
220
221 #[uri_param(default = "false")]
223 pub recursive: bool,
224
225 #[uri_param(name = "fileExist", default = "Override")]
227 pub file_exist: FileExistStrategy,
228
229 #[uri_param(name = "tempPrefix")]
231 pub temp_prefix: Option<String>,
232
233 #[uri_param(name = "autoCreate", default = "true")]
235 pub auto_create: bool,
236
237 #[allow(dead_code)]
239 #[uri_param(name = "readTimeout", default = "30000")]
240 read_timeout_ms: u64,
241
242 pub read_timeout: Duration,
244
245 #[allow(dead_code)]
247 #[uri_param(name = "writeTimeout", default = "30000")]
248 write_timeout_ms: u64,
249
250 pub write_timeout: Duration,
252}
253
254impl UriConfig for FileConfig {
255 fn scheme() -> &'static str {
256 "file"
257 }
258
259 fn from_uri(uri: &str) -> Result<Self, CamelError> {
260 let parts = parse_uri(uri)?;
261 Self::from_components(parts)
262 }
263
264 fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
265 Self::parse_uri_components(parts)?.validate()
266 }
267
268 fn validate(self) -> Result<Self, CamelError> {
269 let move_to = if self.noop || self.delete {
273 None
274 } else {
275 Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
276 };
277
278 Ok(Self { move_to, ..self })
279 }
280}
281
282impl FileConfig {
283 pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
291 if self.delay == Duration::from_millis(500) {
292 self.delay = Duration::from_millis(global.delay_ms);
293 }
294 if self.initial_delay == Duration::from_millis(1_000) {
295 self.initial_delay = Duration::from_millis(global.initial_delay_ms);
296 }
297 if self.read_timeout == Duration::from_millis(30_000) {
298 self.read_timeout = Duration::from_millis(global.read_timeout_ms);
299 }
300 if self.write_timeout == Duration::from_millis(30_000) {
301 self.write_timeout = Duration::from_millis(global.write_timeout_ms);
302 }
303 }
304}
305
306pub struct FileComponent {
311 config: Option<FileGlobalConfig>,
312}
313
314impl FileComponent {
315 pub fn new() -> Self {
316 Self { config: None }
317 }
318
319 pub fn with_config(config: FileGlobalConfig) -> Self {
320 Self {
321 config: Some(config),
322 }
323 }
324
325 pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
326 Self { config }
327 }
328}
329
330impl Default for FileComponent {
331 fn default() -> Self {
332 Self::new()
333 }
334}
335
336impl Component for FileComponent {
337 fn scheme(&self) -> &str {
338 "file"
339 }
340
341 fn create_endpoint(
342 &self,
343 uri: &str,
344 _ctx: &dyn camel_component_api::ComponentContext,
345 ) -> Result<Box<dyn Endpoint>, CamelError> {
346 let mut config = FileConfig::from_uri(uri)?;
347 if let Some(ref global_config) = self.config {
348 config.apply_global_defaults(global_config);
349 }
350 Ok(Box::new(FileEndpoint {
351 uri: uri.to_string(),
352 config,
353 }))
354 }
355}
356
357struct FileEndpoint {
362 uri: String,
363 config: FileConfig,
364}
365
366impl Endpoint for FileEndpoint {
367 fn uri(&self) -> &str {
368 &self.uri
369 }
370
371 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
372 Ok(Box::new(FileConsumer {
373 config: self.config.clone(),
374 seen: HashSet::new(),
375 }))
376 }
377
378 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
379 Ok(BoxProcessor::new(FileProducer {
380 config: self.config.clone(),
381 }))
382 }
383}
384
385struct FileConsumer {
390 config: FileConfig,
391 seen: HashSet<PathBuf>,
392}
393
394#[async_trait]
395impl Consumer for FileConsumer {
396 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
397 let config = self.config.clone();
398
399 let include_re = config
400 .include
401 .as_ref()
402 .map(|p| Regex::new(p))
403 .transpose()
404 .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
405 let exclude_re = config
406 .exclude
407 .as_ref()
408 .map(|p| Regex::new(p))
409 .transpose()
410 .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
411
412 if !config.initial_delay.is_zero() {
413 tokio::select! {
414 _ = time::sleep(config.initial_delay) => {}
415 _ = context.cancelled() => {
416 debug!(directory = config.directory, "File consumer cancelled during initial delay");
417 return Ok(());
418 }
419 }
420 }
421
422 let mut interval = time::interval(config.delay);
423
424 loop {
425 tokio::select! {
426 _ = context.cancelled() => {
427 debug!(directory = config.directory, "File consumer received cancellation, stopping");
428 break;
429 }
430 _ = interval.tick() => {
431 if let Err(e) = poll_directory(
432 &config,
433 &context,
434 &include_re,
435 &exclude_re,
436 &mut self.seen,
437 ).await {
438 warn!(directory = config.directory, error = %e, "Error polling directory");
439 }
440 }
441 }
442 }
443
444 Ok(())
445 }
446
447 async fn stop(&mut self) -> Result<(), CamelError> {
448 Ok(())
449 }
450}
451
452async fn poll_directory(
453 config: &FileConfig,
454 context: &ConsumerContext,
455 include_re: &Option<Regex>,
456 exclude_re: &Option<Regex>,
457 seen: &mut HashSet<PathBuf>,
458) -> Result<(), CamelError> {
459 let base_path = std::path::Path::new(&config.directory);
460
461 let files = list_files(base_path, config.recursive).await?;
462
463 for file_path in files {
464 let file_name = file_path
465 .file_name()
466 .and_then(|n| n.to_str())
467 .unwrap_or_default()
468 .to_string();
469
470 if let Some(ref target_name) = config.file_name
471 && file_name != *target_name
472 {
473 continue;
474 }
475
476 if let Some(re) = include_re
477 && !re.is_match(&file_name)
478 {
479 continue;
480 }
481
482 if let Some(re) = exclude_re
483 && re.is_match(&file_name)
484 {
485 continue;
486 }
487
488 if let Some(ref move_dir) = config.move_to
489 && file_path.starts_with(base_path.join(move_dir))
490 {
491 continue;
492 }
493
494 if config.noop && seen.contains(&file_path) {
496 continue;
497 }
498
499 let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
500 let f = fs::File::open(&file_path).await?;
501 let m = f.metadata().await?;
502 Ok::<_, std::io::Error>((f, m))
503 })
504 .await
505 {
506 Ok(Ok((f, m))) => (f, Some(m)),
507 Ok(Err(e)) => {
508 warn!(
509 file = %file_path.display(),
510 error = %e,
511 "Failed to open file"
512 );
513 continue;
514 }
515 Err(_) => {
516 warn!(
517 file = %file_path.display(),
518 timeout_ms = config.read_timeout.as_millis(),
519 "Timeout opening file"
520 );
521 continue;
522 }
523 };
524
525 let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
526 let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
527
528 let last_modified = metadata
529 .as_ref()
530 .and_then(|m| m.modified().ok())
531 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
532 .map(|d| d.as_millis() as u64)
533 .unwrap_or(0);
534
535 let relative_path = file_path
536 .strip_prefix(base_path)
537 .unwrap_or(&file_path)
538 .to_string_lossy()
539 .to_string();
540
541 let absolute_path = file_path
542 .canonicalize()
543 .unwrap_or_else(|_| file_path.clone())
544 .to_string_lossy()
545 .to_string();
546
547 let body = Body::Stream(StreamBody {
548 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
549 metadata: StreamMetadata {
550 size_hint: Some(file_len),
551 content_type: None,
552 origin: Some(absolute_path.clone()),
553 },
554 });
555
556 let mut exchange = Exchange::new(Message::new(body));
557 exchange
558 .input
559 .set_header("CamelFileName", serde_json::Value::String(relative_path));
560 exchange.input.set_header(
561 "CamelFileNameOnly",
562 serde_json::Value::String(file_name.clone()),
563 );
564 exchange.input.set_header(
565 "CamelFileAbsolutePath",
566 serde_json::Value::String(absolute_path),
567 );
568 exchange.input.set_header(
569 "CamelFileLength",
570 serde_json::Value::Number(file_len.into()),
571 );
572 exchange.input.set_header(
573 "CamelFileLastModified",
574 serde_json::Value::Number(last_modified.into()),
575 );
576
577 debug!(
578 file = %file_path.display(),
579 correlation_id = %exchange.correlation_id(),
580 "Processing file"
581 );
582
583 if context.send(exchange).await.is_err() {
584 break;
585 }
586
587 if config.noop {
588 seen.insert(file_path.clone());
589 }
590
591 if config.noop {
592 } else if config.delete {
594 if let Err(e) = fs::remove_file(&file_path).await {
595 warn!(file = %file_path.display(), error = %e, "Failed to delete file");
596 }
597 } else if let Some(ref move_dir) = config.move_to {
598 let target_dir = base_path.join(move_dir);
599 if let Err(e) = fs::create_dir_all(&target_dir).await {
600 warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
601 continue;
602 }
603 let target_path = target_dir.join(&file_name);
604 if let Err(e) = fs::rename(&file_path, &target_path).await {
605 warn!(
606 from = %file_path.display(),
607 to = %target_path.display(),
608 error = %e,
609 "Failed to move file"
610 );
611 }
612 }
613 }
614
615 Ok(())
616}
617
618async fn list_files(
619 dir: &std::path::Path,
620 recursive: bool,
621) -> Result<Vec<std::path::PathBuf>, CamelError> {
622 let mut files = Vec::new();
623 let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
624
625 while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
626 let path = entry.path();
627 if path.is_file() {
628 files.push(path);
629 } else if path.is_dir() && recursive {
630 let mut sub_files = Box::pin(list_files(&path, true)).await?;
631 files.append(&mut sub_files);
632 }
633 }
634
635 files.sort();
636 Ok(files)
637}
638
639fn validate_path_is_within_base(
644 base_dir: &std::path::Path,
645 target_path: &std::path::Path,
646) -> Result<(), CamelError> {
647 let canonical_base = base_dir.canonicalize().map_err(|e| {
648 CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
649 })?;
650
651 let canonical_target = if target_path.exists() {
653 target_path.canonicalize().map_err(|e| {
654 CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
655 })?
656 } else if let Some(parent) = target_path.parent() {
657 if !parent.exists() {
659 return Err(CamelError::ProcessorError(format!(
660 "Parent directory '{}' does not exist",
661 parent.display()
662 )));
663 }
664 let canonical_parent = parent.canonicalize().map_err(|e| {
665 CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
666 })?;
667 if let Some(filename) = target_path.file_name() {
669 canonical_parent.join(filename)
670 } else {
671 return Err(CamelError::ProcessorError(
672 "Invalid target path: no filename".to_string(),
673 ));
674 }
675 } else {
676 return Err(CamelError::ProcessorError(
677 "Invalid target path: no parent directory".to_string(),
678 ));
679 };
680
681 if !canonical_target.starts_with(&canonical_base) {
682 return Err(CamelError::ProcessorError(format!(
683 "Path '{}' is outside base directory '{}'",
684 canonical_target.display(),
685 canonical_base.display()
686 )));
687 }
688
689 Ok(())
690}
691
692#[derive(Clone)]
697struct FileProducer {
698 config: FileConfig,
699}
700
701impl FileProducer {
702 fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
703 let raw = if let Some(name) = exchange
704 .input
705 .header("CamelFileName")
706 .and_then(|v| v.as_str())
707 {
708 Some(name.to_string())
709 } else {
710 config.file_name.clone()
711 };
712
713 match raw {
714 Some(name) if name.contains("${") => {
715 let lang = SimpleLanguage::new();
716 let expr = lang.create_expression(&name).map_err(|e| {
717 CamelError::ProcessorError(format!(
718 "cannot parse fileName expression '{}': {e}",
719 name
720 ))
721 })?;
722 let val = expr.evaluate(exchange).map_err(|e| {
723 CamelError::ProcessorError(format!(
724 "cannot evaluate fileName expression '{}': {e}",
725 name
726 ))
727 })?;
728 match val {
729 serde_json::Value::String(s) => Ok(s),
730 other => Ok(other.to_string()),
731 }
732 }
733 Some(name) => Ok(name),
734 None => Err(CamelError::ProcessorError(
735 "No filename specified: set CamelFileName header or fileName option".to_string(),
736 )),
737 }
738 }
739}
740
741impl Service<Exchange> for FileProducer {
742 type Response = Exchange;
743 type Error = CamelError;
744 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
745
746 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
747 Poll::Ready(Ok(()))
748 }
749
750 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
751 let config = self.config.clone();
752
753 Box::pin(async move {
754 let file_name = FileProducer::resolve_filename(&exchange, &config)?;
755 let body = exchange.input.body.clone();
756
757 let dir_path = std::path::Path::new(&config.directory);
758 let target_path = dir_path.join(&file_name);
759
760 if config.auto_create
762 && let Some(parent) = target_path.parent()
763 {
764 tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
765 .await
766 .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
767 .map_err(CamelError::from)?;
768 }
769
770 validate_path_is_within_base(dir_path, &target_path)?;
772
773 match config.file_exist {
775 FileExistStrategy::Fail if target_path.exists() => {
776 return Err(CamelError::ProcessorError(format!(
777 "File already exists: {}",
778 target_path.display()
779 )));
780 }
781 FileExistStrategy::Append => {
782 let mut file = tokio::time::timeout(
784 config.write_timeout,
785 OpenOptions::new()
786 .append(true)
787 .create(true)
788 .open(&target_path),
789 )
790 .await
791 .map_err(|_| {
792 CamelError::ProcessorError("Timeout opening file for append".into())
793 })?
794 .map_err(CamelError::from)?;
795
796 tokio::time::timeout(
797 config.write_timeout,
798 io::copy(&mut body.into_async_read(), &mut file),
799 )
800 .await
801 .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
802 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
803
804 file.flush().await.map_err(CamelError::from)?;
805 }
806 _ => {
807 let temp_name = if let Some(ref prefix) = config.temp_prefix {
809 format!("{prefix}{file_name}")
810 } else {
811 format!(".tmp.{file_name}")
812 };
813 let temp_path = dir_path.join(&temp_name);
814
815 let mut guard = TempFileGuard::new(temp_path.clone());
817
818 let mut file =
820 tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
821 .await
822 .map_err(|_| {
823 CamelError::ProcessorError("Timeout creating temp file".into())
824 })?
825 .map_err(CamelError::from)?;
826
827 let copy_result = tokio::time::timeout(
828 config.write_timeout,
829 io::copy(&mut body.into_async_read(), &mut file),
830 )
831 .await;
832
833 let _ = file.flush().await;
835
836 match copy_result {
837 Ok(Ok(_)) => {}
838 Ok(Err(e)) => {
839 return Err(CamelError::ProcessorError(e.to_string()));
841 }
842 Err(_) => {
843 return Err(CamelError::ProcessorError("Timeout writing file".into()));
845 }
846 }
847
848 let rename_result = tokio::time::timeout(
850 config.write_timeout,
851 fs::rename(&temp_path, &target_path),
852 )
853 .await;
854
855 match rename_result {
856 Ok(Ok(_)) => {
857 guard.disarm();
859 }
860 Ok(Err(e)) => {
861 return Err(CamelError::from(e));
863 }
864 Err(_) => {
865 return Err(CamelError::ProcessorError("Timeout renaming file".into()));
867 }
868 }
869 }
870 }
871
872 let abs_path = target_path
874 .canonicalize()
875 .unwrap_or_else(|_| target_path.clone())
876 .to_string_lossy()
877 .to_string();
878 exchange
879 .input
880 .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
881
882 debug!(
883 file = %target_path.display(),
884 correlation_id = %exchange.correlation_id(),
885 "File written"
886 );
887
888 Ok(exchange)
889 })
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use super::*;
896 use bytes::Bytes;
897 use camel_component_api::NoOpComponentContext;
898 use std::time::Duration;
899 use tokio_util::sync::CancellationToken;
900
901 fn test_producer_ctx() -> ProducerContext {
902 ProducerContext::new()
903 }
904
905 #[test]
906 fn test_file_config_defaults() {
907 let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
908 assert_eq!(config.directory, "/tmp/inbox");
909 assert_eq!(config.delay, Duration::from_millis(500));
910 assert_eq!(config.initial_delay, Duration::from_millis(1000));
911 assert!(!config.noop);
912 assert!(!config.delete);
913 assert_eq!(config.move_to, Some(".camel".to_string()));
914 assert!(config.file_name.is_none());
915 assert!(config.include.is_none());
916 assert!(config.exclude.is_none());
917 assert!(!config.recursive);
918 assert_eq!(config.file_exist, FileExistStrategy::Override);
919 assert!(config.temp_prefix.is_none());
920 assert!(config.auto_create);
921 assert_eq!(config.read_timeout, Duration::from_secs(30));
923 assert_eq!(config.write_timeout, Duration::from_secs(30));
924 }
925
926 #[test]
927 fn test_file_config_consumer_options() {
928 let config = FileConfig::from_uri(
929 "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
930 ).unwrap();
931 assert_eq!(config.directory, "/data/input");
932 assert_eq!(config.delay, Duration::from_millis(1000));
933 assert_eq!(config.initial_delay, Duration::from_millis(2000));
934 assert!(config.noop);
935 assert!(config.recursive);
936 assert_eq!(config.include, Some(".*\\.csv".to_string()));
937 }
938
939 #[test]
940 fn test_file_config_producer_options() {
941 let config = FileConfig::from_uri(
942 "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
943 )
944 .unwrap();
945 assert_eq!(config.file_exist, FileExistStrategy::Append);
946 assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
947 assert!(!config.auto_create);
948 assert_eq!(config.file_name, Some("out.txt".to_string()));
949 }
950
951 #[test]
952 fn test_file_config_delete_mode() {
953 let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
954 assert!(config.delete);
955 assert!(config.move_to.is_none());
956 }
957
958 #[test]
959 fn test_file_config_noop_mode() {
960 let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
961 assert!(config.noop);
962 assert!(config.move_to.is_none());
963 }
964
965 #[test]
966 fn test_file_config_wrong_scheme() {
967 let result = FileConfig::from_uri("timer:tick");
968 assert!(result.is_err());
969 }
970
971 #[test]
972 fn test_file_component_scheme() {
973 let component = FileComponent::new();
974 assert_eq!(component.scheme(), "file");
975 }
976
977 #[test]
978 fn test_file_component_creates_endpoint() {
979 let component = FileComponent::new();
980 let ctx = NoOpComponentContext;
981 let endpoint = component.create_endpoint("file:/tmp/test", &ctx);
982 assert!(endpoint.is_ok());
983 }
984
985 #[tokio::test]
990 async fn test_file_consumer_reads_files() {
991 let dir = tempfile::tempdir().unwrap();
992 let dir_path = dir.path().to_str().unwrap();
993
994 std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
995 std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
996
997 let component = FileComponent::new();
998 let ctx = NoOpComponentContext;
999 let endpoint = component
1000 .create_endpoint(
1001 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100"),
1002 &ctx,
1003 )
1004 .unwrap();
1005 let mut consumer = endpoint.create_consumer().unwrap();
1006
1007 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1008 let token = CancellationToken::new();
1009 let ctx = ConsumerContext::new(tx, token.clone());
1010
1011 tokio::spawn(async move {
1012 consumer.start(ctx).await.unwrap();
1013 });
1014
1015 let mut received = Vec::new();
1016 let timeout = tokio::time::timeout(Duration::from_secs(2), async {
1017 while let Some(envelope) = rx.recv().await {
1018 received.push(envelope.exchange);
1019 if received.len() == 2 {
1020 break;
1021 }
1022 }
1023 })
1024 .await;
1025 token.cancel();
1026
1027 assert!(timeout.is_ok(), "Should have received 2 exchanges");
1028 assert_eq!(received.len(), 2);
1029
1030 for ex in &received {
1031 assert!(ex.input.header("CamelFileName").is_some());
1032 assert!(ex.input.header("CamelFileNameOnly").is_some());
1033 assert!(ex.input.header("CamelFileAbsolutePath").is_some());
1034 assert!(ex.input.header("CamelFileLength").is_some());
1035 assert!(ex.input.header("CamelFileLastModified").is_some());
1036 }
1037 }
1038
1039 #[tokio::test]
1040 async fn noop_second_poll_does_not_re_emit_seen_files() {
1041 let dir = tempfile::tempdir().unwrap();
1042 let file_path = dir.path().join("test.txt");
1043 tokio::fs::write(&file_path, b"hello").await.unwrap();
1044
1045 let uri = format!(
1046 "file:{}?noop=true&initialDelay=0&delay=50",
1047 dir.path().display()
1048 );
1049 let config = FileConfig::from_uri(&uri).unwrap();
1050 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1051 let token = CancellationToken::new();
1052 let ctx = ConsumerContext::new(tx, token);
1053
1054 let include_re = None;
1055 let exclude_re = None;
1056 let mut seen = std::collections::HashSet::new();
1057
1058 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1059 .await
1060 .unwrap();
1061 assert!(rx.try_recv().is_ok(), "first poll should emit file");
1062 assert!(rx.try_recv().is_err(), "should only emit once");
1063
1064 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1065 .await
1066 .unwrap();
1067 assert!(
1068 rx.try_recv().is_err(),
1069 "second poll should not re-emit seen file"
1070 );
1071 }
1072
1073 #[tokio::test]
1074 async fn noop_new_files_picked_up_after_first_poll() {
1075 let dir = tempfile::tempdir().unwrap();
1076 let file1 = dir.path().join("a.txt");
1077 tokio::fs::write(&file1, b"a").await.unwrap();
1078
1079 let uri = format!(
1080 "file:{}?noop=true&initialDelay=0&delay=50",
1081 dir.path().display()
1082 );
1083 let config = FileConfig::from_uri(&uri).unwrap();
1084 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1085 let token = CancellationToken::new();
1086 let ctx = ConsumerContext::new(tx, token);
1087
1088 let include_re = None;
1089 let exclude_re = None;
1090 let mut seen = std::collections::HashSet::new();
1091
1092 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1093 .await
1094 .unwrap();
1095 let _ = rx.try_recv();
1096
1097 let file2 = dir.path().join("b.txt");
1098 tokio::fs::write(&file2, b"b").await.unwrap();
1099
1100 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1101 .await
1102 .unwrap();
1103 assert!(
1104 rx.try_recv().is_ok(),
1105 "b.txt should be emitted on second poll"
1106 );
1107 assert!(rx.try_recv().is_err(), "a.txt should not be re-emitted");
1108 }
1109
1110 #[tokio::test]
1111 async fn test_file_consumer_include_filter() {
1112 let dir = tempfile::tempdir().unwrap();
1113 let dir_path = dir.path().to_str().unwrap();
1114
1115 std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
1116 std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
1117
1118 let component = FileComponent::new();
1119 let ctx = NoOpComponentContext;
1120 let endpoint = component
1121 .create_endpoint(
1122 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"),
1123 &ctx,
1124 )
1125 .unwrap();
1126 let mut consumer = endpoint.create_consumer().unwrap();
1127
1128 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1129 let token = CancellationToken::new();
1130 let ctx = ConsumerContext::new(tx, token.clone());
1131
1132 tokio::spawn(async move {
1133 consumer.start(ctx).await.unwrap();
1134 });
1135
1136 let mut received = Vec::new();
1137 let _ = tokio::time::timeout(Duration::from_millis(500), async {
1138 while let Some(envelope) = rx.recv().await {
1139 received.push(envelope.exchange);
1140 if received.len() == 1 {
1141 break;
1142 }
1143 }
1144 })
1145 .await;
1146 token.cancel();
1147
1148 assert_eq!(received.len(), 1);
1149 let name = received[0]
1150 .input
1151 .header("CamelFileNameOnly")
1152 .and_then(|v| v.as_str())
1153 .unwrap();
1154 assert_eq!(name, "data.csv");
1155 }
1156
1157 #[tokio::test]
1158 async fn test_file_consumer_delete_mode() {
1159 let dir = tempfile::tempdir().unwrap();
1160 let dir_path = dir.path().to_str().unwrap();
1161
1162 std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
1163
1164 let component = FileComponent::new();
1165 let ctx = NoOpComponentContext;
1166 let endpoint = component
1167 .create_endpoint(
1168 &format!("file:{dir_path}?delete=true&initialDelay=0&delay=100"),
1169 &ctx,
1170 )
1171 .unwrap();
1172 let mut consumer = endpoint.create_consumer().unwrap();
1173
1174 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1175 let token = CancellationToken::new();
1176 let ctx = ConsumerContext::new(tx, token.clone());
1177
1178 tokio::spawn(async move {
1179 consumer.start(ctx).await.unwrap();
1180 });
1181
1182 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1183 token.cancel();
1184
1185 tokio::time::sleep(Duration::from_millis(100)).await;
1186
1187 assert!(
1188 !dir.path().join("deleteme.txt").exists(),
1189 "File should be deleted"
1190 );
1191 }
1192
1193 #[tokio::test]
1194 async fn test_file_consumer_move_mode() {
1195 let dir = tempfile::tempdir().unwrap();
1196 let dir_path = dir.path().to_str().unwrap();
1197
1198 std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
1199
1200 let component = FileComponent::new();
1201 let ctx = NoOpComponentContext;
1202 let endpoint = component
1203 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"), &ctx)
1204 .unwrap();
1205 let mut consumer = endpoint.create_consumer().unwrap();
1206
1207 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1208 let token = CancellationToken::new();
1209 let ctx = ConsumerContext::new(tx, token.clone());
1210
1211 tokio::spawn(async move {
1212 consumer.start(ctx).await.unwrap();
1213 });
1214
1215 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1216 token.cancel();
1217
1218 tokio::time::sleep(Duration::from_millis(100)).await;
1219
1220 assert!(
1221 !dir.path().join("moveme.txt").exists(),
1222 "Original file should be gone"
1223 );
1224 assert!(
1225 dir.path().join(".camel").join("moveme.txt").exists(),
1226 "File should be in .camel/"
1227 );
1228 }
1229
1230 #[tokio::test]
1231 async fn test_file_consumer_respects_cancellation() {
1232 let dir = tempfile::tempdir().unwrap();
1233 let dir_path = dir.path().to_str().unwrap();
1234
1235 let component = FileComponent::new();
1236 let ctx = NoOpComponentContext;
1237 let endpoint = component
1238 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"), &ctx)
1239 .unwrap();
1240 let mut consumer = endpoint.create_consumer().unwrap();
1241
1242 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1243 let token = CancellationToken::new();
1244 let ctx = ConsumerContext::new(tx, token.clone());
1245
1246 let handle = tokio::spawn(async move {
1247 consumer.start(ctx).await.unwrap();
1248 });
1249
1250 tokio::time::sleep(Duration::from_millis(150)).await;
1251 token.cancel();
1252
1253 let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1254 assert!(
1255 result.is_ok(),
1256 "Consumer should have stopped after cancellation"
1257 );
1258 }
1259
1260 #[tokio::test]
1265 async fn test_file_producer_writes_file() {
1266 use tower::ServiceExt;
1267
1268 let dir = tempfile::tempdir().unwrap();
1269 let dir_path = dir.path().to_str().unwrap();
1270
1271 let component = FileComponent::new();
1272 let ctx = NoOpComponentContext;
1273 let endpoint = component
1274 .create_endpoint(&format!("file:{dir_path}"), &ctx)
1275 .unwrap();
1276 let ctx = test_producer_ctx();
1277 let producer = endpoint.create_producer(&ctx).unwrap();
1278
1279 let mut exchange = Exchange::new(Message::new("file content"));
1280 exchange.input.set_header(
1281 "CamelFileName",
1282 serde_json::Value::String("output.txt".to_string()),
1283 );
1284
1285 let result = producer.oneshot(exchange).await.unwrap();
1286
1287 let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1288 assert_eq!(content, "file content");
1289
1290 assert!(result.input.header("CamelFileNameProduced").is_some());
1291 }
1292
1293 #[tokio::test]
1294 async fn test_file_producer_auto_create_dirs() {
1295 use tower::ServiceExt;
1296
1297 let dir = tempfile::tempdir().unwrap();
1298 let dir_path = dir.path().to_str().unwrap();
1299
1300 let component = FileComponent::new();
1301 let ctx = NoOpComponentContext;
1302 let endpoint = component
1303 .create_endpoint(&format!("file:{dir_path}/sub/dir"), &ctx)
1304 .unwrap();
1305 let ctx = test_producer_ctx();
1306 let producer = endpoint.create_producer(&ctx).unwrap();
1307
1308 let mut exchange = Exchange::new(Message::new("nested"));
1309 exchange.input.set_header(
1310 "CamelFileName",
1311 serde_json::Value::String("file.txt".to_string()),
1312 );
1313
1314 producer.oneshot(exchange).await.unwrap();
1315
1316 assert!(dir.path().join("sub/dir/file.txt").exists());
1317 }
1318
1319 #[tokio::test]
1320 async fn test_file_producer_file_exist_fail() {
1321 use tower::ServiceExt;
1322
1323 let dir = tempfile::tempdir().unwrap();
1324 let dir_path = dir.path().to_str().unwrap();
1325
1326 std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1327
1328 let component = FileComponent::new();
1329 let ctx = NoOpComponentContext;
1330 let endpoint = component
1331 .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"), &ctx)
1332 .unwrap();
1333 let ctx = test_producer_ctx();
1334 let producer = endpoint.create_producer(&ctx).unwrap();
1335
1336 let mut exchange = Exchange::new(Message::new("new"));
1337 exchange.input.set_header(
1338 "CamelFileName",
1339 serde_json::Value::String("existing.txt".to_string()),
1340 );
1341
1342 let result = producer.oneshot(exchange).await;
1343 assert!(
1344 result.is_err(),
1345 "Should fail when file exists with Fail strategy"
1346 );
1347 }
1348
1349 #[tokio::test]
1350 async fn test_file_producer_file_exist_append() {
1351 use tower::ServiceExt;
1352
1353 let dir = tempfile::tempdir().unwrap();
1354 let dir_path = dir.path().to_str().unwrap();
1355
1356 std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1357
1358 let component = FileComponent::new();
1359 let ctx = NoOpComponentContext;
1360 let endpoint = component
1361 .create_endpoint(&format!("file:{dir_path}?fileExist=Append"), &ctx)
1362 .unwrap();
1363 let ctx = test_producer_ctx();
1364 let producer = endpoint.create_producer(&ctx).unwrap();
1365
1366 let mut exchange = Exchange::new(Message::new("new"));
1367 exchange.input.set_header(
1368 "CamelFileName",
1369 serde_json::Value::String("append.txt".to_string()),
1370 );
1371
1372 producer.oneshot(exchange).await.unwrap();
1373
1374 let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1375 assert_eq!(content, "oldnew");
1376 }
1377
1378 #[tokio::test]
1379 async fn test_file_producer_temp_prefix() {
1380 use tower::ServiceExt;
1381
1382 let dir = tempfile::tempdir().unwrap();
1383 let dir_path = dir.path().to_str().unwrap();
1384
1385 let component = FileComponent::new();
1386 let ctx = NoOpComponentContext;
1387 let endpoint = component
1388 .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"), &ctx)
1389 .unwrap();
1390 let ctx = test_producer_ctx();
1391 let producer = endpoint.create_producer(&ctx).unwrap();
1392
1393 let mut exchange = Exchange::new(Message::new("atomic write"));
1394 exchange.input.set_header(
1395 "CamelFileName",
1396 serde_json::Value::String("final.txt".to_string()),
1397 );
1398
1399 producer.oneshot(exchange).await.unwrap();
1400
1401 assert!(dir.path().join("final.txt").exists());
1402 assert!(!dir.path().join(".tmpfinal.txt").exists());
1403 let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1404 assert_eq!(content, "atomic write");
1405 }
1406
1407 #[tokio::test]
1408 async fn test_file_producer_uses_filename_option() {
1409 use tower::ServiceExt;
1410
1411 let dir = tempfile::tempdir().unwrap();
1412 let dir_path = dir.path().to_str().unwrap();
1413
1414 let component = FileComponent::new();
1415 let ctx = NoOpComponentContext;
1416 let endpoint = component
1417 .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"), &ctx)
1418 .unwrap();
1419 let ctx = test_producer_ctx();
1420 let producer = endpoint.create_producer(&ctx).unwrap();
1421
1422 let exchange = Exchange::new(Message::new("content"));
1423
1424 producer.oneshot(exchange).await.unwrap();
1425 assert!(dir.path().join("fixed.txt").exists());
1426 }
1427
1428 #[tokio::test]
1429 async fn test_file_producer_no_filename_errors() {
1430 use tower::ServiceExt;
1431
1432 let dir = tempfile::tempdir().unwrap();
1433 let dir_path = dir.path().to_str().unwrap();
1434
1435 let component = FileComponent::new();
1436 let ctx = NoOpComponentContext;
1437 let endpoint = component
1438 .create_endpoint(&format!("file:{dir_path}"), &ctx)
1439 .unwrap();
1440 let ctx = test_producer_ctx();
1441 let producer = endpoint.create_producer(&ctx).unwrap();
1442
1443 let exchange = Exchange::new(Message::new("content"));
1444
1445 let result = producer.oneshot(exchange).await;
1446 assert!(result.is_err(), "Should error when no filename is provided");
1447 }
1448
1449 #[tokio::test]
1454 async fn test_file_producer_rejects_path_traversal_parent_directory() {
1455 use tower::ServiceExt;
1456
1457 let dir = tempfile::tempdir().unwrap();
1458 let dir_path = dir.path().to_str().unwrap();
1459
1460 std::fs::create_dir(dir.path().join("subdir")).unwrap();
1462 std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1463
1464 let component = FileComponent::new();
1465 let ctx = NoOpComponentContext;
1466 let endpoint = component
1467 .create_endpoint(&format!("file:{dir_path}/subdir"), &ctx)
1468 .unwrap();
1469 let ctx = test_producer_ctx();
1470 let producer = endpoint.create_producer(&ctx).unwrap();
1471
1472 let mut exchange = Exchange::new(Message::new("malicious"));
1473 exchange.input.set_header(
1474 "CamelFileName",
1475 serde_json::Value::String("../secret.txt".to_string()),
1476 );
1477
1478 let result = producer.oneshot(exchange).await;
1479 assert!(result.is_err(), "Should reject path traversal attempt");
1480
1481 let err = result.unwrap_err();
1482 assert!(
1483 err.to_string().contains("outside"),
1484 "Error should mention path is outside base directory"
1485 );
1486 }
1487
1488 #[tokio::test]
1489 async fn test_file_producer_rejects_absolute_path_outside_base() {
1490 use tower::ServiceExt;
1491
1492 let dir = tempfile::tempdir().unwrap();
1493 let dir_path = dir.path().to_str().unwrap();
1494
1495 let component = FileComponent::new();
1496 let ctx = NoOpComponentContext;
1497 let endpoint = component
1498 .create_endpoint(&format!("file:{dir_path}"), &ctx)
1499 .unwrap();
1500 let ctx = test_producer_ctx();
1501 let producer = endpoint.create_producer(&ctx).unwrap();
1502
1503 let mut exchange = Exchange::new(Message::new("malicious"));
1504 exchange.input.set_header(
1505 "CamelFileName",
1506 serde_json::Value::String("/etc/passwd".to_string()),
1507 );
1508
1509 let result = producer.oneshot(exchange).await;
1510 assert!(result.is_err(), "Should reject absolute path outside base");
1511 }
1512
1513 #[tokio::test]
1518 #[ignore] async fn test_large_file_streaming_constant_memory() {
1520 use std::io::Write;
1521 use tempfile::NamedTempFile;
1522
1523 let mut temp_file = NamedTempFile::new().unwrap();
1525 let file_size = 150 * 1024 * 1024; let chunk = vec![b'X'; 1024 * 1024]; for _ in 0..150 {
1529 temp_file.write_all(&chunk).unwrap();
1530 }
1531 temp_file.flush().unwrap();
1532
1533 let dir = temp_file.path().parent().unwrap();
1534 let dir_path = dir.to_str().unwrap();
1535 let file_name = temp_file
1536 .path()
1537 .file_name()
1538 .unwrap()
1539 .to_str()
1540 .unwrap()
1541 .to_string();
1542
1543 let component = FileComponent::new();
1545 let component_ctx = NoOpComponentContext;
1546 let endpoint = component
1547 .create_endpoint(
1548 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1549 &component_ctx,
1550 )
1551 .unwrap();
1552 let mut consumer = endpoint.create_consumer().unwrap();
1553
1554 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1555 let token = CancellationToken::new();
1556 let ctx = ConsumerContext::new(tx, token.clone());
1557
1558 tokio::spawn(async move {
1559 let _ = consumer.start(ctx).await;
1560 });
1561
1562 let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1563 rx.recv().await.unwrap().exchange
1564 })
1565 .await
1566 .expect("Should receive exchange");
1567 token.cancel();
1568
1569 assert!(matches!(exchange.input.body, Body::Stream(_)));
1571
1572 if let Body::Stream(ref stream_body) = exchange.input.body {
1574 assert!(stream_body.metadata.size_hint.is_some());
1575 let size = stream_body.metadata.size_hint.unwrap();
1576 assert_eq!(size, file_size as u64);
1577 }
1578
1579 if let Body::Stream(stream_body) = exchange.input.body {
1581 let body = Body::Stream(stream_body);
1582 let result = body.into_bytes(100 * 1024 * 1024).await;
1583 assert!(result.is_err());
1584 }
1585
1586 let component2 = FileComponent::new();
1589 let endpoint2 = component2
1590 .create_endpoint(
1591 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1592 &component_ctx,
1593 )
1594 .unwrap();
1595 let mut consumer2 = endpoint2.create_consumer().unwrap();
1596
1597 let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1598 let token2 = CancellationToken::new();
1599 let ctx2 = ConsumerContext::new(tx2, token2.clone());
1600
1601 tokio::spawn(async move {
1602 let _ = consumer2.start(ctx2).await;
1603 });
1604
1605 let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1606 rx2.recv().await.unwrap().exchange
1607 })
1608 .await
1609 .expect("Should receive exchange");
1610 token2.cancel();
1611
1612 if let Body::Stream(stream_body) = exchange2.input.body {
1613 let mut stream_lock = stream_body.stream.lock().await;
1614 let mut stream = stream_lock.take().unwrap();
1615
1616 if let Some(chunk_result) = stream.next().await {
1618 let chunk = chunk_result.unwrap();
1619 assert!(!chunk.is_empty());
1620 assert!(chunk.len() < file_size);
1621 }
1623 }
1624 }
1625
1626 #[tokio::test]
1631 async fn test_producer_writes_stream_body() {
1632 let dir = tempfile::tempdir().unwrap();
1633 let dir_path = dir.path().to_str().unwrap();
1634 let uri = format!("file:{dir_path}?fileName=out.txt");
1635
1636 let component = FileComponent::new();
1637 let ctx = NoOpComponentContext;
1638 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1639 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1640
1641 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1642 Ok(Bytes::from("hello ")),
1643 Ok(Bytes::from("streaming ")),
1644 Ok(Bytes::from("world")),
1645 ];
1646 let stream = futures::stream::iter(chunks);
1647 let body = Body::Stream(StreamBody {
1648 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1649 metadata: StreamMetadata {
1650 size_hint: None,
1651 content_type: None,
1652 origin: None,
1653 },
1654 });
1655
1656 let exchange = Exchange::new(Message::new(body));
1657 tower::ServiceExt::oneshot(producer, exchange)
1658 .await
1659 .unwrap();
1660
1661 let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1662 .await
1663 .unwrap();
1664 assert_eq!(content, "hello streaming world");
1665 }
1666
1667 #[tokio::test]
1668 async fn test_producer_stream_atomic_no_partial_on_error() {
1669 let dir = tempfile::tempdir().unwrap();
1671 let dir_path = dir.path().to_str().unwrap();
1672 let uri = format!("file:{dir_path}?fileName=out.txt");
1673
1674 let component = FileComponent::new();
1675 let ctx = NoOpComponentContext;
1676 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1677 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1678
1679 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1680 Ok(Bytes::from("partial")),
1681 Err(CamelError::ProcessorError(
1682 "simulated stream error".to_string(),
1683 )),
1684 ];
1685 let stream = futures::stream::iter(chunks);
1686 let body = Body::Stream(StreamBody {
1687 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1688 metadata: StreamMetadata {
1689 size_hint: None,
1690 content_type: None,
1691 origin: None,
1692 },
1693 });
1694
1695 let exchange = Exchange::new(Message::new(body));
1696 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1697 assert!(
1698 result.is_err(),
1699 "expected error when stream fails mid-write"
1700 );
1701
1702 assert!(
1704 !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1705 "partial file must not exist after failed write"
1706 );
1707
1708 assert!(
1710 !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1711 "temp file must be cleaned up after failed write"
1712 );
1713 }
1714
1715 #[tokio::test]
1716 async fn test_producer_stream_append() {
1717 let dir = tempfile::tempdir().unwrap();
1718 let dir_path = dir.path().to_str().unwrap();
1719 let target = format!("{dir_path}/out.txt");
1720
1721 tokio::fs::write(&target, b"line1\n").await.unwrap();
1723
1724 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1725 let component = FileComponent::new();
1726 let ctx = NoOpComponentContext;
1727 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1728 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1729
1730 let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1731 let stream = futures::stream::iter(chunks);
1732 let body = Body::Stream(StreamBody {
1733 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1734 metadata: StreamMetadata {
1735 size_hint: None,
1736 content_type: None,
1737 origin: None,
1738 },
1739 });
1740
1741 let exchange = Exchange::new(Message::new(body));
1742 tower::ServiceExt::oneshot(producer, exchange)
1743 .await
1744 .unwrap();
1745
1746 let content = tokio::fs::read_to_string(&target).await.unwrap();
1747 assert_eq!(content, "line1\nline2\n");
1748 }
1749
1750 #[tokio::test]
1751 async fn test_producer_stream_append_partial_on_error() {
1752 let dir = tempfile::tempdir().unwrap();
1755 let dir_path = dir.path().to_str().unwrap();
1756 let target = format!("{dir_path}/out.txt");
1757
1758 tokio::fs::write(&target, b"initial\n").await.unwrap();
1760
1761 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1762 let component = FileComponent::new();
1763 let ctx = NoOpComponentContext;
1764 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1765 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1766
1767 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1769 Ok(Bytes::from("partial-")), Err(CamelError::ProcessorError("stream error".to_string())), Ok(Bytes::from("never-written")), ];
1773 let stream = futures::stream::iter(chunks);
1774 let body = Body::Stream(StreamBody {
1775 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1776 metadata: StreamMetadata {
1777 size_hint: None,
1778 content_type: None,
1779 origin: None,
1780 },
1781 });
1782
1783 let exchange = Exchange::new(Message::new(body));
1784 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1785
1786 assert!(
1788 result.is_err(),
1789 "expected error when stream fails during append"
1790 );
1791
1792 let content = tokio::fs::read_to_string(&target).await.unwrap();
1794 assert_eq!(
1795 content, "initial\npartial-",
1796 "append leaves partial data on stream error (non-atomic by nature)"
1797 );
1798 }
1799
1800 #[tokio::test]
1801 async fn test_producer_stream_already_consumed_errors() {
1802 let dir = tempfile::tempdir().unwrap();
1803 let dir_path = dir.path().to_str().unwrap();
1804 let uri = format!("file:{dir_path}?fileName=out.txt");
1805
1806 let component = FileComponent::new();
1807 let ctx = NoOpComponentContext;
1808 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1809 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1810
1811 type MaybeStream = std::sync::Arc<
1813 tokio::sync::Mutex<
1814 Option<
1815 std::pin::Pin<
1816 Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1817 >,
1818 >,
1819 >,
1820 >;
1821 let arc: MaybeStream = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1822 let body = Body::Stream(StreamBody {
1823 stream: arc,
1824 metadata: StreamMetadata {
1825 size_hint: None,
1826 content_type: None,
1827 origin: None,
1828 },
1829 });
1830
1831 let exchange = Exchange::new(Message::new(body));
1832 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1833 assert!(
1834 result.is_err(),
1835 "expected error for already-consumed stream"
1836 );
1837 }
1838
1839 #[test]
1844 fn test_global_config_applied_to_endpoint() {
1845 let global = FileGlobalConfig::default()
1847 .with_delay_ms(2000)
1848 .with_initial_delay_ms(5000)
1849 .with_read_timeout_ms(60_000)
1850 .with_write_timeout_ms(45_000);
1851 let component = FileComponent::with_config(global);
1852 let ctx = NoOpComponentContext;
1853 let endpoint = component.create_endpoint("file:/tmp/inbox", &ctx).unwrap();
1855 let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
1858 let global2 = FileGlobalConfig::default()
1859 .with_delay_ms(2000)
1860 .with_initial_delay_ms(5000)
1861 .with_read_timeout_ms(60_000)
1862 .with_write_timeout_ms(45_000);
1863 config.apply_global_defaults(&global2);
1864 assert_eq!(config.delay, Duration::from_millis(2000));
1865 assert_eq!(config.initial_delay, Duration::from_millis(5000));
1866 assert_eq!(config.read_timeout, Duration::from_millis(60_000));
1867 assert_eq!(config.write_timeout, Duration::from_millis(45_000));
1868 let _ = endpoint; }
1871
1872 #[test]
1873 fn test_uri_param_wins_over_global_config() {
1874 let mut config =
1876 FileConfig::from_uri("file:/tmp/inbox?delay=1000&initialDelay=2000").unwrap();
1877 let global = FileGlobalConfig::default()
1879 .with_delay_ms(3000)
1880 .with_initial_delay_ms(4000);
1881 config.apply_global_defaults(&global);
1882 assert_eq!(config.delay, Duration::from_millis(1000));
1884 assert_eq!(config.initial_delay, Duration::from_millis(2000));
1886 assert_eq!(config.read_timeout, Duration::from_millis(30_000));
1889 }
1890
1891 #[tokio::test]
1892 async fn test_file_producer_filename_simple_language_from_header() {
1893 use tower::ServiceExt;
1894
1895 let dir = tempfile::tempdir().unwrap();
1896 let dir_path = dir.path().to_str().unwrap();
1897
1898 let component = FileComponent::new();
1899 let ctx = NoOpComponentContext;
1900 let endpoint = component
1901 .create_endpoint(&format!("file:{dir_path}"), &ctx)
1902 .unwrap();
1903 let ctx = test_producer_ctx();
1904 let producer = endpoint.create_producer(&ctx).unwrap();
1905
1906 let mut exchange = Exchange::new(Message::new("content"));
1907 exchange
1908 .input
1909 .set_header("CamelTimerCounter", serde_json::Value::Number(42.into()));
1910 exchange.input.set_header(
1911 "CamelFileName",
1912 serde_json::Value::String("test-${header.CamelTimerCounter}.txt".to_string()),
1913 );
1914
1915 producer.oneshot(exchange).await.unwrap();
1916
1917 assert!(
1918 dir.path().join("test-42.txt").exists(),
1919 "fileName should have been evaluated from Simple Language expression"
1920 );
1921 let content = std::fs::read_to_string(dir.path().join("test-42.txt")).unwrap();
1922 assert_eq!(content, "content");
1923 }
1924
1925 #[tokio::test]
1926 async fn test_file_producer_filename_simple_language_from_uri_param() {
1927 use tower::ServiceExt;
1928
1929 let dir = tempfile::tempdir().unwrap();
1930 let dir_path = dir.path().to_str().unwrap();
1931
1932 let component = FileComponent::new();
1933 let ctx = NoOpComponentContext;
1934 let endpoint = component
1935 .create_endpoint(
1936 &format!("file:{dir_path}?fileName=msg-${{header.id}}.dat"),
1937 &ctx,
1938 )
1939 .unwrap();
1940 let ctx = test_producer_ctx();
1941 let producer = endpoint.create_producer(&ctx).unwrap();
1942
1943 let mut exchange = Exchange::new(Message::new("data"));
1944 exchange
1945 .input
1946 .set_header("id", serde_json::Value::String("abc".to_string()));
1947
1948 producer.oneshot(exchange).await.unwrap();
1949
1950 assert!(
1951 dir.path().join("msg-abc.dat").exists(),
1952 "fileName URI param should have been evaluated from Simple Language expression"
1953 );
1954 }
1955
1956 #[tokio::test]
1957 async fn test_file_producer_filename_literal_without_expression() {
1958 use tower::ServiceExt;
1959
1960 let dir = tempfile::tempdir().unwrap();
1961 let dir_path = dir.path().to_str().unwrap();
1962
1963 let component = FileComponent::new();
1964 let ctx = NoOpComponentContext;
1965 let endpoint = component
1966 .create_endpoint(&format!("file:{dir_path}?fileName=plain.txt"), &ctx)
1967 .unwrap();
1968 let ctx = test_producer_ctx();
1969 let producer = endpoint.create_producer(&ctx).unwrap();
1970
1971 let exchange = Exchange::new(Message::new("data"));
1972 producer.oneshot(exchange).await.unwrap();
1973
1974 assert!(
1975 dir.path().join("plain.txt").exists(),
1976 "literal fileName without expressions should still work"
1977 );
1978 }
1979}