1use std::collections::HashSet;
2use std::future::Future;
3use std::path::PathBuf;
4use std::pin::Pin;
5use std::str::FromStr;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use async_trait::async_trait;
10use futures::StreamExt;
11use regex::Regex;
12use tokio::fs;
13use tokio::fs::OpenOptions;
14use tokio::io;
15use tokio::io::AsyncWriteExt;
16use tokio::time;
17use tokio_util::io::ReaderStream;
18use tower::Service;
19use tracing::{debug, warn};
20
21use camel_api::{
22 BoxProcessor, CamelError, Exchange, Message, body::Body, body::StreamBody, body::StreamMetadata,
23};
24use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
25use camel_endpoint::{UriConfig, parse_uri};
26
/// Removes a file when dropped, unless the guard has been disarmed first.
///
/// The producer uses this to make sure a temp file does not linger when a
/// write is abandoned part-way through.
struct TempFileGuard {
    /// File that is removed on drop.
    path: PathBuf,
    /// `true` once the caller has decided the file must be kept.
    disarm: bool,
}

impl TempFileGuard {
    /// Creates an armed guard for `path`.
    fn new(path: PathBuf) -> Self {
        let disarm = false;
        Self { path, disarm }
    }

    /// Cancels the cleanup: dropping the guard afterwards leaves the file alone.
    fn disarm(&mut self) {
        self.disarm = true;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if self.disarm {
            return;
        }
        // Best effort: a failure (for example the file was never created) is ignored.
        let _ = std::fs::remove_file(&self.path);
    }
}
62
/// What the producer does when the target file already exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FileExistStrategy {
    /// Replace the existing file (the default).
    #[default]
    Override,
    /// Append the new content to the existing file.
    Append,
    /// Reject the write with an error.
    Fail,
}

impl FromStr for FileExistStrategy {
    type Err = String;

    /// Parses a strategy name, ignoring ASCII case.
    ///
    /// Matching is case-insensitive so that values such as `APPEND` are not
    /// silently turned into `Override` (which would overwrite data the caller
    /// meant to keep). Unrecognised names still fall back to
    /// [`FileExistStrategy::Override`]; this function never returns `Err`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let strategy = if s.eq_ignore_ascii_case("append") {
            FileExistStrategy::Append
        } else if s.eq_ignore_ascii_case("fail") {
            FileExistStrategy::Fail
        } else {
            // Covers "override" in any case as well as unknown values.
            FileExistStrategy::Override
        };
        Ok(strategy)
    }
}
91
/// Component-wide defaults for file endpoints; every value is in milliseconds.
#[derive(Debug, Clone, PartialEq)]
pub struct FileGlobalConfig {
    /// Interval between directory polls.
    pub delay_ms: u64,
    /// Wait before the first poll.
    pub initial_delay_ms: u64,
    /// Timeout applied to read operations.
    pub read_timeout_ms: u64,
    /// Timeout applied to write operations.
    pub write_timeout_ms: u64,
}

impl Default for FileGlobalConfig {
    /// 500 ms poll delay, 1 s initial delay, 30 s read and write timeouts.
    fn default() -> Self {
        let io_timeout_ms = 30_000;
        Self {
            delay_ms: 500,
            initial_delay_ms: 1_000,
            read_timeout_ms: io_timeout_ms,
            write_timeout_ms: io_timeout_ms,
        }
    }
}

impl FileGlobalConfig {
    /// Same as [`FileGlobalConfig::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the config with `delay_ms` replaced by `v`.
    pub fn with_delay_ms(self, v: u64) -> Self {
        Self { delay_ms: v, ..self }
    }

    /// Returns the config with `initial_delay_ms` replaced by `v`.
    pub fn with_initial_delay_ms(self, v: u64) -> Self {
        Self {
            initial_delay_ms: v,
            ..self
        }
    }

    /// Returns the config with `read_timeout_ms` replaced by `v`.
    pub fn with_read_timeout_ms(self, v: u64) -> Self {
        Self {
            read_timeout_ms: v,
            ..self
        }
    }

    /// Returns the config with `write_timeout_ms` replaced by `v`.
    pub fn with_write_timeout_ms(self, v: u64) -> Self {
        Self {
            write_timeout_ms: v,
            ..self
        }
    }
}
139
/// Endpoint configuration parsed from a `file:` URI.
///
/// Timing options appear twice: a private `*_ms` field holding the raw
/// millisecond URI parameter, and a public [`Duration`] field that the
/// consumer and producer actually read.
// NOTE(review): the `Duration` fields carry no `uri_param` attribute; presumably the
// derive-generated `parse_uri_components` fills them from the `*_ms` fields — confirm.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "file"]
#[uri_config(skip_impl)]
pub struct FileConfig {
    /// Directory the consumer polls and the producer writes into.
    pub directory: String,

    /// Raw `delay` URI parameter in milliseconds (default `500`).
    #[allow(dead_code)]
    #[uri_param(name = "delay", default = "500")]
    delay_ms: u64,

    /// Interval between directory polls.
    pub delay: Duration,

    /// Raw `initialDelay` URI parameter in milliseconds (default `1000`).
    #[allow(dead_code)]
    #[uri_param(name = "initialDelay", default = "1000")]
    initial_delay_ms: u64,

    /// Wait before the consumer's first poll.
    pub initial_delay: Duration,

    /// When `true` the consumer neither deletes nor moves files; it remembers
    /// the paths it has emitted and skips them on later polls.
    #[uri_param(default = "false")]
    pub noop: bool,

    /// When `true` (and `noop` is `false`) the consumer deletes each file after sending it.
    #[uri_param(default = "false")]
    pub delete: bool,

    /// Sub-directory (relative to `directory`) that consumed files are moved into.
    /// `validate` sets this to `None` when `noop` or `delete` is set and to
    /// `.camel` when no `move` parameter was given.
    #[uri_param(name = "move")]
    move_to: Option<String>,

    /// Consumer: only a file with exactly this name is picked up.
    /// Producer: target name used when the exchange has no `CamelFileName` header.
    #[uri_param(name = "fileName")]
    pub file_name: Option<String>,

    /// Regex a file name must match to be consumed.
    #[uri_param]
    pub include: Option<String>,

    /// Regex that excludes matching file names from consumption.
    #[uri_param]
    pub exclude: Option<String>,

