1use std::future::Future;
2use std::path::PathBuf;
3use std::pin::Pin;
4use std::str::FromStr;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use async_trait::async_trait;
9use futures::StreamExt;
10use regex::Regex;
11use tokio::fs;
12use tokio::fs::OpenOptions;
13use tokio::io;
14use tokio::io::AsyncWriteExt;
15use tokio::time;
16use tokio_util::io::ReaderStream;
17use tower::Service;
18use tracing::{debug, warn};
19
20use camel_api::{
21 BoxProcessor, CamelError, Exchange, Message, body::Body, body::StreamBody, body::StreamMetadata,
22};
23use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
24use camel_endpoint::{UriConfig, parse_uri};
25
26struct TempFileGuard {
35 path: PathBuf,
36 disarm: bool,
37}
38
39impl TempFileGuard {
40 fn new(path: PathBuf) -> Self {
41 Self {
42 path,
43 disarm: false,
44 }
45 }
46
47 fn disarm(&mut self) {
49 self.disarm = true;
50 }
51}
52
53impl Drop for TempFileGuard {
54 fn drop(&mut self) {
55 if !self.disarm {
56 let _ = std::fs::remove_file(&self.path);
58 }
59 }
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
68pub enum FileExistStrategy {
69 #[default]
71 Override,
72 Append,
74 Fail,
76}
77
78impl FromStr for FileExistStrategy {
79 type Err = String;
80
81 fn from_str(s: &str) -> Result<Self, Self::Err> {
82 match s {
83 "Override" | "override" => Ok(FileExistStrategy::Override),
84 "Append" | "append" => Ok(FileExistStrategy::Append),
85 "Fail" | "fail" => Ok(FileExistStrategy::Fail),
86 _ => Ok(FileExistStrategy::Override), }
88 }
89}
90
91#[derive(Debug, Clone, PartialEq)]
99pub struct FileGlobalConfig {
100 pub delay_ms: u64,
101 pub initial_delay_ms: u64,
102 pub read_timeout_ms: u64,
103 pub write_timeout_ms: u64,
104}
105
106impl Default for FileGlobalConfig {
107 fn default() -> Self {
108 Self {
109 delay_ms: 500,
110 initial_delay_ms: 1_000,
111 read_timeout_ms: 30_000,
112 write_timeout_ms: 30_000,
113 }
114 }
115}
116
117impl FileGlobalConfig {
118 pub fn new() -> Self {
119 Self::default()
120 }
121 pub fn with_delay_ms(mut self, v: u64) -> Self {
122 self.delay_ms = v;
123 self
124 }
125 pub fn with_initial_delay_ms(mut self, v: u64) -> Self {
126 self.initial_delay_ms = v;
127 self
128 }
129 pub fn with_read_timeout_ms(mut self, v: u64) -> Self {
130 self.read_timeout_ms = v;
131 self
132 }
133 pub fn with_write_timeout_ms(mut self, v: u64) -> Self {
134 self.write_timeout_ms = v;
135 self
136 }
137}
138
139#[derive(Debug, Clone, UriConfig)]
166#[uri_scheme = "file"]
167#[uri_config(skip_impl)]
168pub struct FileConfig {
169 pub directory: String,
171
172 #[allow(dead_code)]
174 #[uri_param(name = "delay", default = "500")]
175 delay_ms: u64,
176
177 pub delay: Duration,
179
180 #[allow(dead_code)]
182 #[uri_param(name = "initialDelay", default = "1000")]
183 initial_delay_ms: u64,
184
185 pub initial_delay: Duration,
187
188 #[uri_param(default = "false")]
190 pub noop: bool,
191
192 #[uri_param(default = "false")]
194 pub delete: bool,
195
196 #[uri_param(name = "move")]
199 move_to: Option<String>,
200
201 #[uri_param(name = "fileName")]
203 pub file_name: Option<String>,
204
205 #[uri_param]
207 pub include: Option<String>,
208
209 #[uri_param]
211 pub exclude: Option<String>,
212
213 #[uri_param(default = "false")]
215 pub recursive: bool,
216
217 #[uri_param(name = "fileExist", default = "Override")]
219 pub file_exist: FileExistStrategy,
220
221 #[uri_param(name = "tempPrefix")]
223 pub temp_prefix: Option<String>,
224
225 #[uri_param(name = "autoCreate", default = "true")]
227 pub auto_create: bool,
228
229 #[allow(dead_code)]
231 #[uri_param(name = "readTimeout", default = "30000")]
232 read_timeout_ms: u64,
233
234 pub read_timeout: Duration,
236
237 #[allow(dead_code)]
239 #[uri_param(name = "writeTimeout", default = "30000")]
240 write_timeout_ms: u64,
241
242 pub write_timeout: Duration,
244}
245
246impl UriConfig for FileConfig {
247 fn scheme() -> &'static str {
248 "file"
249 }
250
251 fn from_uri(uri: &str) -> Result<Self, CamelError> {
252 let parts = parse_uri(uri)?;
253 Self::from_components(parts)
254 }
255
256 fn from_components(parts: camel_endpoint::UriComponents) -> Result<Self, CamelError> {
257 Self::parse_uri_components(parts)?.validate()
258 }
259
260 fn validate(self) -> Result<Self, CamelError> {
261 let move_to = if self.noop || self.delete {
265 None
266 } else {
267 Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
268 };
269
270 Ok(Self { move_to, ..self })
271 }
272}
273
274impl FileConfig {
275 pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
283 if self.delay == Duration::from_millis(500) {
284 self.delay = Duration::from_millis(global.delay_ms);
285 }
286 if self.initial_delay == Duration::from_millis(1_000) {
287 self.initial_delay = Duration::from_millis(global.initial_delay_ms);
288 }
289 if self.read_timeout == Duration::from_millis(30_000) {
290 self.read_timeout = Duration::from_millis(global.read_timeout_ms);
291 }
292 if self.write_timeout == Duration::from_millis(30_000) {
293 self.write_timeout = Duration::from_millis(global.write_timeout_ms);
294 }
295 }
296}
297
298pub struct FileComponent {
303 config: Option<FileGlobalConfig>,
304}
305
306impl FileComponent {
307 pub fn new() -> Self {
308 Self { config: None }
309 }
310
311 pub fn with_config(config: FileGlobalConfig) -> Self {
312 Self {
313 config: Some(config),
314 }
315 }
316
317 pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
318 Self { config }
319 }
320}
321
322impl Default for FileComponent {
323 fn default() -> Self {
324 Self::new()
325 }
326}
327
328impl Component for FileComponent {
329 fn scheme(&self) -> &str {
330 "file"
331 }
332
333 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
334 let mut config = FileConfig::from_uri(uri)?;
335 if let Some(ref global_config) = self.config {
336 config.apply_global_defaults(global_config);
337 }
338 Ok(Box::new(FileEndpoint {
339 uri: uri.to_string(),
340 config,
341 }))
342 }
343}
344
345struct FileEndpoint {
350 uri: String,
351 config: FileConfig,
352}
353
354impl Endpoint for FileEndpoint {
355 fn uri(&self) -> &str {
356 &self.uri
357 }
358
359 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
360 Ok(Box::new(FileConsumer {
361 config: self.config.clone(),
362 }))
363 }
364
365 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
366 Ok(BoxProcessor::new(FileProducer {
367 config: self.config.clone(),
368 }))
369 }
370}
371
372struct FileConsumer {
377 config: FileConfig,
378}
379
380#[async_trait]
381impl Consumer for FileConsumer {
382 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
383 let config = self.config.clone();
384
385 let include_re = config
386 .include
387 .as_ref()
388 .map(|p| Regex::new(p))
389 .transpose()
390 .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
391 let exclude_re = config
392 .exclude
393 .as_ref()
394 .map(|p| Regex::new(p))
395 .transpose()
396 .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
397
398 if !config.initial_delay.is_zero() {
399 tokio::select! {
400 _ = time::sleep(config.initial_delay) => {}
401 _ = context.cancelled() => {
402 debug!(directory = config.directory, "File consumer cancelled during initial delay");
403 return Ok(());
404 }
405 }
406 }
407
408 let mut interval = time::interval(config.delay);
409
410 loop {
411 tokio::select! {
412 _ = context.cancelled() => {
413 debug!(directory = config.directory, "File consumer received cancellation, stopping");
414 break;
415 }
416 _ = interval.tick() => {
417 if let Err(e) = poll_directory(
418 &config,
419 &context,
420 &include_re,
421 &exclude_re,
422 ).await {
423 warn!(directory = config.directory, error = %e, "Error polling directory");
424 }
425 }
426 }
427 }
428
429 Ok(())
430 }
431
432 async fn stop(&mut self) -> Result<(), CamelError> {
433 Ok(())
434 }
435}
436
437async fn poll_directory(
438 config: &FileConfig,
439 context: &ConsumerContext,
440 include_re: &Option<Regex>,
441 exclude_re: &Option<Regex>,
442) -> Result<(), CamelError> {
443 let base_path = std::path::Path::new(&config.directory);
444
445 let files = list_files(base_path, config.recursive).await?;
446
447 for file_path in files {
448 let file_name = file_path
449 .file_name()
450 .and_then(|n| n.to_str())
451 .unwrap_or_default()
452 .to_string();
453
454 if let Some(ref target_name) = config.file_name
455 && file_name != *target_name
456 {
457 continue;
458 }
459
460 if let Some(re) = include_re
461 && !re.is_match(&file_name)
462 {
463 continue;
464 }
465
466 if let Some(re) = exclude_re
467 && re.is_match(&file_name)
468 {
469 continue;
470 }
471
472 if let Some(ref move_dir) = config.move_to
473 && file_path.starts_with(base_path.join(move_dir))
474 {
475 continue;
476 }
477
478 let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
479 let f = fs::File::open(&file_path).await?;
480 let m = f.metadata().await?;
481 Ok::<_, std::io::Error>((f, m))
482 })
483 .await
484 {
485 Ok(Ok((f, m))) => (f, Some(m)),
486 Ok(Err(e)) => {
487 warn!(
488 file = %file_path.display(),
489 error = %e,
490 "Failed to open file"
491 );
492 continue;
493 }
494 Err(_) => {
495 warn!(
496 file = %file_path.display(),
497 timeout_ms = config.read_timeout.as_millis(),
498 "Timeout opening file"
499 );
500 continue;
501 }
502 };
503
504 let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
505 let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
506
507 let last_modified = metadata
508 .as_ref()
509 .and_then(|m| m.modified().ok())
510 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
511 .map(|d| d.as_millis() as u64)
512 .unwrap_or(0);
513
514 let relative_path = file_path
515 .strip_prefix(base_path)
516 .unwrap_or(&file_path)
517 .to_string_lossy()
518 .to_string();
519
520 let absolute_path = file_path
521 .canonicalize()
522 .unwrap_or_else(|_| file_path.clone())
523 .to_string_lossy()
524 .to_string();
525
526 let body = Body::Stream(StreamBody {
527 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
528 metadata: StreamMetadata {
529 size_hint: Some(file_len),
530 content_type: None,
531 origin: Some(absolute_path.clone()),
532 },
533 });
534
535 let mut exchange = Exchange::new(Message::new(body));
536 exchange
537 .input
538 .set_header("CamelFileName", serde_json::Value::String(relative_path));
539 exchange.input.set_header(
540 "CamelFileNameOnly",
541 serde_json::Value::String(file_name.clone()),
542 );
543 exchange.input.set_header(
544 "CamelFileAbsolutePath",
545 serde_json::Value::String(absolute_path),
546 );
547 exchange.input.set_header(
548 "CamelFileLength",
549 serde_json::Value::Number(file_len.into()),
550 );
551 exchange.input.set_header(
552 "CamelFileLastModified",
553 serde_json::Value::Number(last_modified.into()),
554 );
555
556 debug!(
557 file = %file_path.display(),
558 correlation_id = %exchange.correlation_id(),
559 "Processing file"
560 );
561
562 if context.send(exchange).await.is_err() {
563 break;
564 }
565
566 if config.noop {
567 } else if config.delete {
569 if let Err(e) = fs::remove_file(&file_path).await {
570 warn!(file = %file_path.display(), error = %e, "Failed to delete file");
571 }
572 } else if let Some(ref move_dir) = config.move_to {
573 let target_dir = base_path.join(move_dir);
574 if let Err(e) = fs::create_dir_all(&target_dir).await {
575 warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
576 continue;
577 }
578 let target_path = target_dir.join(&file_name);
579 if let Err(e) = fs::rename(&file_path, &target_path).await {
580 warn!(
581 from = %file_path.display(),
582 to = %target_path.display(),
583 error = %e,
584 "Failed to move file"
585 );
586 }
587 }
588 }
589
590 Ok(())
591}
592
593async fn list_files(
594 dir: &std::path::Path,
595 recursive: bool,
596) -> Result<Vec<std::path::PathBuf>, CamelError> {
597 let mut files = Vec::new();
598 let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
599
600 while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
601 let path = entry.path();
602 if path.is_file() {
603 files.push(path);
604 } else if path.is_dir() && recursive {
605 let mut sub_files = Box::pin(list_files(&path, true)).await?;
606 files.append(&mut sub_files);
607 }
608 }
609
610 files.sort();
611 Ok(files)
612}
613
614fn validate_path_is_within_base(
619 base_dir: &std::path::Path,
620 target_path: &std::path::Path,
621) -> Result<(), CamelError> {
622 let canonical_base = base_dir.canonicalize().map_err(|e| {
623 CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
624 })?;
625
626 let canonical_target = if target_path.exists() {
628 target_path.canonicalize().map_err(|e| {
629 CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
630 })?
631 } else if let Some(parent) = target_path.parent() {
632 if !parent.exists() {
634 return Err(CamelError::ProcessorError(format!(
635 "Parent directory '{}' does not exist",
636 parent.display()
637 )));
638 }
639 let canonical_parent = parent.canonicalize().map_err(|e| {
640 CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
641 })?;
642 if let Some(filename) = target_path.file_name() {
644 canonical_parent.join(filename)
645 } else {
646 return Err(CamelError::ProcessorError(
647 "Invalid target path: no filename".to_string(),
648 ));
649 }
650 } else {
651 return Err(CamelError::ProcessorError(
652 "Invalid target path: no parent directory".to_string(),
653 ));
654 };
655
656 if !canonical_target.starts_with(&canonical_base) {
657 return Err(CamelError::ProcessorError(format!(
658 "Path '{}' is outside base directory '{}'",
659 canonical_target.display(),
660 canonical_base.display()
661 )));
662 }
663
664 Ok(())
665}
666
667#[derive(Clone)]
672struct FileProducer {
673 config: FileConfig,
674}
675
676impl FileProducer {
677 fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
678 if let Some(name) = exchange
679 .input
680 .header("CamelFileName")
681 .and_then(|v| v.as_str())
682 {
683 return Ok(name.to_string());
684 }
685 if let Some(ref name) = config.file_name {
686 return Ok(name.clone());
687 }
688 Err(CamelError::ProcessorError(
689 "No filename specified: set CamelFileName header or fileName option".to_string(),
690 ))
691 }
692}
693
694impl Service<Exchange> for FileProducer {
695 type Response = Exchange;
696 type Error = CamelError;
697 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
698
699 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
700 Poll::Ready(Ok(()))
701 }
702
703 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
704 let config = self.config.clone();
705
706 Box::pin(async move {
707 let file_name = FileProducer::resolve_filename(&exchange, &config)?;
708 let body = exchange.input.body.clone();
709
710 let dir_path = std::path::Path::new(&config.directory);
711 let target_path = dir_path.join(&file_name);
712
713 if config.auto_create
715 && let Some(parent) = target_path.parent()
716 {
717 tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
718 .await
719 .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
720 .map_err(CamelError::from)?;
721 }
722
723 validate_path_is_within_base(dir_path, &target_path)?;
725
726 match config.file_exist {
728 FileExistStrategy::Fail if target_path.exists() => {
729 return Err(CamelError::ProcessorError(format!(
730 "File already exists: {}",
731 target_path.display()
732 )));
733 }
734 FileExistStrategy::Append => {
735 let mut file = tokio::time::timeout(
737 config.write_timeout,
738 OpenOptions::new()
739 .append(true)
740 .create(true)
741 .open(&target_path),
742 )
743 .await
744 .map_err(|_| {
745 CamelError::ProcessorError("Timeout opening file for append".into())
746 })?
747 .map_err(CamelError::from)?;
748
749 tokio::time::timeout(
750 config.write_timeout,
751 io::copy(&mut body.into_async_read(), &mut file),
752 )
753 .await
754 .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
755 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
756
757 file.flush().await.map_err(CamelError::from)?;
758 }
759 _ => {
760 let temp_name = if let Some(ref prefix) = config.temp_prefix {
762 format!("{prefix}{file_name}")
763 } else {
764 format!(".tmp.{file_name}")
765 };
766 let temp_path = dir_path.join(&temp_name);
767
768 let mut guard = TempFileGuard::new(temp_path.clone());
770
771 let mut file =
773 tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
774 .await
775 .map_err(|_| {
776 CamelError::ProcessorError("Timeout creating temp file".into())
777 })?
778 .map_err(CamelError::from)?;
779
780 let copy_result = tokio::time::timeout(
781 config.write_timeout,
782 io::copy(&mut body.into_async_read(), &mut file),
783 )
784 .await;
785
786 let _ = file.flush().await;
788
789 match copy_result {
790 Ok(Ok(_)) => {}
791 Ok(Err(e)) => {
792 return Err(CamelError::ProcessorError(e.to_string()));
794 }
795 Err(_) => {
796 return Err(CamelError::ProcessorError("Timeout writing file".into()));
798 }
799 }
800
801 let rename_result = tokio::time::timeout(
803 config.write_timeout,
804 fs::rename(&temp_path, &target_path),
805 )
806 .await;
807
808 match rename_result {
809 Ok(Ok(_)) => {
810 guard.disarm();
812 }
813 Ok(Err(e)) => {
814 return Err(CamelError::from(e));
816 }
817 Err(_) => {
818 return Err(CamelError::ProcessorError("Timeout renaming file".into()));
820 }
821 }
822 }
823 }
824
825 let abs_path = target_path
827 .canonicalize()
828 .unwrap_or_else(|_| target_path.clone())
829 .to_string_lossy()
830 .to_string();
831 exchange
832 .input
833 .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
834
835 debug!(
836 file = %target_path.display(),
837 correlation_id = %exchange.correlation_id(),
838 "File written"
839 );
840
841 Ok(exchange)
842 })
843 }
844}
845
846#[cfg(test)]
847mod tests {
848 use super::*;
849 use bytes::Bytes;
850 use std::time::Duration;
851 use tokio_util::sync::CancellationToken;
852
853 fn test_producer_ctx() -> ProducerContext {
854 ProducerContext::new()
855 }
856
857 #[test]
858 fn test_file_config_defaults() {
859 let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
860 assert_eq!(config.directory, "/tmp/inbox");
861 assert_eq!(config.delay, Duration::from_millis(500));
862 assert_eq!(config.initial_delay, Duration::from_millis(1000));
863 assert!(!config.noop);
864 assert!(!config.delete);
865 assert_eq!(config.move_to, Some(".camel".to_string()));
866 assert!(config.file_name.is_none());
867 assert!(config.include.is_none());
868 assert!(config.exclude.is_none());
869 assert!(!config.recursive);
870 assert_eq!(config.file_exist, FileExistStrategy::Override);
871 assert!(config.temp_prefix.is_none());
872 assert!(config.auto_create);
873 assert_eq!(config.read_timeout, Duration::from_secs(30));
875 assert_eq!(config.write_timeout, Duration::from_secs(30));
876 }
877
878 #[test]
879 fn test_file_config_consumer_options() {
880 let config = FileConfig::from_uri(
881 "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
882 ).unwrap();
883 assert_eq!(config.directory, "/data/input");
884 assert_eq!(config.delay, Duration::from_millis(1000));
885 assert_eq!(config.initial_delay, Duration::from_millis(2000));
886 assert!(config.noop);
887 assert!(config.recursive);
888 assert_eq!(config.include, Some(".*\\.csv".to_string()));
889 }
890
891 #[test]
892 fn test_file_config_producer_options() {
893 let config = FileConfig::from_uri(
894 "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
895 )
896 .unwrap();
897 assert_eq!(config.file_exist, FileExistStrategy::Append);
898 assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
899 assert!(!config.auto_create);
900 assert_eq!(config.file_name, Some("out.txt".to_string()));
901 }
902
903 #[test]
904 fn test_file_config_delete_mode() {
905 let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
906 assert!(config.delete);
907 assert!(config.move_to.is_none());
908 }
909
910 #[test]
911 fn test_file_config_noop_mode() {
912 let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
913 assert!(config.noop);
914 assert!(config.move_to.is_none());
915 }
916
917 #[test]
918 fn test_file_config_wrong_scheme() {
919 let result = FileConfig::from_uri("timer:tick");
920 assert!(result.is_err());
921 }
922
923 #[test]
924 fn test_file_component_scheme() {
925 let component = FileComponent::new();
926 assert_eq!(component.scheme(), "file");
927 }
928
929 #[test]
930 fn test_file_component_creates_endpoint() {
931 let component = FileComponent::new();
932 let endpoint = component.create_endpoint("file:/tmp/test");
933 assert!(endpoint.is_ok());
934 }
935
936 #[tokio::test]
941 async fn test_file_consumer_reads_files() {
942 let dir = tempfile::tempdir().unwrap();
943 let dir_path = dir.path().to_str().unwrap();
944
945 std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
946 std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
947
948 let component = FileComponent::new();
949 let endpoint = component
950 .create_endpoint(&format!(
951 "file:{dir_path}?noop=true&initialDelay=0&delay=100"
952 ))
953 .unwrap();
954 let mut consumer = endpoint.create_consumer().unwrap();
955
956 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
957 let token = CancellationToken::new();
958 let ctx = ConsumerContext::new(tx, token.clone());
959
960 tokio::spawn(async move {
961 consumer.start(ctx).await.unwrap();
962 });
963
964 let mut received = Vec::new();
965 let timeout = tokio::time::timeout(Duration::from_secs(2), async {
966 while let Some(envelope) = rx.recv().await {
967 received.push(envelope.exchange);
968 if received.len() == 2 {
969 break;
970 }
971 }
972 })
973 .await;
974 token.cancel();
975
976 assert!(timeout.is_ok(), "Should have received 2 exchanges");
977 assert_eq!(received.len(), 2);
978
979 for ex in &received {
980 assert!(ex.input.header("CamelFileName").is_some());
981 assert!(ex.input.header("CamelFileNameOnly").is_some());
982 assert!(ex.input.header("CamelFileAbsolutePath").is_some());
983 assert!(ex.input.header("CamelFileLength").is_some());
984 assert!(ex.input.header("CamelFileLastModified").is_some());
985 }
986 }
987
988 #[tokio::test]
989 async fn test_file_consumer_include_filter() {
990 let dir = tempfile::tempdir().unwrap();
991 let dir_path = dir.path().to_str().unwrap();
992
993 std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
994 std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
995
996 let component = FileComponent::new();
997 let endpoint = component
998 .create_endpoint(&format!(
999 "file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"
1000 ))
1001 .unwrap();
1002 let mut consumer = endpoint.create_consumer().unwrap();
1003
1004 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1005 let token = CancellationToken::new();
1006 let ctx = ConsumerContext::new(tx, token.clone());
1007
1008 tokio::spawn(async move {
1009 consumer.start(ctx).await.unwrap();
1010 });
1011
1012 let mut received = Vec::new();
1013 let _ = tokio::time::timeout(Duration::from_millis(500), async {
1014 while let Some(envelope) = rx.recv().await {
1015 received.push(envelope.exchange);
1016 if received.len() == 1 {
1017 break;
1018 }
1019 }
1020 })
1021 .await;
1022 token.cancel();
1023
1024 assert_eq!(received.len(), 1);
1025 let name = received[0]
1026 .input
1027 .header("CamelFileNameOnly")
1028 .and_then(|v| v.as_str())
1029 .unwrap();
1030 assert_eq!(name, "data.csv");
1031 }
1032
1033 #[tokio::test]
1034 async fn test_file_consumer_delete_mode() {
1035 let dir = tempfile::tempdir().unwrap();
1036 let dir_path = dir.path().to_str().unwrap();
1037
1038 std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
1039
1040 let component = FileComponent::new();
1041 let endpoint = component
1042 .create_endpoint(&format!(
1043 "file:{dir_path}?delete=true&initialDelay=0&delay=100"
1044 ))
1045 .unwrap();
1046 let mut consumer = endpoint.create_consumer().unwrap();
1047
1048 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1049 let token = CancellationToken::new();
1050 let ctx = ConsumerContext::new(tx, token.clone());
1051
1052 tokio::spawn(async move {
1053 consumer.start(ctx).await.unwrap();
1054 });
1055
1056 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1057 token.cancel();
1058
1059 tokio::time::sleep(Duration::from_millis(100)).await;
1060
1061 assert!(
1062 !dir.path().join("deleteme.txt").exists(),
1063 "File should be deleted"
1064 );
1065 }
1066
1067 #[tokio::test]
1068 async fn test_file_consumer_move_mode() {
1069 let dir = tempfile::tempdir().unwrap();
1070 let dir_path = dir.path().to_str().unwrap();
1071
1072 std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
1073
1074 let component = FileComponent::new();
1075 let endpoint = component
1076 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"))
1077 .unwrap();
1078 let mut consumer = endpoint.create_consumer().unwrap();
1079
1080 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1081 let token = CancellationToken::new();
1082 let ctx = ConsumerContext::new(tx, token.clone());
1083
1084 tokio::spawn(async move {
1085 consumer.start(ctx).await.unwrap();
1086 });
1087
1088 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1089 token.cancel();
1090
1091 tokio::time::sleep(Duration::from_millis(100)).await;
1092
1093 assert!(
1094 !dir.path().join("moveme.txt").exists(),
1095 "Original file should be gone"
1096 );
1097 assert!(
1098 dir.path().join(".camel").join("moveme.txt").exists(),
1099 "File should be in .camel/"
1100 );
1101 }
1102
1103 #[tokio::test]
1104 async fn test_file_consumer_respects_cancellation() {
1105 let dir = tempfile::tempdir().unwrap();
1106 let dir_path = dir.path().to_str().unwrap();
1107
1108 let component = FileComponent::new();
1109 let endpoint = component
1110 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"))
1111 .unwrap();
1112 let mut consumer = endpoint.create_consumer().unwrap();
1113
1114 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1115 let token = CancellationToken::new();
1116 let ctx = ConsumerContext::new(tx, token.clone());
1117
1118 let handle = tokio::spawn(async move {
1119 consumer.start(ctx).await.unwrap();
1120 });
1121
1122 tokio::time::sleep(Duration::from_millis(150)).await;
1123 token.cancel();
1124
1125 let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1126 assert!(
1127 result.is_ok(),
1128 "Consumer should have stopped after cancellation"
1129 );
1130 }
1131
1132 #[tokio::test]
1137 async fn test_file_producer_writes_file() {
1138 use tower::ServiceExt;
1139
1140 let dir = tempfile::tempdir().unwrap();
1141 let dir_path = dir.path().to_str().unwrap();
1142
1143 let component = FileComponent::new();
1144 let endpoint = component
1145 .create_endpoint(&format!("file:{dir_path}"))
1146 .unwrap();
1147 let ctx = test_producer_ctx();
1148 let producer = endpoint.create_producer(&ctx).unwrap();
1149
1150 let mut exchange = Exchange::new(Message::new("file content"));
1151 exchange.input.set_header(
1152 "CamelFileName",
1153 serde_json::Value::String("output.txt".to_string()),
1154 );
1155
1156 let result = producer.oneshot(exchange).await.unwrap();
1157
1158 let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1159 assert_eq!(content, "file content");
1160
1161 assert!(result.input.header("CamelFileNameProduced").is_some());
1162 }
1163
1164 #[tokio::test]
1165 async fn test_file_producer_auto_create_dirs() {
1166 use tower::ServiceExt;
1167
1168 let dir = tempfile::tempdir().unwrap();
1169 let dir_path = dir.path().to_str().unwrap();
1170
1171 let component = FileComponent::new();
1172 let endpoint = component
1173 .create_endpoint(&format!("file:{dir_path}/sub/dir"))
1174 .unwrap();
1175 let ctx = test_producer_ctx();
1176 let producer = endpoint.create_producer(&ctx).unwrap();
1177
1178 let mut exchange = Exchange::new(Message::new("nested"));
1179 exchange.input.set_header(
1180 "CamelFileName",
1181 serde_json::Value::String("file.txt".to_string()),
1182 );
1183
1184 producer.oneshot(exchange).await.unwrap();
1185
1186 assert!(dir.path().join("sub/dir/file.txt").exists());
1187 }
1188
1189 #[tokio::test]
1190 async fn test_file_producer_file_exist_fail() {
1191 use tower::ServiceExt;
1192
1193 let dir = tempfile::tempdir().unwrap();
1194 let dir_path = dir.path().to_str().unwrap();
1195
1196 std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1197
1198 let component = FileComponent::new();
1199 let endpoint = component
1200 .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"))
1201 .unwrap();
1202 let ctx = test_producer_ctx();
1203 let producer = endpoint.create_producer(&ctx).unwrap();
1204
1205 let mut exchange = Exchange::new(Message::new("new"));
1206 exchange.input.set_header(
1207 "CamelFileName",
1208 serde_json::Value::String("existing.txt".to_string()),
1209 );
1210
1211 let result = producer.oneshot(exchange).await;
1212 assert!(
1213 result.is_err(),
1214 "Should fail when file exists with Fail strategy"
1215 );
1216 }
1217
1218 #[tokio::test]
1219 async fn test_file_producer_file_exist_append() {
1220 use tower::ServiceExt;
1221
1222 let dir = tempfile::tempdir().unwrap();
1223 let dir_path = dir.path().to_str().unwrap();
1224
1225 std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1226
1227 let component = FileComponent::new();
1228 let endpoint = component
1229 .create_endpoint(&format!("file:{dir_path}?fileExist=Append"))
1230 .unwrap();
1231 let ctx = test_producer_ctx();
1232 let producer = endpoint.create_producer(&ctx).unwrap();
1233
1234 let mut exchange = Exchange::new(Message::new("new"));
1235 exchange.input.set_header(
1236 "CamelFileName",
1237 serde_json::Value::String("append.txt".to_string()),
1238 );
1239
1240 producer.oneshot(exchange).await.unwrap();
1241
1242 let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1243 assert_eq!(content, "oldnew");
1244 }
1245
1246 #[tokio::test]
1247 async fn test_file_producer_temp_prefix() {
1248 use tower::ServiceExt;
1249
1250 let dir = tempfile::tempdir().unwrap();
1251 let dir_path = dir.path().to_str().unwrap();
1252
1253 let component = FileComponent::new();
1254 let endpoint = component
1255 .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"))
1256 .unwrap();
1257 let ctx = test_producer_ctx();
1258 let producer = endpoint.create_producer(&ctx).unwrap();
1259
1260 let mut exchange = Exchange::new(Message::new("atomic write"));
1261 exchange.input.set_header(
1262 "CamelFileName",
1263 serde_json::Value::String("final.txt".to_string()),
1264 );
1265
1266 producer.oneshot(exchange).await.unwrap();
1267
1268 assert!(dir.path().join("final.txt").exists());
1269 assert!(!dir.path().join(".tmpfinal.txt").exists());
1270 let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1271 assert_eq!(content, "atomic write");
1272 }
1273
1274 #[tokio::test]
1275 async fn test_file_producer_uses_filename_option() {
1276 use tower::ServiceExt;
1277
1278 let dir = tempfile::tempdir().unwrap();
1279 let dir_path = dir.path().to_str().unwrap();
1280
1281 let component = FileComponent::new();
1282 let endpoint = component
1283 .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"))
1284 .unwrap();
1285 let ctx = test_producer_ctx();
1286 let producer = endpoint.create_producer(&ctx).unwrap();
1287
1288 let exchange = Exchange::new(Message::new("content"));
1289
1290 producer.oneshot(exchange).await.unwrap();
1291 assert!(dir.path().join("fixed.txt").exists());
1292 }
1293
1294 #[tokio::test]
1295 async fn test_file_producer_no_filename_errors() {
1296 use tower::ServiceExt;
1297
1298 let dir = tempfile::tempdir().unwrap();
1299 let dir_path = dir.path().to_str().unwrap();
1300
1301 let component = FileComponent::new();
1302 let endpoint = component
1303 .create_endpoint(&format!("file:{dir_path}"))
1304 .unwrap();
1305 let ctx = test_producer_ctx();
1306 let producer = endpoint.create_producer(&ctx).unwrap();
1307
1308 let exchange = Exchange::new(Message::new("content"));
1309
1310 let result = producer.oneshot(exchange).await;
1311 assert!(result.is_err(), "Should error when no filename is provided");
1312 }
1313
1314 #[tokio::test]
1319 async fn test_file_producer_rejects_path_traversal_parent_directory() {
1320 use tower::ServiceExt;
1321
1322 let dir = tempfile::tempdir().unwrap();
1323 let dir_path = dir.path().to_str().unwrap();
1324
1325 std::fs::create_dir(dir.path().join("subdir")).unwrap();
1327 std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1328
1329 let component = FileComponent::new();
1330 let endpoint = component
1331 .create_endpoint(&format!("file:{dir_path}/subdir"))
1332 .unwrap();
1333 let ctx = test_producer_ctx();
1334 let producer = endpoint.create_producer(&ctx).unwrap();
1335
1336 let mut exchange = Exchange::new(Message::new("malicious"));
1337 exchange.input.set_header(
1338 "CamelFileName",
1339 serde_json::Value::String("../secret.txt".to_string()),
1340 );
1341
1342 let result = producer.oneshot(exchange).await;
1343 assert!(result.is_err(), "Should reject path traversal attempt");
1344
1345 let err = result.unwrap_err();
1346 assert!(
1347 err.to_string().contains("outside"),
1348 "Error should mention path is outside base directory"
1349 );
1350 }
1351
1352 #[tokio::test]
1353 async fn test_file_producer_rejects_absolute_path_outside_base() {
1354 use tower::ServiceExt;
1355
1356 let dir = tempfile::tempdir().unwrap();
1357 let dir_path = dir.path().to_str().unwrap();
1358
1359 let component = FileComponent::new();
1360 let endpoint = component
1361 .create_endpoint(&format!("file:{dir_path}"))
1362 .unwrap();
1363 let ctx = test_producer_ctx();
1364 let producer = endpoint.create_producer(&ctx).unwrap();
1365
1366 let mut exchange = Exchange::new(Message::new("malicious"));
1367 exchange.input.set_header(
1368 "CamelFileName",
1369 serde_json::Value::String("/etc/passwd".to_string()),
1370 );
1371
1372 let result = producer.oneshot(exchange).await;
1373 assert!(result.is_err(), "Should reject absolute path outside base");
1374 }
1375
1376 #[tokio::test]
1381 #[ignore] async fn test_large_file_streaming_constant_memory() {
1383 use std::io::Write;
1384 use tempfile::NamedTempFile;
1385
1386 let mut temp_file = NamedTempFile::new().unwrap();
1388 let file_size = 150 * 1024 * 1024; let chunk = vec![b'X'; 1024 * 1024]; for _ in 0..150 {
1392 temp_file.write_all(&chunk).unwrap();
1393 }
1394 temp_file.flush().unwrap();
1395
1396 let dir = temp_file.path().parent().unwrap();
1397 let dir_path = dir.to_str().unwrap();
1398 let file_name = temp_file
1399 .path()
1400 .file_name()
1401 .unwrap()
1402 .to_str()
1403 .unwrap()
1404 .to_string();
1405
1406 let component = FileComponent::new();
1408 let endpoint = component
1409 .create_endpoint(&format!(
1410 "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1411 ))
1412 .unwrap();
1413 let mut consumer = endpoint.create_consumer().unwrap();
1414
1415 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1416 let token = CancellationToken::new();
1417 let ctx = ConsumerContext::new(tx, token.clone());
1418
1419 tokio::spawn(async move {
1420 let _ = consumer.start(ctx).await;
1421 });
1422
1423 let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1424 rx.recv().await.unwrap().exchange
1425 })
1426 .await
1427 .expect("Should receive exchange");
1428 token.cancel();
1429
1430 assert!(matches!(exchange.input.body, Body::Stream(_)));
1432
1433 if let Body::Stream(ref stream_body) = exchange.input.body {
1435 assert!(stream_body.metadata.size_hint.is_some());
1436 let size = stream_body.metadata.size_hint.unwrap();
1437 assert_eq!(size, file_size as u64);
1438 }
1439
1440 if let Body::Stream(stream_body) = exchange.input.body {
1442 let body = Body::Stream(stream_body);
1443 let result = body.into_bytes(100 * 1024 * 1024).await;
1444 assert!(result.is_err());
1445 }
1446
1447 let component2 = FileComponent::new();
1450 let endpoint2 = component2
1451 .create_endpoint(&format!(
1452 "file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"
1453 ))
1454 .unwrap();
1455 let mut consumer2 = endpoint2.create_consumer().unwrap();
1456
1457 let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1458 let token2 = CancellationToken::new();
1459 let ctx2 = ConsumerContext::new(tx2, token2.clone());
1460
1461 tokio::spawn(async move {
1462 let _ = consumer2.start(ctx2).await;
1463 });
1464
1465 let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1466 rx2.recv().await.unwrap().exchange
1467 })
1468 .await
1469 .expect("Should receive exchange");
1470 token2.cancel();
1471
1472 if let Body::Stream(stream_body) = exchange2.input.body {
1473 let mut stream_lock = stream_body.stream.lock().await;
1474 let mut stream = stream_lock.take().unwrap();
1475
1476 if let Some(chunk_result) = stream.next().await {
1478 let chunk = chunk_result.unwrap();
1479 assert!(!chunk.is_empty());
1480 assert!(chunk.len() < file_size);
1481 }
1483 }
1484 }
1485
1486 #[tokio::test]
1491 async fn test_producer_writes_stream_body() {
1492 let dir = tempfile::tempdir().unwrap();
1493 let dir_path = dir.path().to_str().unwrap();
1494 let uri = format!("file:{dir_path}?fileName=out.txt");
1495
1496 let component = FileComponent::new();
1497 let endpoint = component.create_endpoint(&uri).unwrap();
1498 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1499
1500 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1501 Ok(Bytes::from("hello ")),
1502 Ok(Bytes::from("streaming ")),
1503 Ok(Bytes::from("world")),
1504 ];
1505 let stream = futures::stream::iter(chunks);
1506 let body = Body::Stream(camel_api::body::StreamBody {
1507 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1508 metadata: camel_api::body::StreamMetadata {
1509 size_hint: None,
1510 content_type: None,
1511 origin: None,
1512 },
1513 });
1514
1515 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1516 tower::ServiceExt::oneshot(producer, exchange)
1517 .await
1518 .unwrap();
1519
1520 let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1521 .await
1522 .unwrap();
1523 assert_eq!(content, "hello streaming world");
1524 }
1525
1526 #[tokio::test]
1527 async fn test_producer_stream_atomic_no_partial_on_error() {
1528 let dir = tempfile::tempdir().unwrap();
1530 let dir_path = dir.path().to_str().unwrap();
1531 let uri = format!("file:{dir_path}?fileName=out.txt");
1532
1533 let component = FileComponent::new();
1534 let endpoint = component.create_endpoint(&uri).unwrap();
1535 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1536
1537 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1538 Ok(Bytes::from("partial")),
1539 Err(CamelError::ProcessorError(
1540 "simulated stream error".to_string(),
1541 )),
1542 ];
1543 let stream = futures::stream::iter(chunks);
1544 let body = Body::Stream(camel_api::body::StreamBody {
1545 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1546 metadata: camel_api::body::StreamMetadata {
1547 size_hint: None,
1548 content_type: None,
1549 origin: None,
1550 },
1551 });
1552
1553 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1554 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1555 assert!(
1556 result.is_err(),
1557 "expected error when stream fails mid-write"
1558 );
1559
1560 assert!(
1562 !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1563 "partial file must not exist after failed write"
1564 );
1565
1566 assert!(
1568 !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1569 "temp file must be cleaned up after failed write"
1570 );
1571 }
1572
1573 #[tokio::test]
1574 async fn test_producer_stream_append() {
1575 let dir = tempfile::tempdir().unwrap();
1576 let dir_path = dir.path().to_str().unwrap();
1577 let target = format!("{dir_path}/out.txt");
1578
1579 tokio::fs::write(&target, b"line1\n").await.unwrap();
1581
1582 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1583 let component = FileComponent::new();
1584 let endpoint = component.create_endpoint(&uri).unwrap();
1585 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1586
1587 let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1588 let stream = futures::stream::iter(chunks);
1589 let body = Body::Stream(camel_api::body::StreamBody {
1590 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1591 metadata: camel_api::body::StreamMetadata {
1592 size_hint: None,
1593 content_type: None,
1594 origin: None,
1595 },
1596 });
1597
1598 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1599 tower::ServiceExt::oneshot(producer, exchange)
1600 .await
1601 .unwrap();
1602
1603 let content = tokio::fs::read_to_string(&target).await.unwrap();
1604 assert_eq!(content, "line1\nline2\n");
1605 }
1606
1607 #[tokio::test]
1608 async fn test_producer_stream_append_partial_on_error() {
1609 let dir = tempfile::tempdir().unwrap();
1612 let dir_path = dir.path().to_str().unwrap();
1613 let target = format!("{dir_path}/out.txt");
1614
1615 tokio::fs::write(&target, b"initial\n").await.unwrap();
1617
1618 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1619 let component = FileComponent::new();
1620 let endpoint = component.create_endpoint(&uri).unwrap();
1621 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1622
1623 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1625 Ok(Bytes::from("partial-")), Err(CamelError::ProcessorError("stream error".to_string())), Ok(Bytes::from("never-written")), ];
1629 let stream = futures::stream::iter(chunks);
1630 let body = Body::Stream(camel_api::body::StreamBody {
1631 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1632 metadata: camel_api::body::StreamMetadata {
1633 size_hint: None,
1634 content_type: None,
1635 origin: None,
1636 },
1637 });
1638
1639 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1640 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1641
1642 assert!(
1644 result.is_err(),
1645 "expected error when stream fails during append"
1646 );
1647
1648 let content = tokio::fs::read_to_string(&target).await.unwrap();
1650 assert_eq!(
1651 content, "initial\npartial-",
1652 "append leaves partial data on stream error (non-atomic by nature)"
1653 );
1654 }
1655
1656 #[tokio::test]
1657 async fn test_producer_stream_already_consumed_errors() {
1658 let dir = tempfile::tempdir().unwrap();
1659 let dir_path = dir.path().to_str().unwrap();
1660 let uri = format!("file:{dir_path}?fileName=out.txt");
1661
1662 let component = FileComponent::new();
1663 let endpoint = component.create_endpoint(&uri).unwrap();
1664 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1665
1666 let arc: std::sync::Arc<
1668 tokio::sync::Mutex<
1669 Option<
1670 std::pin::Pin<
1671 Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1672 >,
1673 >,
1674 >,
1675 > = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1676 let body = Body::Stream(camel_api::body::StreamBody {
1677 stream: arc,
1678 metadata: camel_api::body::StreamMetadata {
1679 size_hint: None,
1680 content_type: None,
1681 origin: None,
1682 },
1683 });
1684
1685 let exchange = camel_api::Exchange::new(camel_api::Message::new(body));
1686 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1687 assert!(
1688 result.is_err(),
1689 "expected error for already-consumed stream"
1690 );
1691 }
1692
1693 #[test]
1698 fn test_global_config_applied_to_endpoint() {
1699 let global = FileGlobalConfig::default()
1701 .with_delay_ms(2000)
1702 .with_initial_delay_ms(5000)
1703 .with_read_timeout_ms(60_000)
1704 .with_write_timeout_ms(45_000);
1705 let component = FileComponent::with_config(global);
1706 let endpoint = component.create_endpoint("file:/tmp/inbox").unwrap();
1708 let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
1711 let global2 = FileGlobalConfig::default()
1712 .with_delay_ms(2000)
1713 .with_initial_delay_ms(5000)
1714 .with_read_timeout_ms(60_000)
1715 .with_write_timeout_ms(45_000);
1716 config.apply_global_defaults(&global2);
1717 assert_eq!(config.delay, Duration::from_millis(2000));
1718 assert_eq!(config.initial_delay, Duration::from_millis(5000));
1719 assert_eq!(config.read_timeout, Duration::from_millis(60_000));
1720 assert_eq!(config.write_timeout, Duration::from_millis(45_000));
1721 let _ = endpoint; }
1724
1725 #[test]
1726 fn test_uri_param_wins_over_global_config() {
1727 let mut config =
1729 FileConfig::from_uri("file:/tmp/inbox?delay=1000&initialDelay=2000").unwrap();
1730 let global = FileGlobalConfig::default()
1732 .with_delay_ms(3000)
1733 .with_initial_delay_ms(4000);
1734 config.apply_global_defaults(&global);
1735 assert_eq!(config.delay, Duration::from_millis(1000));
1737 assert_eq!(config.initial_delay, Duration::from_millis(2000));
1739 assert_eq!(config.read_timeout, Duration::from_millis(30_000));
1742 }
1743}