    /// Whether the consumer descends into sub-directories.
    #[uri_param(default = "false")]
    pub recursive: bool,

    /// Producer behaviour when the target file already exists.
    #[uri_param(name = "fileExist", default = "Override")]
    pub file_exist: FileExistStrategy,

    /// Prefix for the producer's temp file name; `.tmp.` is used when unset.
    #[uri_param(name = "tempPrefix")]
    pub temp_prefix: Option<String>,

    /// Whether the producer creates missing parent directories before writing.
    #[uri_param(name = "autoCreate", default = "true")]
    pub auto_create: bool,

    /// Raw `readTimeout` URI parameter in milliseconds (default `30000`).
    #[allow(dead_code)]
    #[uri_param(name = "readTimeout", default = "30000")]
    read_timeout_ms: u64,

    /// Limit on opening a file and reading its metadata in the consumer.
    pub read_timeout: Duration,

    /// Raw `writeTimeout` URI parameter in milliseconds (default `30000`).
    #[allow(dead_code)]
    #[uri_param(name = "writeTimeout", default = "30000")]
    write_timeout_ms: u64,

    /// Limit applied to each individual producer step (create directories,
    /// open, copy, rename).
    pub write_timeout: Duration,
}
246
247impl UriConfig for FileConfig {
248 fn scheme() -> &'static str {
249 "file"
250 }
251
252 fn from_uri(uri: &str) -> Result<Self, CamelError> {
253 let parts = parse_uri(uri)?;
254 Self::from_components(parts)
255 }
256
257 fn from_components(parts: camel_endpoint::UriComponents) -> Result<Self, CamelError> {
258 Self::parse_uri_components(parts)?.validate()
259 }
260
261 fn validate(self) -> Result<Self, CamelError> {
262 let move_to = if self.noop || self.delete {
266 None
267 } else {
268 Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
269 };
270
271 Ok(Self { move_to, ..self })
272 }
273}
274
275impl FileConfig {
276 pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
284 if self.delay == Duration::from_millis(500) {
285 self.delay = Duration::from_millis(global.delay_ms);
286 }
287 if self.initial_delay == Duration::from_millis(1_000) {
288 self.initial_delay = Duration::from_millis(global.initial_delay_ms);
289 }
290 if self.read_timeout == Duration::from_millis(30_000) {
291 self.read_timeout = Duration::from_millis(global.read_timeout_ms);
292 }
293 if self.write_timeout == Duration::from_millis(30_000) {
294 self.write_timeout = Duration::from_millis(global.write_timeout_ms);
295 }
296 }
297}
298
299pub struct FileComponent {
304 config: Option<FileGlobalConfig>,
305}
306
307impl FileComponent {
308 pub fn new() -> Self {
309 Self { config: None }
310 }
311
312 pub fn with_config(config: FileGlobalConfig) -> Self {
313 Self {
314 config: Some(config),
315 }
316 }
317
318 pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
319 Self { config }
320 }
321}
322
323impl Default for FileComponent {
324 fn default() -> Self {
325 Self::new()
326 }
327}
328
329impl Component for FileComponent {
330 fn scheme(&self) -> &str {
331 "file"
332 }
333
334 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
335 let mut config = FileConfig::from_uri(uri)?;
336 if let Some(ref global_config) = self.config {
337 config.apply_global_defaults(global_config);
338 }
339 Ok(Box::new(FileEndpoint {
340 uri: uri.to_string(),
341 config,
342 }))
343 }
344}
345
346struct FileEndpoint {
351 uri: String,
352 config: FileConfig,
353}
354
355impl Endpoint for FileEndpoint {
356 fn uri(&self) -> &str {
357 &self.uri
358 }
359
360 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
361 Ok(Box::new(FileConsumer {
362 config: self.config.clone(),
363 seen: HashSet::new(),
364 }))
365 }
366
367 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
368 Ok(BoxProcessor::new(FileProducer {
369 config: self.config.clone(),
370 }))
371 }
372}
373
374struct FileConsumer {
379 config: FileConfig,
380 seen: HashSet<PathBuf>,
381}
382
383#[async_trait]
384impl Consumer for FileConsumer {
385 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
386 let config = self.config.clone();
387
388 let include_re = config
389 .include
390 .as_ref()
391 .map(|p| Regex::new(p))
392 .transpose()
393 .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
394 let exclude_re = config
395 .exclude
396 .as_ref()
397 .map(|p| Regex::new(p))
398 .transpose()
399 .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
400
401 if !config.initial_delay.is_zero() {
402 tokio::select! {
403 _ = time::sleep(config.initial_delay) => {}
404 _ = context.cancelled() => {
405 debug!(directory = config.directory, "File consumer cancelled during initial delay");
406 return Ok(());
407 }
408 }
409 }
410
411 let mut interval = time::interval(config.delay);
412
413 loop {
414 tokio::select! {
415 _ = context.cancelled() => {
416 debug!(directory = config.directory, "File consumer received cancellation, stopping");
417 break;
418 }
419 _ = interval.tick() => {
420 if let Err(e) = poll_directory(
421 &config,
422 &context,
423 &include_re,
424 &exclude_re,
425 &mut self.seen,
426 ).await {
427 warn!(directory = config.directory, error = %e, "Error polling directory");
428 }
429 }
430 }
431 }
432
433 Ok(())
434 }
435
436 async fn stop(&mut self) -> Result<(), CamelError> {
437 Ok(())
438 }
439}
440
441async fn poll_directory(
442 config: &FileConfig,
443 context: &ConsumerContext,
444 include_re: &Option<Regex>,
445 exclude_re: &Option<Regex>,
446 seen: &mut HashSet<PathBuf>,
447) -> Result<(), CamelError> {
448 let base_path = std::path::Path::new(&config.directory);
449
450 let files = list_files(base_path, config.recursive).await?;
451
452 for file_path in files {
453 let file_name = file_path
454 .file_name()
455 .and_then(|n| n.to_str())
456 .unwrap_or_default()
457 .to_string();
458
459 if let Some(ref target_name) = config.file_name
460 && file_name != *target_name
461 {
462 continue;
463 }
464
465 if let Some(re) = include_re
466 && !re.is_match(&file_name)
467 {
468 continue;
469 }
470
471 if let Some(re) = exclude_re
472 && re.is_match(&file_name)
473 {
474 continue;
475 }
476
477 if let Some(ref move_dir) = config.move_to
478 && file_path.starts_with(base_path.join(move_dir))
479 {
480 continue;
481 }
482
483 if config.noop && seen.contains(&file_path) {
485 continue;
486 }
487
488 let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
489 let f = fs::File::open(&file_path).await?;
490 let m = f.metadata().await?;
491 Ok::<_, std::io::Error>((f, m))
492 })
493 .await
494 {
495 Ok(Ok((f, m))) => (f, Some(m)),
496 Ok(Err(e)) => {
497 warn!(
498 file = %file_path.display(),
499 error = %e,
500 "Failed to open file"
501 );
502 continue;
503 }
504 Err(_) => {
505 warn!(
506 file = %file_path.display(),
507 timeout_ms = config.read_timeout.as_millis(),
508 "Timeout opening file"
509 );
510 continue;
511 }
512 };
513
514 let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
515 let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
516
517 let last_modified = metadata
518 .as_ref()
519 .and_then(|m| m.modified().ok())
520 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
521 .map(|d| d.as_millis() as u64)
522 .unwrap_or(0);
523
524 let relative_path = file_path
525 .strip_prefix(base_path)
526 .unwrap_or(&file_path)
527 .to_string_lossy()
528 .to_string();
529
530 let absolute_path = file_path
531 .canonicalize()
532 .unwrap_or_else(|_| file_path.clone())
533 .to_string_lossy()
534 .to_string();
535
536 let body = Body::Stream(StreamBody {
537 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
538 metadata: StreamMetadata {
539 size_hint: Some(file_len),
540 content_type: None,
541 origin: Some(absolute_path.clone()),
542 },
543 });
544
545 let mut exchange = Exchange::new(Message::new(body));
546 exchange
547 .input
548 .set_header("CamelFileName", serde_json::Value::String(relative_path));
549 exchange.input.set_header(
550 "CamelFileNameOnly",
551 serde_json::Value::String(file_name.clone()),
552 );
553 exchange.input.set_header(
554 "CamelFileAbsolutePath",
555 serde_json::Value::String(absolute_path),
556 );
557 exchange.input.set_header(
558 "CamelFileLength",
559 serde_json::Value::Number(file_len.into()),
560 );
561 exchange.input.set_header(
562 "CamelFileLastModified",
563 serde_json::Value::Number(last_modified.into()),
564 );
565
566 debug!(
567 file = %file_path.display(),
568 correlation_id = %exchange.correlation_id(),
569 "Processing file"
570 );
571
572 if context.send(exchange).await.is_err() {
573 break;
574 }
575
576 if config.noop {
577 seen.insert(file_path.clone());
578 }
579
580 if config.noop {
581 } else if config.delete {
583 if let Err(e) = fs::remove_file(&file_path).await {
584 warn!(file = %file_path.display(), error = %e, "Failed to delete file");
585 }
586 } else if let Some(ref move_dir) = config.move_to {
587 let target_dir = base_path.join(move_dir);
588 if let Err(e) = fs::create_dir_all(&target_dir).await {
589 warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
590 continue;
591 }
592 let target_path = target_dir.join(&file_name);
593 if let Err(e) = fs::rename(&file_path, &target_path).await {
594 warn!(
595 from = %file_path.display(),
596 to = %target_path.display(),
597 error = %e,
598 "Failed to move file"
599 );
600 }
601 }
602 }
603
604 Ok(())
605}
606
607async fn list_files(
608 dir: &std::path::Path,
609 recursive: bool,
610) -> Result<Vec<std::path::PathBuf>, CamelError> {
611 let mut files = Vec::new();
612 let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
613
614 while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
615 let path = entry.path();
616 if path.is_file() {
617 files.push(path);
618 } else if path.is_dir() && recursive {
619 let mut sub_files = Box::pin(list_files(&path, true)).await?;
620 files.append(&mut sub_files);
621 }
622 }
623
624 files.sort();
625 Ok(files)
626}
627
628fn validate_path_is_within_base(
633 base_dir: &std::path::Path,
634 target_path: &std::path::Path,
635) -> Result<(), CamelError> {
636 let canonical_base = base_dir.canonicalize().map_err(|e| {
637 CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
638 })?;
639
640 let canonical_target = if target_path.exists() {
642 target_path.canonicalize().map_err(|e| {
643 CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
644 })?
645 } else if let Some(parent) = target_path.parent() {
646 if !parent.exists() {
648 return Err(CamelError::ProcessorError(format!(
649 "Parent directory '{}' does not exist",
650 parent.display()
651 )));
652 }
653 let canonical_parent = parent.canonicalize().map_err(|e| {
654 CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
655 })?;
656 if let Some(filename) = target_path.file_name() {
658 canonical_parent.join(filename)
659 } else {
660 return Err(CamelError::ProcessorError(
661 "Invalid target path: no filename".to_string(),
662 ));
663 }
664 } else {
665 return Err(CamelError::ProcessorError(
666 "Invalid target path: no parent directory".to_string(),
667 ));
668 };
669
670 if !canonical_target.starts_with(&canonical_base) {
671 return Err(CamelError::ProcessorError(format!(
672 "Path '{}' is outside base directory '{}'",
673 canonical_target.display(),
674 canonical_base.display()
675 )));
676 }
677
678 Ok(())
679}
680
681#[derive(Clone)]
686struct FileProducer {
687 config: FileConfig,
688}
689
690impl FileProducer {
691 fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
692 if let Some(name) = exchange
693 .input
694 .header("CamelFileName")
695 .and_then(|v| v.as_str())
696 {
697 return Ok(name.to_string());
698 }
699 if let Some(ref name) = config.file_name {
700 return Ok(name.clone());
701 }
702 Err(CamelError::ProcessorError(
703 "No filename specified: set CamelFileName header or fileName option".to_string(),
704 ))
705 }
706}
707
708impl Service<Exchange> for FileProducer {
709 type Response = Exchange;
710 type Error = CamelError;
711 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
712
713 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
714 Poll::Ready(Ok(()))
715 }
716
717 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
718 let config = self.config.clone();
719
720 Box::pin(async move {
721 let file_name = FileProducer::resolve_filename(&exchange, &config)?;
722 let body = exchange.input.body.clone();
723
724 let dir_path = std::path::Path::new(&config.directory);
725 let target_path = dir_path.join(&file_name);
726
727 if config.auto_create
729 && let Some(parent) = target_path.parent()
730 {
731 tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
732 .await
733 .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
734 .map_err(CamelError::from)?;
735 }
736
737 validate_path_is_within_base(dir_path, &target_path)?;
739
740 match config.file_exist {
742 FileExistStrategy::Fail if target_path.exists() => {
743 return Err(CamelError::ProcessorError(format!(
744 "File already exists: {}",
745 target_path.display()
746 )));
747 }
748 FileExistStrategy::Append => {
749 let mut file = tokio::time::timeout(
751 config.write_timeout,
752 OpenOptions::new()
753 .append(true)
754 .create(true)
755 .open(&target_path),
756 )
757 .await
758 .map_err(|_| {
759 CamelError::ProcessorError("Timeout opening file for append".into())
760 })?
761 .map_err(CamelError::from)?;
762
763 tokio::time::timeout(
764 config.write_timeout,
765 io::copy(&mut body.into_async_read(), &mut file),
766 )
767 .await
768 .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
769 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
770
771 file.flush().await.map_err(CamelError::from)?;
772 }
773 _ => {
774 let temp_name = if let Some(ref prefix) = config.temp_prefix {
776 format!("{prefix}{file_name}")
777 } else {
778 format!(".tmp.{file_name}")
779 };
780 let temp_path = dir_path.join(&temp_name);
781
782 let mut guard = TempFileGuard::new(temp_path.clone());
784
785 let mut file =
787 tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
788 .await
789 .map_err(|_| {
790 CamelError::ProcessorError("Timeout creating temp file".into())
791 })?
792 .map_err(CamelError::from)?;
793
794 let copy_result = tokio::time::timeout(
795 config.write_timeout,
796 io::copy(&mut body.into_async_read(), &mut file),
797 )
798 .await;
799
800 let _ = file.flush().await;
802
803 match copy_result {
804 Ok(Ok(_)) => {}
805 Ok(Err(e)) => {
806 return Err(CamelError::ProcessorError(e.to_string()));
808 }
809 Err(_) => {
810 return Err(CamelError::ProcessorError("Timeout writing file".into()));
812 }
813 }
814
815 let rename_result = tokio::time::timeout(
817 config.write_timeout,
818 fs::rename(&temp_path, &target_path),
819 )
820 .await;
821
822 match rename_result {
823 Ok(Ok(_)) => {
824 guard.disarm();
826 }
827 Ok(Err(e)) => {
828 return Err(CamelError::from(e));
830 }
831 Err(_) => {
832 return Err(CamelError::ProcessorError("Timeout renaming file".into()));
834 }
835 }
836 }
837 }
838
839 let abs_path = target_path
841 .canonicalize()
842 .unwrap_or_else(|_| target_path.clone())
843 .to_string_lossy()
844 .to_string();
845 exchange
846 .input
847 .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
848
849 debug!(
850 file = %target_path.display(),
851 correlation_id = %exchange.correlation_id(),
852 "File written"
853 );
854
855 Ok(exchange)
856 })
857 }
858}
859
860#[cfg(test)]
861mod tests {
862 use super::*;
863 use bytes::Bytes;
864 use std::time::Duration;
865 use tokio_util::sync::CancellationToken;
866
    /// Builds the `ProducerContext` handed to `create_producer` in the producer tests.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }
870
871 #[test]
872 fn test_file_config_defaults() {
873 let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
874 assert_eq!(config.directory, "/tmp/inbox");
875 assert_eq!(config.delay, Duration::from_millis(500));
876 assert_eq!(config.initial_delay, Duration::from_millis(1000));
877 assert!(!config.noop);
878 assert!(!config.delete);
879 assert_eq!(config.move_to, Some(".camel".to_string()));
880 assert!(config.file_name.is_none());
881 assert!(config.include.is_none());
882 assert!(config.exclude.is_none());
883 assert!(!config.recursive);
884 assert_eq!(config.file_exist, FileExistStrategy::Override);
885 assert!(config.temp_prefix.is_none());
886 assert!(config.auto_create);
887 assert_eq!(config.read_timeout, Duration::from_secs(30));
889 assert_eq!(config.write_timeout, Duration::from_secs(30));
890 }
891
892 #[test]
893 fn test_file_config_consumer_options() {
894 let config = FileConfig::from_uri(
895 "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
896 ).unwrap();
897 assert_eq!(config.directory, "/data/input");
898 assert_eq!(config.delay, Duration::from_millis(1000));
899 assert_eq!(config.initial_delay, Duration::from_millis(2000));
900 assert!(config.noop);
901 assert!(config.recursive);
902 assert_eq!(config.include, Some(".*\\.csv".to_string()));
903 }
904
905 #[test]
906 fn test_file_config_producer_options() {
907 let config = FileConfig::from_uri(
908 "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
909 )
910 .unwrap();
911 assert_eq!(config.file_exist, FileExistStrategy::Append);
912 assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
913 assert!(!config.auto_create);
914 assert_eq!(config.file_name, Some("out.txt".to_string()));
915 }
916
917 #[test]
918 fn test_file_config_delete_mode() {
919 let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
920 assert!(config.delete);
921 assert!(config.move_to.is_none());
922 }
923
924 #[test]
925 fn test_file_config_noop_mode() {
926 let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
927 assert!(config.noop);
928 assert!(config.move_to.is_none());
929 }
930
931 #[test]
932 fn test_file_config_wrong_scheme() {
933 let result = FileConfig::from_uri("timer:tick");
934 assert!(result.is_err());
935 }
936
937 #[test]
938 fn test_file_component_scheme() {
939 let component = FileComponent::new();
940 assert_eq!(component.scheme(), "file");
941 }
942
943 #[test]
944 fn test_file_component_creates_endpoint() {
945 let component = FileComponent::new();
946 let endpoint = component.create_endpoint("file:/tmp/test");
947 assert!(endpoint.is_ok());
948 }
949
    /// The consumer emits one exchange per file, each carrying all `CamelFile*` headers.
    #[tokio::test]
    async fn test_file_consumer_reads_files() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path().to_str().unwrap();

        std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
        std::fs::write(dir.path().join("test2.txt"), "world").unwrap();

        // noop keeps the files in place; no initial delay so the first poll is immediate.
        let component = FileComponent::new();
        let endpoint = component
            .create_endpoint(&format!(
                "file:{dir_path}?noop=true&initialDelay=0&delay=100"
            ))
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        // Run the poll loop in the background until the token is cancelled.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Collect exchanges until both files arrived or the 2 s budget runs out.
        let mut received = Vec::new();
        let timeout = tokio::time::timeout(Duration::from_secs(2), async {
            while let Some(envelope) = rx.recv().await {
                received.push(envelope.exchange);
                if received.len() == 2 {
                    break;
                }
            }
        })
        .await;
        token.cancel();

        assert!(timeout.is_ok(), "Should have received 2 exchanges");
        assert_eq!(received.len(), 2);

        for ex in &received {
            assert!(ex.input.header("CamelFileName").is_some());
            assert!(ex.input.header("CamelFileNameOnly").is_some());
            assert!(ex.input.header("CamelFileAbsolutePath").is_some());
            assert!(ex.input.header("CamelFileLength").is_some());
            assert!(ex.input.header("CamelFileLastModified").is_some());
        }
    }
1001
1002 #[tokio::test]
1003 async fn noop_second_poll_does_not_re_emit_seen_files() {
1004 let dir = tempfile::tempdir().unwrap();
1005 let file_path = dir.path().join("test.txt");
1006 tokio::fs::write(&file_path, b"hello").await.unwrap();
1007
1008 let uri = format!(
1009 "file:{}?noop=true&initialDelay=0&delay=50",
1010 dir.path().display()
1011 );
1012 let config = FileConfig::from_uri(&uri).unwrap();
1013 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1014 let token = CancellationToken::new();
1015 let ctx = ConsumerContext::new(tx, token);
1016
1017 let include_re = None;
1018 let exclude_re = None;
1019 let mut seen = std::collections::HashSet::new();
1020
1021 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1022 .await
1023 .unwrap();
1024 assert!(rx.try_recv().is_ok(), "first poll should emit file");
1025 assert!(rx.try_recv().is_err(), "should only emit once");
1026
1027 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1028 .await
1029 .unwrap();
1030 assert!(
1031 rx.try_recv().is_err(),
1032 "second poll should not re-emit seen file"
1033 );
1034 }
1035
1036 #[tokio::test]
1037 async fn noop_new_files_picked_up_after_first_poll() {
1038 let dir = tempfile::tempdir().unwrap();
1039 let file1 = dir.path().join("a.txt");
1040 tokio::fs::write(&file1, b"a").await.unwrap();
1041
1042 let uri = format!(
1043 "file:{}?noop=true&initialDelay=0&delay=50",
1044 dir.path().display()
1045 );
1046 let config = FileConfig::from_uri(&uri).unwrap();
1047 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1048 let token = CancellationToken::new();
1049 let ctx = ConsumerContext::new(tx, token);
1050
1051 let include_re = None;
1052 let exclude_re = None;
1053 let mut seen = std::collections::HashSet::new();
1054
1055 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1056 .await
1057 .unwrap();
1058 let _ = rx.try_recv();
1059
1060 let file2 = dir.path().join("b.txt");
1061 tokio::fs::write(&file2, b"b").await.unwrap();
1062
1063 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1064 .await
1065 .unwrap();
1066 assert!(
1067 rx.try_recv().is_ok(),
1068 "b.txt should be emitted on second poll"
1069 );
1070 assert!(rx.try_recv().is_err(), "a.txt should not be re-emitted");
1071 }
1072
    /// Only files whose name matches the `include` regex are emitted.
    #[tokio::test]
    async fn test_file_consumer_include_filter() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path().to_str().unwrap();

        // One file that matches the filter and one that does not.
        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
        std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();

        let component = FileComponent::new();
        let endpoint = component
            .create_endpoint(&format!(
                "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
            ))
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        // Run the poll loop in the background until the token is cancelled.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Stop collecting after the first exchange or after 500 ms.
        let mut received = Vec::new();
        let _ = tokio::time::timeout(Duration::from_millis(500), async {
            while let Some(envelope) = rx.recv().await {
                received.push(envelope.exchange);
                if received.len() == 1 {
                    break;
                }
            }
        })
        .await;
        token.cancel();

        assert_eq!(received.len(), 1);
        let name = received[0]
            .input
            .header("CamelFileNameOnly")
            .and_then(|v| v.as_str())
            .unwrap();
        assert_eq!(name, "data.csv");
    }
1117
    /// With `delete=true` the consumed file is removed from disk.
    #[tokio::test]
    async fn test_file_consumer_delete_mode() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path().to_str().unwrap();

        std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();

        let component = FileComponent::new();
        let endpoint = component
            .create_endpoint(&format!(
                "file:{dir_path}?delete=true&initialDelay=0&delay=100"
            ))
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        // Run the poll loop in the background until the token is cancelled.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Wait (at most 500 ms) for the exchange, then stop the consumer.
        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
        token.cancel();

        // The delete happens after the send; give the consumer task time to finish it.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(
            !dir.path().join("deleteme.txt").exists(),
            "File should be deleted"
        );
    }
1151
    /// By default (no `noop`, no `delete`) the consumed file is moved into `.camel/`.
    #[tokio::test]
    async fn test_file_consumer_move_mode() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path().to_str().unwrap();

        std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();

        let component = FileComponent::new();
        let endpoint = component
            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        // Run the poll loop in the background until the token is cancelled.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Wait (at most 500 ms) for the exchange, then stop the consumer.
        let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
        token.cancel();

        // The move happens after the send; give the consumer task time to finish it.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(
            !dir.path().join("moveme.txt").exists(),
            "Original file should be gone"
        );
        assert!(
            dir.path().join(".camel").join("moveme.txt").exists(),
            "File should be in .camel/"
        );
    }
1187
    /// Cancelling the token makes `start` return, so the spawned task finishes.
    #[tokio::test]
    async fn test_file_consumer_respects_cancellation() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path().to_str().unwrap();

        let component = FileComponent::new();
        let endpoint = component
            .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        // `_rx` is kept alive so the channel stays open for the duration of the test.
        let (tx, _rx) = tokio::sync::mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Let the loop run through a few poll cycles before cancelling.
        tokio::time::sleep(Duration::from_millis(150)).await;
        token.cancel();

        // The task must finish within a second of the cancellation.
        let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(
            result.is_ok(),
            "Consumer should have stopped after cancellation"
        );
    }
1216
1217 #[tokio::test]
1222 async fn test_file_producer_writes_file() {
1223 use tower::ServiceExt;
1224
1225 let dir = tempfile::tempdir().unwrap();
1226 let dir_path = dir.path().to_str().unwrap();
1227
1228 let component = FileComponent::new();
1229 let endpoint = component
1230 .create_endpoint(&format!("file:{dir_path}"))
1231 .unwrap();
1232 let ctx = test_producer_ctx();
1233 let producer = endpoint.create_producer(&ctx).unwrap();
1234
1235 let mut exchange = Exchange::new(Message::new("file content"));
1236 exchange.input.set_header(
1237 "CamelFileName",
1238 serde_json::Value::String("output.txt".to_string()),
1239 );
1240
1241 let result = producer.oneshot(exchange).await.unwrap();
1242
1243 let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1244 assert_eq!(content, "file content");
1245
1246 assert!(result.input.header("CamelFileNameProduced").is_some());
1247 }
1248
1249 #[tokio::test]
1250 async fn test_file_producer_auto_create_dirs() {
1251 use tower::ServiceExt;
1252
1253 let dir = tempfile::tempdir().unwrap();
1254 let dir_path = dir.path().to_str().unwrap();
1255
1256 let component = FileComponent::new();
1257 let endpoint = component
1258 .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1259 .unwrap();
1260 let ctx = test_producer_ctx();
1261 let producer = endpoint.create_producer(&ctx).unwrap();
1262
1263 let mut exchange = Exchange::new(Message::new("nested"));
1264 exchange.input.set_header(
1265 "CamelFileName",
1266 serde_json::Value::String("file.txt".to_string()),
1267 );
1268
1269 producer.oneshot(exchange).await.unwrap();
1270
1271 assert!(dir.path().join("sub/dir/file.txt").exists());
1272 }
1273
1274 #[tokio::test]
1275 async fn test_file_producer_file_exist_fail() {
1276 use tower::ServiceExt;
1277
1278 let dir = tempfile::tempdir().unwrap();
1279 let dir_path = dir.path().to_str().unwrap();
1280
1281 std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1282
1283 let component = FileComponent::new();
1284 let endpoint = component
1285 .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1286 .unwrap();
1287 let ctx = test_producer_ctx();
1288 let producer = endpoint.create_producer(&ctx).unwrap();
1289
1290 let mut exchange = Exchange::new(Message::new("new"));
1291 exchange.input.set_header(
1292 "CamelFileName",
1293 serde_json::Value::String("existing.txt".to_string()),
1294 );
1295
1296 let result = producer.oneshot(exchange).await;
1297 assert!(
1298 result.is_err(),
1299 "Should fail when file exists with Fail strategy"
1300 );
1301 }
1302
1303 #[tokio::test]
1304 async fn test_file_producer_file_exist_append() {
1305 use tower::ServiceExt;
1306
1307 let dir = tempfile::tempdir().unwrap();
1308 let dir_path = dir.path().to_str().unwrap();
1309
1310 std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1311
1312 let component = FileComponent::new();
1313 let endpoint = component
1314 .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1315 .unwrap();
1316 let ctx = test_producer_ctx();
1317 let producer = endpoint.create_producer(&ctx).unwrap();
1318
1319 let mut exchange = Exchange::new(Message::new("new"));
1320 exchange.input.set_header(
1321 "CamelFileName",
1322 serde_json::Value::String("append.txt".to_string()),
1323 );
1324
1325 producer.oneshot(exchange).await.unwrap();
1326
1327 let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1328 assert_eq!(content, "oldnew");
1329 }
1330
1331 #[tokio::test]
1332 async fn test_file_producer_temp_prefix() {
1333 use tower::ServiceExt;
1334
1335 let dir = tempfile::tempdir().unwrap();
1336 let dir_path = dir.path().to_str().unwrap();
1337
1338 let component = FileComponent::new();
1339 let endpoint = component
1340 .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1341 .unwrap();
1342 let ctx = test_producer_ctx();
1343 let producer = endpoint.create_producer(&ctx).unwrap();
1344
1345 let mut exchange = Exchange::new(Message::new("atomic write"));
1346 exchange.input.set_header(
1347 "CamelFileName",
1348 serde_json::Value::String("final.txt".to_string()),
1349 );
1350
1351 producer.oneshot(exchange).await.unwrap();
1352
1353 assert!(dir.path().join("final.txt").exists());
1354 assert!(!dir.path().join(".tmpfinal.txt").exists());
1355 let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1356 assert_eq!(content, "atomic write");
1357 }
1358
1359 #[tokio::test]
1360 async fn test_file_producer_uses_filename_option() {
1361 use tower::ServiceExt;
1362
1363 let dir = tempfile::tempdir().unwrap();
1364 let dir_path = dir.path().to_str().unwrap();
1365
1366 let component = FileComponent::new();
1367 let endpoint = component
1368 .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1369 .unwrap();
1370 let ctx = test_producer_ctx();
1371 let producer = endpoint.create_producer(&ctx).unwrap();
1372
1373 let exchange = Exchange::new(Message::new("content"));
1374
1375 producer.oneshot(exchange).await.unwrap();
1376 assert!(dir.path().join("fixed.txt").exists());
1377 }
1378
1379 #[tokio::test]
1380 async fn test_file_producer_no_filename_errors() {
1381 use tower::ServiceExt;
1382
1383 let dir = tempfile::tempdir().unwrap();
1384 let dir_path = dir.path().to_str().unwrap();
1385
1386 let component = FileComponent::new();
1387 let endpoint = component
1388 .create_endpoint(&format!("file:{dir_path}"))
1389 .unwrap();
1390 let ctx = test_producer_ctx();
1391 let producer = endpoint.create_producer(&ctx).unwrap();
1392
1393 let exchange = Exchange::new(Message::new("content"));
1394
1395 let result = producer.oneshot(exchange).await;
1396 assert!(result.is_err(), "Should error when no filename is provided");
1397 }
1398
1399 #[tokio::test]
1404 async fn test_file_producer_rejects_path_traversal_parent_directory() {
1405 use tower::ServiceExt;
1406
1407 let dir = tempfile::tempdir().unwrap();
1408 let dir_path = dir.path().to_str().unwrap();
1409
1410 std::fs::create_dir(dir.path().join("subdir")).unwrap();
1412 std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1413
1414 let component = FileComponent::new();
1415 let endpoint = component
1416 .create_endpoint(&format!("file:{dir_path}/subdir"))
1417 .unwrap();
1418 let ctx = test_producer_ctx();
1419 let producer = endpoint.create_producer(&ctx).unwrap();
1420
1421 let mut exchange = Exchange::new(Message::new("malicious"));
1422 exchange.input.set_header(
1423 "CamelFileName",
1424 serde_json::Value::String("../secret.txt".to_string()),
1425 );
1426
1427 let result = producer.oneshot(exchange).await;
1428 assert!(result.is_err(), "Should reject path traversal attempt");
1429
1430 let err = result.unwrap_err();
1431 assert!(
1432 err.to_string().contains("outside"),
1433 "Error should mention path is outside base directory"
1434 );
1435 }
1436
1437 #[tokio::test]
1438 async fn test_file_producer_rejects_absolute_path_outside_base() {
1439 use tower::ServiceExt;
1440
1441 let dir = tempfile::tempdir().unwrap();
1442 let dir_path = dir.path().to_str().unwrap();
1443
1444 let component = FileComponent::new();
1445 let endpoint = component
1446 .create_endpoint(&format!("file:{dir_path}"))
1447 .unwrap();
1448 let ctx = test_producer_ctx();
1449 let producer = endpoint.create_producer(&ctx).unwrap();
1450
1451 let mut exchange = Exchange::new(Message::new("malicious"));
1452 exchange.input.set_header(
1453 "CamelFileName",
1454 serde_json::Value::String("/etc/passwd".to_string()),
1455 );
1456
1457 let result = producer.oneshot(exchange).await;
1458 assert!(result.is_err(), "Should reject absolute path outside base");
1459 }
1460
1461 #[tokio::test]
1466 #[ignore] async fn test_large_file_streaming_constant_memory() {
1468 use std::io::Write;
1469 use tempfile::NamedTempFile;
1470
1471 let mut temp_file = NamedTempFile::new().unwrap();
1473 let file_size = 150 * 1024 * 1024; let chunk = vec![b'X'; 1024 * 1024]; for _ in 0..150 {
1477 temp_file.write_all(&chunk).unwrap();
1478 }
1479 temp_file.flush().unwrap();
1480
1481 let dir = temp_file.path().parent().unwrap();
1482 let dir_path = dir.to_str().unwrap();
1483 let file_name = temp_file
1484 .path()
1485 .file_name()
1486 .unwrap()
1487 .to_str()
1488 .unwrap()
1489 .to_string();
1490
1491 let component = FileComponent::new();
1493 let endpoint = component
1494 .create_endpoint(&format!(
1495 "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1496 ))
1497 .unwrap();
1498 let mut consumer = endpoint.create_consumer().unwrap();
1499
1500 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1501 let token = CancellationToken::new();
1502 let ctx = ConsumerContext::new(tx, token.clone());
1503
1504 tokio::spawn(async move {
1505 let _ = consumer.start(ctx).await;
1506 });
1507
1508 let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1509 rx.recv().await.unwrap().exchange
1510 })
1511 .await
1512 .expect("Should receive exchange");
1513 token.cancel();
1514
1515 assert!(matches!(exchange.input.body, Body::Stream(_)));
1517
1518 if let Body::Stream(ref stream_body) = exchange.input.body {
1520 assert!(stream_body.metadata.size_hint.is_some());
1521 let size = stream_body.metadata.size_hint.unwrap();
1522 assert_eq!(size, file_size as u64);
1523 }
1524
1525 if let Body::Stream(stream_body) = exchange.input.body {
1527 let body = Body::Stream(stream_body);
1528 let result = body.into_bytes(100 * 1024 * 1024).await;
1529 assert!(result.is_err());
1530 }
1531
1532 let component2 = FileComponent::new();
1535 let endpoint2 = component2
1536 .create_endpoint(&format!(
1537 "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1538 ))
1539 .unwrap();
1540 let mut consumer2 = endpoint2.create_consumer().unwrap();
1541
1542 let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1543 let token2 = CancellationToken::new();
1544 let ctx2 = ConsumerContext::new(tx2, token2.clone());
1545
1546 tokio::spawn(async move {
1547 let _ = consumer2.start(ctx2).await;
1548 });
1549
1550 let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1551 rx2.recv().await.unwrap().exchange
1552 })
1553 .await
1554 .expect("Should receive exchange");
1555 token2.cancel();
1556
1557 if let Body::Stream(stream_body) = exchange2.input.body {
1558 let mut stream_lock = stream_body.stream.lock().await;
1559 let mut stream = stream_lock.take().unwrap();
1560
1561 if let Some(chunk_result) = stream.next().await {
1563 let chunk = chunk_result.unwrap();
1564 assert!(!chunk.is_empty());
1565 assert!(chunk.len() < file_size);
1566 }
1568 }
1569 }
1570
1571 #[tokio::test]
1576 async fn test_producer_writes_stream_body() {
1577 let dir = tempfile::tempdir().unwrap();
1578 let dir_path = dir.path().to_str().unwrap();
1579 let uri = format!("file:{dir_path}?fileName=out.txt");
1580
1581 let component = FileComponent::new();
1582 let endpoint = component.create_endpoint(&uri).unwrap();
1583 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1584
1585 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1586 Ok(Bytes::from("hello ")),
1587 Ok(Bytes::from("streaming ")),
1588 Ok(Bytes::from("world")),
1589 ];
1590 let stream = futures::stream::iter(chunks);
1591 let body = Body::Stream(camel_api::body::StreamBody {
1592 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1593 metadata: camel_api::body::StreamMetadata {
1594 size_hint: None,
1595 content_type: None,
1596 origin: None,
1597 },
1598 });
1599
1600 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1601 tower::ServiceExt::oneshot(producer, exchange)
1602 .await
1603 .unwrap();
1604
1605 let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1606 .await
1607 .unwrap();
1608 assert_eq!(content, "hello streaming world");
1609 }
1610
1611 #[tokio::test]
1612 async fn test_producer_stream_atomic_no_partial_on_error() {
1613 let dir = tempfile::tempdir().unwrap();
1615 let dir_path = dir.path().to_str().unwrap();
1616 let uri = format!("file:{dir_path}?fileName=out.txt");
1617
1618 let component = FileComponent::new();
1619 let endpoint = component.create_endpoint(&uri).unwrap();
1620 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1621
1622 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1623 Ok(Bytes::from("partial")),
1624 Err(CamelError::ProcessorError(
1625 "simulated stream error".to_string(),
1626 )),
1627 ];
1628 let stream = futures::stream::iter(chunks);
1629 let body = Body::Stream(camel_api::body::StreamBody {
1630 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1631 metadata: camel_api::body::StreamMetadata {
1632 size_hint: None,
1633 content_type: None,
1634 origin: None,
1635 },
1636 });
1637
1638 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1639 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1640 assert!(
1641 result.is_err(),
1642 "expected error when stream fails mid-write"
1643 );
1644
1645 assert!(
1647 !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1648 "partial file must not exist after failed write"
1649 );
1650
1651 assert!(
1653 !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1654 "temp file must be cleaned up after failed write"
1655 );
1656 }
1657
1658 #[tokio::test]
1659 async fn test_producer_stream_append() {
1660 let dir = tempfile::tempdir().unwrap();
1661 let dir_path = dir.path().to_str().unwrap();
1662 let target = format!("{dir_path}/out.txt");
1663
1664 tokio::fs::write(&target, b"line1\n").await.unwrap();
1666
1667 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1668 let component = FileComponent::new();
1669 let endpoint = component.create_endpoint(&uri).unwrap();
1670 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1671
1672 let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1673 let stream = futures::stream::iter(chunks);
1674 let body = Body::Stream(camel_api::body::StreamBody {
1675 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1676 metadata: camel_api::body::StreamMetadata {
1677 size_hint: None,
1678 content_type: None,
1679 origin: None,
1680 },
1681 });
1682
1683 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1684 tower::ServiceExt::oneshot(producer, exchange)
1685 .await
1686 .unwrap();
1687
1688 let content = tokio::fs::read_to_string(&target).await.unwrap();
1689 assert_eq!(content, "line1\nline2\n");
1690 }
1691
1692 #[tokio::test]
1693 async fn test_producer_stream_append_partial_on_error() {
1694 let dir = tempfile::tempdir().unwrap();
1697 let dir_path = dir.path().to_str().unwrap();
1698 let target = format!("{dir_path}/out.txt");
1699
1700 tokio::fs::write(&target, b"initial\n").await.unwrap();
1702
1703 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1704 let component = FileComponent::new();
1705 let endpoint = component.create_endpoint(&uri).unwrap();
1706 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1707
1708 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1710 Ok(Bytes::from("partial-")), Err(CamelError::ProcessorError("stream error".to_string())), Ok(Bytes::from("never-written")), ];
1714 let stream = futures::stream::iter(chunks);
1715 let body = Body::Stream(camel_api::body::StreamBody {
1716 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1717 metadata: camel_api::body::StreamMetadata {
1718 size_hint: None,
1719 content_type: None,
1720 origin: None,
1721 },
1722 });
1723
1724 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1725 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1726
1727 assert!(
1729 result.is_err(),
1730 "expected error when stream fails during append"
1731 );
1732
1733 let content = tokio::fs::read_to_string(&target).await.unwrap();
1735 assert_eq!(
1736 content, "initial\npartial-",
1737 "append leaves partial data on stream error (non-atomic by nature)"
1738 );
1739 }
1740
1741 #[tokio::test]
1742 async fn test_producer_stream_already_consumed_errors() {
1743 let dir = tempfile::tempdir().unwrap();
1744 let dir_path = dir.path().to_str().unwrap();
1745 let uri = format!("file:{dir_path}?fileName=out.txt");
1746
1747 let component = FileComponent::new();
1748 let endpoint = component.create_endpoint(&uri).unwrap();
1749 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1750
1751 type MaybeStream = std::sync::Arc<
1753 tokio::sync::Mutex<
1754 Option<
1755 std::pin::Pin<
1756 Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1757 >,
1758 >,
1759 >,
1760 >;
1761 let arc: MaybeStream = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1762 let body = Body::Stream(camel_api::body::StreamBody {
1763 stream: arc,
1764 metadata: camel_api::body::StreamMetadata {
1765 size_hint: None,
1766 content_type: None,
1767 origin: None,
1768 },
1769 });
1770
1771 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1772 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1773 assert!(
1774 result.is_err(),
1775 "expected error for already-consumed stream"
1776 );
1777 }
1778
/// Global defaults are applied to a `FileConfig` parsed from a URI that sets
/// none of the timing options itself.
///
/// The endpoint is also created from a component carrying the same global
/// config, which only verifies that creation succeeds; the values are checked
/// on a separately parsed `FileConfig`.
#[test]
fn test_global_config_applied_to_endpoint() {
    let global = FileGlobalConfig::default()
        .with_delay_ms(2000)
        .with_initial_delay_ms(5000)
        .with_read_timeout_ms(60_000)
        .with_write_timeout_ms(45_000);

    // Kept alive until the end of the test, as a creation smoke check.
    let component = FileComponent::with_config(global.clone());
    let _endpoint = component.create_endpoint("file:/tmp/inbox").unwrap();

    let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
    config.apply_global_defaults(&global);

    assert_eq!(config.delay, Duration::from_millis(2000));
    assert_eq!(config.initial_delay, Duration::from_millis(5000));
    assert_eq!(config.read_timeout, Duration::from_millis(60_000));
    assert_eq!(config.write_timeout, Duration::from_millis(45_000));
}
1810
/// Options given explicitly in the URI take precedence over the global
/// defaults; options given in neither place keep their built-in value.
#[test]
fn test_uri_param_wins_over_global_config() {
    // Global values that differ from what the URI specifies.
    let global = FileGlobalConfig::default()
        .with_delay_ms(3000)
        .with_initial_delay_ms(4000);

    let uri = "file:/tmp/inbox?delay=1000&initialDelay=2000";
    let mut config = FileConfig::from_uri(uri).unwrap();
    config.apply_global_defaults(&global);

    // URI values survive ...
    assert_eq!(config.delay, Duration::from_millis(1000));
    assert_eq!(config.initial_delay, Duration::from_millis(2000));
    // ... and the read timeout, set in neither place, is expected to be 30 s.
    assert_eq!(config.read_timeout, Duration::from_millis(30_000));
}
1829}