1pub mod bundle;
2
3pub use bundle::FileBundle;
4
5use std::collections::HashSet;
6use std::future::Future;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use futures::StreamExt;
15use regex::Regex;
16use tokio::fs;
17use tokio::fs::OpenOptions;
18use tokio::io;
19use tokio::io::AsyncWriteExt;
20use tokio::time;
21use tokio_util::io::ReaderStream;
22use tower::Service;
23use tracing::{debug, warn};
24
25use camel_component_api::{
26 Body, BoxProcessor, CamelError, Exchange, Message, StreamBody, StreamMetadata,
27};
28use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
29use camel_component_api::{UriConfig, parse_uri};
30
/// Drop guard that deletes a temporary file unless it has been disarmed.
///
/// The producer arms a guard before writing a temp file and disarms it once
/// the file has been renamed to its final name, so any early return cleans up.
struct TempFileGuard {
    /// File removed on drop while the guard is still armed.
    path: PathBuf,
    /// Set to `true` by [`TempFileGuard::disarm`] to cancel the cleanup.
    disarm: bool,
}

impl TempFileGuard {
    /// Creates an armed guard for `path`.
    fn new(path: PathBuf) -> Self {
        let disarm = false;
        Self { path, disarm }
    }

    /// Cancels the cleanup; dropping the guard afterwards leaves the file alone.
    fn disarm(&mut self) {
        self.disarm = true;
    }
}

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if self.disarm {
            return;
        }
        // Best effort: a failed removal is deliberately ignored.
        std::fs::remove_file(&self.path).ok();
    }
}
66
/// What the producer does when the target file already exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FileExistStrategy {
    /// Replace the existing file (the default).
    #[default]
    Override,
    /// Append the body to the existing file.
    Append,
    /// Report an error instead of writing.
    Fail,
}

impl FromStr for FileExistStrategy {
    type Err = String;

    /// Parses a strategy name.
    ///
    /// Only the capitalised and all-lowercase spellings are recognised; every
    /// other input falls back to [`FileExistStrategy::Override`], so this
    /// never returns `Err`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let strategy = if matches!(s, "Append" | "append") {
            Self::Append
        } else if matches!(s, "Fail" | "fail") {
            Self::Fail
        } else {
            // Covers "Override"/"override" as well as unknown values.
            Self::Override
        };
        Ok(strategy)
    }
}
95
96#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
104#[serde(default)]
105pub struct FileGlobalConfig {
106 pub delay_ms: u64,
107 pub initial_delay_ms: u64,
108 pub read_timeout_ms: u64,
109 pub write_timeout_ms: u64,
110}
111
112impl Default for FileGlobalConfig {
113 fn default() -> Self {
114 Self {
115 delay_ms: 500,
116 initial_delay_ms: 1_000,
117 read_timeout_ms: 30_000,
118 write_timeout_ms: 30_000,
119 }
120 }
121}
122
123impl FileGlobalConfig {
124 pub fn new() -> Self {
125 Self::default()
126 }
127 pub fn with_delay_ms(mut self, v: u64) -> Self {
128 self.delay_ms = v;
129 self
130 }
131 pub fn with_initial_delay_ms(mut self, v: u64) -> Self {
132 self.initial_delay_ms = v;
133 self
134 }
135 pub fn with_read_timeout_ms(mut self, v: u64) -> Self {
136 self.read_timeout_ms = v;
137 self
138 }
139 pub fn with_write_timeout_ms(mut self, v: u64) -> Self {
140 self.write_timeout_ms = v;
141 self
142 }
143}
144
/// Per-endpoint configuration parsed from a `file:` URI.
///
/// Fields tagged with `#[uri_param]` are filled from query parameters by the
/// `UriConfig` derive. The `*_ms` fields hold the raw millisecond values; the
/// untagged `Duration` fields next to them are what the rest of this file
/// reads (presumably populated from the `*_ms` values by the derive — confirm
/// against the macro).
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "file"]
#[uri_config(skip_impl, crate = "camel_component_api")]
pub struct FileConfig {
    /// Directory the consumer polls and the producer writes into.
    pub directory: String,

    /// Raw `delay` query parameter in milliseconds (default 500).
    #[allow(dead_code)]
    #[uri_param(name = "delay", default = "500")]
    delay_ms: u64,

    /// Period between two consumer polls.
    pub delay: Duration,

    /// Raw `initialDelay` query parameter in milliseconds (default 1000).
    #[allow(dead_code)]
    #[uri_param(name = "initialDelay", default = "1000")]
    initial_delay_ms: u64,

    /// Wait before the first consumer poll.
    pub initial_delay: Duration,

    /// When set, consumed files are left in place and remembered so they are
    /// not emitted again.
    #[uri_param(default = "false")]
    pub noop: bool,

    /// When set (and `noop` is not), consumed files are deleted.
    #[uri_param(default = "false")]
    pub delete: bool,

    /// Directory, relative to `directory`, that consumed files are moved to.
    /// `validate` clears it when `noop` or `delete` is set and otherwise
    /// defaults it to `.camel`.
    #[uri_param(name = "move")]
    move_to: Option<String>,

    /// Consumer: only the file with exactly this name is picked up.
    /// Producer: file name used when the `CamelFileName` header is absent.
    #[uri_param(name = "fileName")]
    pub file_name: Option<String>,

    /// Regex a file name must match to be consumed.
    #[uri_param]
    pub include: Option<String>,

    /// Regex that excludes matching file names from consumption.
    #[uri_param]
    pub exclude: Option<String>,

    /// Whether the consumer descends into sub-directories.
    #[uri_param(default = "false")]
    pub recursive: bool,

    /// Producer behaviour when the target file already exists.
    #[uri_param(name = "fileExist", default = "Override")]
    pub file_exist: FileExistStrategy,

    /// Prefix of the producer's temporary file name (`.tmp.` when unset).
    #[uri_param(name = "tempPrefix")]
    pub temp_prefix: Option<String>,

    /// Whether the producer creates missing parent directories.
    #[uri_param(name = "autoCreate", default = "true")]
    pub auto_create: bool,

    /// Raw `readTimeout` query parameter in milliseconds (default 30000).
    #[allow(dead_code)]
    #[uri_param(name = "readTimeout", default = "30000")]
    read_timeout_ms: u64,

    /// Time limit for opening a file and reading its metadata in the consumer.
    pub read_timeout: Duration,

    /// Raw `writeTimeout` query parameter in milliseconds (default 30000).
    #[allow(dead_code)]
    #[uri_param(name = "writeTimeout", default = "30000")]
    write_timeout_ms: u64,

    /// Time limit applied to each producer file-system operation.
    pub write_timeout: Duration,
}
251
252impl UriConfig for FileConfig {
253 fn scheme() -> &'static str {
254 "file"
255 }
256
257 fn from_uri(uri: &str) -> Result<Self, CamelError> {
258 let parts = parse_uri(uri)?;
259 Self::from_components(parts)
260 }
261
262 fn from_components(parts: camel_component_api::UriComponents) -> Result<Self, CamelError> {
263 Self::parse_uri_components(parts)?.validate()
264 }
265
266 fn validate(self) -> Result<Self, CamelError> {
267 let move_to = if self.noop || self.delete {
271 None
272 } else {
273 Some(self.move_to.unwrap_or_else(|| ".camel".to_string()))
274 };
275
276 Ok(Self { move_to, ..self })
277 }
278}
279
280impl FileConfig {
281 pub fn apply_global_defaults(&mut self, global: &FileGlobalConfig) {
289 if self.delay == Duration::from_millis(500) {
290 self.delay = Duration::from_millis(global.delay_ms);
291 }
292 if self.initial_delay == Duration::from_millis(1_000) {
293 self.initial_delay = Duration::from_millis(global.initial_delay_ms);
294 }
295 if self.read_timeout == Duration::from_millis(30_000) {
296 self.read_timeout = Duration::from_millis(global.read_timeout_ms);
297 }
298 if self.write_timeout == Duration::from_millis(30_000) {
299 self.write_timeout = Duration::from_millis(global.write_timeout_ms);
300 }
301 }
302}
303
304pub struct FileComponent {
309 config: Option<FileGlobalConfig>,
310}
311
312impl FileComponent {
313 pub fn new() -> Self {
314 Self { config: None }
315 }
316
317 pub fn with_config(config: FileGlobalConfig) -> Self {
318 Self {
319 config: Some(config),
320 }
321 }
322
323 pub fn with_optional_config(config: Option<FileGlobalConfig>) -> Self {
324 Self { config }
325 }
326}
327
328impl Default for FileComponent {
329 fn default() -> Self {
330 Self::new()
331 }
332}
333
334impl Component for FileComponent {
335 fn scheme(&self) -> &str {
336 "file"
337 }
338
339 fn create_endpoint(
340 &self,
341 uri: &str,
342 _ctx: &dyn camel_component_api::ComponentContext,
343 ) -> Result<Box<dyn Endpoint>, CamelError> {
344 let mut config = FileConfig::from_uri(uri)?;
345 if let Some(ref global_config) = self.config {
346 config.apply_global_defaults(global_config);
347 }
348 Ok(Box::new(FileEndpoint {
349 uri: uri.to_string(),
350 config,
351 }))
352 }
353}
354
355struct FileEndpoint {
360 uri: String,
361 config: FileConfig,
362}
363
364impl Endpoint for FileEndpoint {
365 fn uri(&self) -> &str {
366 &self.uri
367 }
368
369 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
370 Ok(Box::new(FileConsumer {
371 config: self.config.clone(),
372 seen: HashSet::new(),
373 }))
374 }
375
376 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
377 Ok(BoxProcessor::new(FileProducer {
378 config: self.config.clone(),
379 }))
380 }
381}
382
383struct FileConsumer {
388 config: FileConfig,
389 seen: HashSet<PathBuf>,
390}
391
392#[async_trait]
393impl Consumer for FileConsumer {
394 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
395 let config = self.config.clone();
396
397 let include_re = config
398 .include
399 .as_ref()
400 .map(|p| Regex::new(p))
401 .transpose()
402 .map_err(|e| CamelError::InvalidUri(format!("invalid include regex: {e}")))?;
403 let exclude_re = config
404 .exclude
405 .as_ref()
406 .map(|p| Regex::new(p))
407 .transpose()
408 .map_err(|e| CamelError::InvalidUri(format!("invalid exclude regex: {e}")))?;
409
410 if !config.initial_delay.is_zero() {
411 tokio::select! {
412 _ = time::sleep(config.initial_delay) => {}
413 _ = context.cancelled() => {
414 debug!(directory = config.directory, "File consumer cancelled during initial delay");
415 return Ok(());
416 }
417 }
418 }
419
420 let mut interval = time::interval(config.delay);
421
422 loop {
423 tokio::select! {
424 _ = context.cancelled() => {
425 debug!(directory = config.directory, "File consumer received cancellation, stopping");
426 break;
427 }
428 _ = interval.tick() => {
429 if let Err(e) = poll_directory(
430 &config,
431 &context,
432 &include_re,
433 &exclude_re,
434 &mut self.seen,
435 ).await {
436 warn!(directory = config.directory, error = %e, "Error polling directory");
437 }
438 }
439 }
440 }
441
442 Ok(())
443 }
444
445 async fn stop(&mut self) -> Result<(), CamelError> {
446 Ok(())
447 }
448}
449
450async fn poll_directory(
451 config: &FileConfig,
452 context: &ConsumerContext,
453 include_re: &Option<Regex>,
454 exclude_re: &Option<Regex>,
455 seen: &mut HashSet<PathBuf>,
456) -> Result<(), CamelError> {
457 let base_path = std::path::Path::new(&config.directory);
458
459 let files = list_files(base_path, config.recursive).await?;
460
461 for file_path in files {
462 let file_name = file_path
463 .file_name()
464 .and_then(|n| n.to_str())
465 .unwrap_or_default()
466 .to_string();
467
468 if let Some(ref target_name) = config.file_name
469 && file_name != *target_name
470 {
471 continue;
472 }
473
474 if let Some(re) = include_re
475 && !re.is_match(&file_name)
476 {
477 continue;
478 }
479
480 if let Some(re) = exclude_re
481 && re.is_match(&file_name)
482 {
483 continue;
484 }
485
486 if let Some(ref move_dir) = config.move_to
487 && file_path.starts_with(base_path.join(move_dir))
488 {
489 continue;
490 }
491
492 if config.noop && seen.contains(&file_path) {
494 continue;
495 }
496
497 let (file, metadata) = match tokio::time::timeout(config.read_timeout, async {
498 let f = fs::File::open(&file_path).await?;
499 let m = f.metadata().await?;
500 Ok::<_, std::io::Error>((f, m))
501 })
502 .await
503 {
504 Ok(Ok((f, m))) => (f, Some(m)),
505 Ok(Err(e)) => {
506 warn!(
507 file = %file_path.display(),
508 error = %e,
509 "Failed to open file"
510 );
511 continue;
512 }
513 Err(_) => {
514 warn!(
515 file = %file_path.display(),
516 timeout_ms = config.read_timeout.as_millis(),
517 "Timeout opening file"
518 );
519 continue;
520 }
521 };
522
523 let file_len = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
524 let stream = ReaderStream::new(file).map(|res| res.map_err(CamelError::from));
525
526 let last_modified = metadata
527 .as_ref()
528 .and_then(|m| m.modified().ok())
529 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
530 .map(|d| d.as_millis() as u64)
531 .unwrap_or(0);
532
533 let relative_path = file_path
534 .strip_prefix(base_path)
535 .unwrap_or(&file_path)
536 .to_string_lossy()
537 .to_string();
538
539 let absolute_path = file_path
540 .canonicalize()
541 .unwrap_or_else(|_| file_path.clone())
542 .to_string_lossy()
543 .to_string();
544
545 let body = Body::Stream(StreamBody {
546 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
547 metadata: StreamMetadata {
548 size_hint: Some(file_len),
549 content_type: None,
550 origin: Some(absolute_path.clone()),
551 },
552 });
553
554 let mut exchange = Exchange::new(Message::new(body));
555 exchange
556 .input
557 .set_header("CamelFileName", serde_json::Value::String(relative_path));
558 exchange.input.set_header(
559 "CamelFileNameOnly",
560 serde_json::Value::String(file_name.clone()),
561 );
562 exchange.input.set_header(
563 "CamelFileAbsolutePath",
564 serde_json::Value::String(absolute_path),
565 );
566 exchange.input.set_header(
567 "CamelFileLength",
568 serde_json::Value::Number(file_len.into()),
569 );
570 exchange.input.set_header(
571 "CamelFileLastModified",
572 serde_json::Value::Number(last_modified.into()),
573 );
574
575 debug!(
576 file = %file_path.display(),
577 correlation_id = %exchange.correlation_id(),
578 "Processing file"
579 );
580
581 if context.send(exchange).await.is_err() {
582 break;
583 }
584
585 if config.noop {
586 seen.insert(file_path.clone());
587 }
588
589 if config.noop {
590 } else if config.delete {
592 if let Err(e) = fs::remove_file(&file_path).await {
593 warn!(file = %file_path.display(), error = %e, "Failed to delete file");
594 }
595 } else if let Some(ref move_dir) = config.move_to {
596 let target_dir = base_path.join(move_dir);
597 if let Err(e) = fs::create_dir_all(&target_dir).await {
598 warn!(dir = %target_dir.display(), error = %e, "Failed to create move directory");
599 continue;
600 }
601 let target_path = target_dir.join(&file_name);
602 if let Err(e) = fs::rename(&file_path, &target_path).await {
603 warn!(
604 from = %file_path.display(),
605 to = %target_path.display(),
606 error = %e,
607 "Failed to move file"
608 );
609 }
610 }
611 }
612
613 Ok(())
614}
615
616async fn list_files(
617 dir: &std::path::Path,
618 recursive: bool,
619) -> Result<Vec<std::path::PathBuf>, CamelError> {
620 let mut files = Vec::new();
621 let mut read_dir = fs::read_dir(dir).await.map_err(CamelError::from)?;
622
623 while let Some(entry) = read_dir.next_entry().await.map_err(CamelError::from)? {
624 let path = entry.path();
625 if path.is_file() {
626 files.push(path);
627 } else if path.is_dir() && recursive {
628 let mut sub_files = Box::pin(list_files(&path, true)).await?;
629 files.append(&mut sub_files);
630 }
631 }
632
633 files.sort();
634 Ok(files)
635}
636
637fn validate_path_is_within_base(
642 base_dir: &std::path::Path,
643 target_path: &std::path::Path,
644) -> Result<(), CamelError> {
645 let canonical_base = base_dir.canonicalize().map_err(|e| {
646 CamelError::ProcessorError(format!("Cannot canonicalize base directory: {}", e))
647 })?;
648
649 let canonical_target = if target_path.exists() {
651 target_path.canonicalize().map_err(|e| {
652 CamelError::ProcessorError(format!("Cannot canonicalize target path: {}", e))
653 })?
654 } else if let Some(parent) = target_path.parent() {
655 if !parent.exists() {
657 return Err(CamelError::ProcessorError(format!(
658 "Parent directory '{}' does not exist",
659 parent.display()
660 )));
661 }
662 let canonical_parent = parent.canonicalize().map_err(|e| {
663 CamelError::ProcessorError(format!("Cannot canonicalize parent directory: {}", e))
664 })?;
665 if let Some(filename) = target_path.file_name() {
667 canonical_parent.join(filename)
668 } else {
669 return Err(CamelError::ProcessorError(
670 "Invalid target path: no filename".to_string(),
671 ));
672 }
673 } else {
674 return Err(CamelError::ProcessorError(
675 "Invalid target path: no parent directory".to_string(),
676 ));
677 };
678
679 if !canonical_target.starts_with(&canonical_base) {
680 return Err(CamelError::ProcessorError(format!(
681 "Path '{}' is outside base directory '{}'",
682 canonical_target.display(),
683 canonical_base.display()
684 )));
685 }
686
687 Ok(())
688}
689
690#[derive(Clone)]
695struct FileProducer {
696 config: FileConfig,
697}
698
699impl FileProducer {
700 fn resolve_filename(exchange: &Exchange, config: &FileConfig) -> Result<String, CamelError> {
701 if let Some(name) = exchange
702 .input
703 .header("CamelFileName")
704 .and_then(|v| v.as_str())
705 {
706 return Ok(name.to_string());
707 }
708 if let Some(ref name) = config.file_name {
709 return Ok(name.clone());
710 }
711 Err(CamelError::ProcessorError(
712 "No filename specified: set CamelFileName header or fileName option".to_string(),
713 ))
714 }
715}
716
717impl Service<Exchange> for FileProducer {
718 type Response = Exchange;
719 type Error = CamelError;
720 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
721
722 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
723 Poll::Ready(Ok(()))
724 }
725
726 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
727 let config = self.config.clone();
728
729 Box::pin(async move {
730 let file_name = FileProducer::resolve_filename(&exchange, &config)?;
731 let body = exchange.input.body.clone();
732
733 let dir_path = std::path::Path::new(&config.directory);
734 let target_path = dir_path.join(&file_name);
735
736 if config.auto_create
738 && let Some(parent) = target_path.parent()
739 {
740 tokio::time::timeout(config.write_timeout, fs::create_dir_all(parent))
741 .await
742 .map_err(|_| CamelError::ProcessorError("Timeout creating directories".into()))?
743 .map_err(CamelError::from)?;
744 }
745
746 validate_path_is_within_base(dir_path, &target_path)?;
748
749 match config.file_exist {
751 FileExistStrategy::Fail if target_path.exists() => {
752 return Err(CamelError::ProcessorError(format!(
753 "File already exists: {}",
754 target_path.display()
755 )));
756 }
757 FileExistStrategy::Append => {
758 let mut file = tokio::time::timeout(
760 config.write_timeout,
761 OpenOptions::new()
762 .append(true)
763 .create(true)
764 .open(&target_path),
765 )
766 .await
767 .map_err(|_| {
768 CamelError::ProcessorError("Timeout opening file for append".into())
769 })?
770 .map_err(CamelError::from)?;
771
772 tokio::time::timeout(
773 config.write_timeout,
774 io::copy(&mut body.into_async_read(), &mut file),
775 )
776 .await
777 .map_err(|_| CamelError::ProcessorError("Timeout writing to file".into()))?
778 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
779
780 file.flush().await.map_err(CamelError::from)?;
781 }
782 _ => {
783 let temp_name = if let Some(ref prefix) = config.temp_prefix {
785 format!("{prefix}{file_name}")
786 } else {
787 format!(".tmp.{file_name}")
788 };
789 let temp_path = dir_path.join(&temp_name);
790
791 let mut guard = TempFileGuard::new(temp_path.clone());
793
794 let mut file =
796 tokio::time::timeout(config.write_timeout, fs::File::create(&temp_path))
797 .await
798 .map_err(|_| {
799 CamelError::ProcessorError("Timeout creating temp file".into())
800 })?
801 .map_err(CamelError::from)?;
802
803 let copy_result = tokio::time::timeout(
804 config.write_timeout,
805 io::copy(&mut body.into_async_read(), &mut file),
806 )
807 .await;
808
809 let _ = file.flush().await;
811
812 match copy_result {
813 Ok(Ok(_)) => {}
814 Ok(Err(e)) => {
815 return Err(CamelError::ProcessorError(e.to_string()));
817 }
818 Err(_) => {
819 return Err(CamelError::ProcessorError("Timeout writing file".into()));
821 }
822 }
823
824 let rename_result = tokio::time::timeout(
826 config.write_timeout,
827 fs::rename(&temp_path, &target_path),
828 )
829 .await;
830
831 match rename_result {
832 Ok(Ok(_)) => {
833 guard.disarm();
835 }
836 Ok(Err(e)) => {
837 return Err(CamelError::from(e));
839 }
840 Err(_) => {
841 return Err(CamelError::ProcessorError("Timeout renaming file".into()));
843 }
844 }
845 }
846 }
847
848 let abs_path = target_path
850 .canonicalize()
851 .unwrap_or_else(|_| target_path.clone())
852 .to_string_lossy()
853 .to_string();
854 exchange
855 .input
856 .set_header("CamelFileNameProduced", serde_json::Value::String(abs_path));
857
858 debug!(
859 file = %target_path.display(),
860 correlation_id = %exchange.correlation_id(),
861 "File written"
862 );
863
864 Ok(exchange)
865 })
866 }
867}
868
869#[cfg(test)]
870mod tests {
871 use super::*;
872 use bytes::Bytes;
873 use camel_component_api::NoOpComponentContext;
874 use std::time::Duration;
875 use tokio_util::sync::CancellationToken;
876
    /// Builds the `ProducerContext` passed to `create_producer` in the producer tests.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }
880
881 #[test]
882 fn test_file_config_defaults() {
883 let config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
884 assert_eq!(config.directory, "/tmp/inbox");
885 assert_eq!(config.delay, Duration::from_millis(500));
886 assert_eq!(config.initial_delay, Duration::from_millis(1000));
887 assert!(!config.noop);
888 assert!(!config.delete);
889 assert_eq!(config.move_to, Some(".camel".to_string()));
890 assert!(config.file_name.is_none());
891 assert!(config.include.is_none());
892 assert!(config.exclude.is_none());
893 assert!(!config.recursive);
894 assert_eq!(config.file_exist, FileExistStrategy::Override);
895 assert!(config.temp_prefix.is_none());
896 assert!(config.auto_create);
897 assert_eq!(config.read_timeout, Duration::from_secs(30));
899 assert_eq!(config.write_timeout, Duration::from_secs(30));
900 }
901
902 #[test]
903 fn test_file_config_consumer_options() {
904 let config = FileConfig::from_uri(
905 "file:/data/input?delay=1000&initialDelay=2000&noop=true&recursive=true&include=.*\\.csv"
906 ).unwrap();
907 assert_eq!(config.directory, "/data/input");
908 assert_eq!(config.delay, Duration::from_millis(1000));
909 assert_eq!(config.initial_delay, Duration::from_millis(2000));
910 assert!(config.noop);
911 assert!(config.recursive);
912 assert_eq!(config.include, Some(".*\\.csv".to_string()));
913 }
914
915 #[test]
916 fn test_file_config_producer_options() {
917 let config = FileConfig::from_uri(
918 "file:/data/output?fileExist=Append&tempPrefix=.tmp&autoCreate=false&fileName=out.txt",
919 )
920 .unwrap();
921 assert_eq!(config.file_exist, FileExistStrategy::Append);
922 assert_eq!(config.temp_prefix, Some(".tmp".to_string()));
923 assert!(!config.auto_create);
924 assert_eq!(config.file_name, Some("out.txt".to_string()));
925 }
926
927 #[test]
928 fn test_file_config_delete_mode() {
929 let config = FileConfig::from_uri("file:/tmp/inbox?delete=true").unwrap();
930 assert!(config.delete);
931 assert!(config.move_to.is_none());
932 }
933
934 #[test]
935 fn test_file_config_noop_mode() {
936 let config = FileConfig::from_uri("file:/tmp/inbox?noop=true").unwrap();
937 assert!(config.noop);
938 assert!(config.move_to.is_none());
939 }
940
941 #[test]
942 fn test_file_config_wrong_scheme() {
943 let result = FileConfig::from_uri("timer:tick");
944 assert!(result.is_err());
945 }
946
947 #[test]
948 fn test_file_component_scheme() {
949 let component = FileComponent::new();
950 assert_eq!(component.scheme(), "file");
951 }
952
953 #[test]
954 fn test_file_component_creates_endpoint() {
955 let component = FileComponent::new();
956 let ctx = NoOpComponentContext;
957 let endpoint = component.create_endpoint("file:/tmp/test", &ctx);
958 assert!(endpoint.is_ok());
959 }
960
961 #[tokio::test]
966 async fn test_file_consumer_reads_files() {
967 let dir = tempfile::tempdir().unwrap();
968 let dir_path = dir.path().to_str().unwrap();
969
970 std::fs::write(dir.path().join("test1.txt"), "hello").unwrap();
971 std::fs::write(dir.path().join("test2.txt"), "world").unwrap();
972
973 let component = FileComponent::new();
974 let ctx = NoOpComponentContext;
975 let endpoint = component
976 .create_endpoint(
977 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100"),
978 &ctx,
979 )
980 .unwrap();
981 let mut consumer = endpoint.create_consumer().unwrap();
982
983 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
984 let token = CancellationToken::new();
985 let ctx = ConsumerContext::new(tx, token.clone());
986
987 tokio::spawn(async move {
988 consumer.start(ctx).await.unwrap();
989 });
990
991 let mut received = Vec::new();
992 let timeout = tokio::time::timeout(Duration::from_secs(2), async {
993 while let Some(envelope) = rx.recv().await {
994 received.push(envelope.exchange);
995 if received.len() == 2 {
996 break;
997 }
998 }
999 })
1000 .await;
1001 token.cancel();
1002
1003 assert!(timeout.is_ok(), "Should have received 2 exchanges");
1004 assert_eq!(received.len(), 2);
1005
1006 for ex in &received {
1007 assert!(ex.input.header("CamelFileName").is_some());
1008 assert!(ex.input.header("CamelFileNameOnly").is_some());
1009 assert!(ex.input.header("CamelFileAbsolutePath").is_some());
1010 assert!(ex.input.header("CamelFileLength").is_some());
1011 assert!(ex.input.header("CamelFileLastModified").is_some());
1012 }
1013 }
1014
1015 #[tokio::test]
1016 async fn noop_second_poll_does_not_re_emit_seen_files() {
1017 let dir = tempfile::tempdir().unwrap();
1018 let file_path = dir.path().join("test.txt");
1019 tokio::fs::write(&file_path, b"hello").await.unwrap();
1020
1021 let uri = format!(
1022 "file:{}?noop=true&initialDelay=0&delay=50",
1023 dir.path().display()
1024 );
1025 let config = FileConfig::from_uri(&uri).unwrap();
1026 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1027 let token = CancellationToken::new();
1028 let ctx = ConsumerContext::new(tx, token);
1029
1030 let include_re = None;
1031 let exclude_re = None;
1032 let mut seen = std::collections::HashSet::new();
1033
1034 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1035 .await
1036 .unwrap();
1037 assert!(rx.try_recv().is_ok(), "first poll should emit file");
1038 assert!(rx.try_recv().is_err(), "should only emit once");
1039
1040 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1041 .await
1042 .unwrap();
1043 assert!(
1044 rx.try_recv().is_err(),
1045 "second poll should not re-emit seen file"
1046 );
1047 }
1048
1049 #[tokio::test]
1050 async fn noop_new_files_picked_up_after_first_poll() {
1051 let dir = tempfile::tempdir().unwrap();
1052 let file1 = dir.path().join("a.txt");
1053 tokio::fs::write(&file1, b"a").await.unwrap();
1054
1055 let uri = format!(
1056 "file:{}?noop=true&initialDelay=0&delay=50",
1057 dir.path().display()
1058 );
1059 let config = FileConfig::from_uri(&uri).unwrap();
1060 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1061 let token = CancellationToken::new();
1062 let ctx = ConsumerContext::new(tx, token);
1063
1064 let include_re = None;
1065 let exclude_re = None;
1066 let mut seen = std::collections::HashSet::new();
1067
1068 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1069 .await
1070 .unwrap();
1071 let _ = rx.try_recv();
1072
1073 let file2 = dir.path().join("b.txt");
1074 tokio::fs::write(&file2, b"b").await.unwrap();
1075
1076 poll_directory(&config, &ctx, &include_re, &exclude_re, &mut seen)
1077 .await
1078 .unwrap();
1079 assert!(
1080 rx.try_recv().is_ok(),
1081 "b.txt should be emitted on second poll"
1082 );
1083 assert!(rx.try_recv().is_err(), "a.txt should not be re-emitted");
1084 }
1085
1086 #[tokio::test]
1087 async fn test_file_consumer_include_filter() {
1088 let dir = tempfile::tempdir().unwrap();
1089 let dir_path = dir.path().to_str().unwrap();
1090
1091 std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
1092 std::fs::write(dir.path().join("readme.txt"), "hello").unwrap();
1093
1094 let component = FileComponent::new();
1095 let ctx = NoOpComponentContext;
1096 let endpoint = component
1097 .create_endpoint(
1098 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&include=.*\\.csv"),
1099 &ctx,
1100 )
1101 .unwrap();
1102 let mut consumer = endpoint.create_consumer().unwrap();
1103
1104 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1105 let token = CancellationToken::new();
1106 let ctx = ConsumerContext::new(tx, token.clone());
1107
1108 tokio::spawn(async move {
1109 consumer.start(ctx).await.unwrap();
1110 });
1111
1112 let mut received = Vec::new();
1113 let _ = tokio::time::timeout(Duration::from_millis(500), async {
1114 while let Some(envelope) = rx.recv().await {
1115 received.push(envelope.exchange);
1116 if received.len() == 1 {
1117 break;
1118 }
1119 }
1120 })
1121 .await;
1122 token.cancel();
1123
1124 assert_eq!(received.len(), 1);
1125 let name = received[0]
1126 .input
1127 .header("CamelFileNameOnly")
1128 .and_then(|v| v.as_str())
1129 .unwrap();
1130 assert_eq!(name, "data.csv");
1131 }
1132
1133 #[tokio::test]
1134 async fn test_file_consumer_delete_mode() {
1135 let dir = tempfile::tempdir().unwrap();
1136 let dir_path = dir.path().to_str().unwrap();
1137
1138 std::fs::write(dir.path().join("deleteme.txt"), "bye").unwrap();
1139
1140 let component = FileComponent::new();
1141 let ctx = NoOpComponentContext;
1142 let endpoint = component
1143 .create_endpoint(
1144 &format!("file:{dir_path}?delete=true&initialDelay=0&delay=100"),
1145 &ctx,
1146 )
1147 .unwrap();
1148 let mut consumer = endpoint.create_consumer().unwrap();
1149
1150 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1151 let token = CancellationToken::new();
1152 let ctx = ConsumerContext::new(tx, token.clone());
1153
1154 tokio::spawn(async move {
1155 consumer.start(ctx).await.unwrap();
1156 });
1157
1158 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1159 token.cancel();
1160
1161 tokio::time::sleep(Duration::from_millis(100)).await;
1162
1163 assert!(
1164 !dir.path().join("deleteme.txt").exists(),
1165 "File should be deleted"
1166 );
1167 }
1168
1169 #[tokio::test]
1170 async fn test_file_consumer_move_mode() {
1171 let dir = tempfile::tempdir().unwrap();
1172 let dir_path = dir.path().to_str().unwrap();
1173
1174 std::fs::write(dir.path().join("moveme.txt"), "data").unwrap();
1175
1176 let component = FileComponent::new();
1177 let ctx = NoOpComponentContext;
1178 let endpoint = component
1179 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=100"), &ctx)
1180 .unwrap();
1181 let mut consumer = endpoint.create_consumer().unwrap();
1182
1183 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1184 let token = CancellationToken::new();
1185 let ctx = ConsumerContext::new(tx, token.clone());
1186
1187 tokio::spawn(async move {
1188 consumer.start(ctx).await.unwrap();
1189 });
1190
1191 let _ = tokio::time::timeout(Duration::from_millis(500), async { rx.recv().await }).await;
1192 token.cancel();
1193
1194 tokio::time::sleep(Duration::from_millis(100)).await;
1195
1196 assert!(
1197 !dir.path().join("moveme.txt").exists(),
1198 "Original file should be gone"
1199 );
1200 assert!(
1201 dir.path().join(".camel").join("moveme.txt").exists(),
1202 "File should be in .camel/"
1203 );
1204 }
1205
1206 #[tokio::test]
1207 async fn test_file_consumer_respects_cancellation() {
1208 let dir = tempfile::tempdir().unwrap();
1209 let dir_path = dir.path().to_str().unwrap();
1210
1211 let component = FileComponent::new();
1212 let ctx = NoOpComponentContext;
1213 let endpoint = component
1214 .create_endpoint(&format!("file:{dir_path}?initialDelay=0&delay=50"), &ctx)
1215 .unwrap();
1216 let mut consumer = endpoint.create_consumer().unwrap();
1217
1218 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1219 let token = CancellationToken::new();
1220 let ctx = ConsumerContext::new(tx, token.clone());
1221
1222 let handle = tokio::spawn(async move {
1223 consumer.start(ctx).await.unwrap();
1224 });
1225
1226 tokio::time::sleep(Duration::from_millis(150)).await;
1227 token.cancel();
1228
1229 let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
1230 assert!(
1231 result.is_ok(),
1232 "Consumer should have stopped after cancellation"
1233 );
1234 }
1235
1236 #[tokio::test]
1241 async fn test_file_producer_writes_file() {
1242 use tower::ServiceExt;
1243
1244 let dir = tempfile::tempdir().unwrap();
1245 let dir_path = dir.path().to_str().unwrap();
1246
1247 let component = FileComponent::new();
1248 let ctx = NoOpComponentContext;
1249 let endpoint = component
1250 .create_endpoint(&format!("file:{dir_path}"), &ctx)
1251 .unwrap();
1252 let ctx = test_producer_ctx();
1253 let producer = endpoint.create_producer(&ctx).unwrap();
1254
1255 let mut exchange = Exchange::new(Message::new("file content"));
1256 exchange.input.set_header(
1257 "CamelFileName",
1258 serde_json::Value::String("output.txt".to_string()),
1259 );
1260
1261 let result = producer.oneshot(exchange).await.unwrap();
1262
1263 let content = std::fs::read_to_string(dir.path().join("output.txt")).unwrap();
1264 assert_eq!(content, "file content");
1265
1266 assert!(result.input.header("CamelFileNameProduced").is_some());
1267 }
1268
1269 #[tokio::test]
1270 async fn test_file_producer_auto_create_dirs() {
1271 use tower::ServiceExt;
1272
1273 let dir = tempfile::tempdir().unwrap();
1274 let dir_path = dir.path().to_str().unwrap();
1275
1276 let component = FileComponent::new();
1277 let ctx = NoOpComponentContext;
1278 let endpoint = component
1279 .create_endpoint(&format!("file:{dir_path}/sub/dir"), &ctx)
1280 .unwrap();
1281 let ctx = test_producer_ctx();
1282 let producer = endpoint.create_producer(&ctx).unwrap();
1283
1284 let mut exchange = Exchange::new(Message::new("nested"));
1285 exchange.input.set_header(
1286 "CamelFileName",
1287 serde_json::Value::String("file.txt".to_string()),
1288 );
1289
1290 producer.oneshot(exchange).await.unwrap();
1291
1292 assert!(dir.path().join("sub/dir/file.txt").exists());
1293 }
1294
1295 #[tokio::test]
1296 async fn test_file_producer_file_exist_fail() {
1297 use tower::ServiceExt;
1298
1299 let dir = tempfile::tempdir().unwrap();
1300 let dir_path = dir.path().to_str().unwrap();
1301
1302 std::fs::write(dir.path().join("existing.txt"), "old").unwrap();
1303
1304 let component = FileComponent::new();
1305 let ctx = NoOpComponentContext;
1306 let endpoint = component
1307 .create_endpoint(&format!("file:{dir_path}?fileExist=Fail"), &ctx)
1308 .unwrap();
1309 let ctx = test_producer_ctx();
1310 let producer = endpoint.create_producer(&ctx).unwrap();
1311
1312 let mut exchange = Exchange::new(Message::new("new"));
1313 exchange.input.set_header(
1314 "CamelFileName",
1315 serde_json::Value::String("existing.txt".to_string()),
1316 );
1317
1318 let result = producer.oneshot(exchange).await;
1319 assert!(
1320 result.is_err(),
1321 "Should fail when file exists with Fail strategy"
1322 );
1323 }
1324
1325 #[tokio::test]
1326 async fn test_file_producer_file_exist_append() {
1327 use tower::ServiceExt;
1328
1329 let dir = tempfile::tempdir().unwrap();
1330 let dir_path = dir.path().to_str().unwrap();
1331
1332 std::fs::write(dir.path().join("append.txt"), "old").unwrap();
1333
1334 let component = FileComponent::new();
1335 let ctx = NoOpComponentContext;
1336 let endpoint = component
1337 .create_endpoint(&format!("file:{dir_path}?fileExist=Append"), &ctx)
1338 .unwrap();
1339 let ctx = test_producer_ctx();
1340 let producer = endpoint.create_producer(&ctx).unwrap();
1341
1342 let mut exchange = Exchange::new(Message::new("new"));
1343 exchange.input.set_header(
1344 "CamelFileName",
1345 serde_json::Value::String("append.txt".to_string()),
1346 );
1347
1348 producer.oneshot(exchange).await.unwrap();
1349
1350 let content = std::fs::read_to_string(dir.path().join("append.txt")).unwrap();
1351 assert_eq!(content, "oldnew");
1352 }
1353
1354 #[tokio::test]
1355 async fn test_file_producer_temp_prefix() {
1356 use tower::ServiceExt;
1357
1358 let dir = tempfile::tempdir().unwrap();
1359 let dir_path = dir.path().to_str().unwrap();
1360
1361 let component = FileComponent::new();
1362 let ctx = NoOpComponentContext;
1363 let endpoint = component
1364 .create_endpoint(&format!("file:{dir_path}?tempPrefix=.tmp"), &ctx)
1365 .unwrap();
1366 let ctx = test_producer_ctx();
1367 let producer = endpoint.create_producer(&ctx).unwrap();
1368
1369 let mut exchange = Exchange::new(Message::new("atomic write"));
1370 exchange.input.set_header(
1371 "CamelFileName",
1372 serde_json::Value::String("final.txt".to_string()),
1373 );
1374
1375 producer.oneshot(exchange).await.unwrap();
1376
1377 assert!(dir.path().join("final.txt").exists());
1378 assert!(!dir.path().join(".tmpfinal.txt").exists());
1379 let content = std::fs::read_to_string(dir.path().join("final.txt")).unwrap();
1380 assert_eq!(content, "atomic write");
1381 }
1382
1383 #[tokio::test]
1384 async fn test_file_producer_uses_filename_option() {
1385 use tower::ServiceExt;
1386
1387 let dir = tempfile::tempdir().unwrap();
1388 let dir_path = dir.path().to_str().unwrap();
1389
1390 let component = FileComponent::new();
1391 let ctx = NoOpComponentContext;
1392 let endpoint = component
1393 .create_endpoint(&format!("file:{dir_path}?fileName=fixed.txt"), &ctx)
1394 .unwrap();
1395 let ctx = test_producer_ctx();
1396 let producer = endpoint.create_producer(&ctx).unwrap();
1397
1398 let exchange = Exchange::new(Message::new("content"));
1399
1400 producer.oneshot(exchange).await.unwrap();
1401 assert!(dir.path().join("fixed.txt").exists());
1402 }
1403
1404 #[tokio::test]
1405 async fn test_file_producer_no_filename_errors() {
1406 use tower::ServiceExt;
1407
1408 let dir = tempfile::tempdir().unwrap();
1409 let dir_path = dir.path().to_str().unwrap();
1410
1411 let component = FileComponent::new();
1412 let ctx = NoOpComponentContext;
1413 let endpoint = component
1414 .create_endpoint(&format!("file:{dir_path}"), &ctx)
1415 .unwrap();
1416 let ctx = test_producer_ctx();
1417 let producer = endpoint.create_producer(&ctx).unwrap();
1418
1419 let exchange = Exchange::new(Message::new("content"));
1420
1421 let result = producer.oneshot(exchange).await;
1422 assert!(result.is_err(), "Should error when no filename is provided");
1423 }
1424
1425 #[tokio::test]
1430 async fn test_file_producer_rejects_path_traversal_parent_directory() {
1431 use tower::ServiceExt;
1432
1433 let dir = tempfile::tempdir().unwrap();
1434 let dir_path = dir.path().to_str().unwrap();
1435
1436 std::fs::create_dir(dir.path().join("subdir")).unwrap();
1438 std::fs::write(dir.path().join("secret.txt"), "secret").unwrap();
1439
1440 let component = FileComponent::new();
1441 let ctx = NoOpComponentContext;
1442 let endpoint = component
1443 .create_endpoint(&format!("file:{dir_path}/subdir"), &ctx)
1444 .unwrap();
1445 let ctx = test_producer_ctx();
1446 let producer = endpoint.create_producer(&ctx).unwrap();
1447
1448 let mut exchange = Exchange::new(Message::new("malicious"));
1449 exchange.input.set_header(
1450 "CamelFileName",
1451 serde_json::Value::String("../secret.txt".to_string()),
1452 );
1453
1454 let result = producer.oneshot(exchange).await;
1455 assert!(result.is_err(), "Should reject path traversal attempt");
1456
1457 let err = result.unwrap_err();
1458 assert!(
1459 err.to_string().contains("outside"),
1460 "Error should mention path is outside base directory"
1461 );
1462 }
1463
1464 #[tokio::test]
1465 async fn test_file_producer_rejects_absolute_path_outside_base() {
1466 use tower::ServiceExt;
1467
1468 let dir = tempfile::tempdir().unwrap();
1469 let dir_path = dir.path().to_str().unwrap();
1470
1471 let component = FileComponent::new();
1472 let ctx = NoOpComponentContext;
1473 let endpoint = component
1474 .create_endpoint(&format!("file:{dir_path}"), &ctx)
1475 .unwrap();
1476 let ctx = test_producer_ctx();
1477 let producer = endpoint.create_producer(&ctx).unwrap();
1478
1479 let mut exchange = Exchange::new(Message::new("malicious"));
1480 exchange.input.set_header(
1481 "CamelFileName",
1482 serde_json::Value::String("/etc/passwd".to_string()),
1483 );
1484
1485 let result = producer.oneshot(exchange).await;
1486 assert!(result.is_err(), "Should reject absolute path outside base");
1487 }
1488
1489 #[tokio::test]
1494 #[ignore] async fn test_large_file_streaming_constant_memory() {
1496 use std::io::Write;
1497 use tempfile::NamedTempFile;
1498
1499 let mut temp_file = NamedTempFile::new().unwrap();
1501 let file_size = 150 * 1024 * 1024; let chunk = vec![b'X'; 1024 * 1024]; for _ in 0..150 {
1505 temp_file.write_all(&chunk).unwrap();
1506 }
1507 temp_file.flush().unwrap();
1508
1509 let dir = temp_file.path().parent().unwrap();
1510 let dir_path = dir.to_str().unwrap();
1511 let file_name = temp_file
1512 .path()
1513 .file_name()
1514 .unwrap()
1515 .to_str()
1516 .unwrap()
1517 .to_string();
1518
1519 let component = FileComponent::new();
1521 let component_ctx = NoOpComponentContext;
1522 let endpoint = component
1523 .create_endpoint(
1524 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1525 &component_ctx,
1526 )
1527 .unwrap();
1528 let mut consumer = endpoint.create_consumer().unwrap();
1529
1530 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1531 let token = CancellationToken::new();
1532 let ctx = ConsumerContext::new(tx, token.clone());
1533
1534 tokio::spawn(async move {
1535 let _ = consumer.start(ctx).await;
1536 });
1537
1538 let exchange = tokio::time::timeout(Duration::from_secs(5), async {
1539 rx.recv().await.unwrap().exchange
1540 })
1541 .await
1542 .expect("Should receive exchange");
1543 token.cancel();
1544
1545 assert!(matches!(exchange.input.body, Body::Stream(_)));
1547
1548 if let Body::Stream(ref stream_body) = exchange.input.body {
1550 assert!(stream_body.metadata.size_hint.is_some());
1551 let size = stream_body.metadata.size_hint.unwrap();
1552 assert_eq!(size, file_size as u64);
1553 }
1554
1555 if let Body::Stream(stream_body) = exchange.input.body {
1557 let body = Body::Stream(stream_body);
1558 let result = body.into_bytes(100 * 1024 * 1024).await;
1559 assert!(result.is_err());
1560 }
1561
1562 let component2 = FileComponent::new();
1565 let endpoint2 = component2
1566 .create_endpoint(
1567 &format!("file:{dir_path}?noop=true&initialDelay=0&delay=100&fileName={file_name}"),
1568 &component_ctx,
1569 )
1570 .unwrap();
1571 let mut consumer2 = endpoint2.create_consumer().unwrap();
1572
1573 let (tx2, mut rx2) = tokio::sync::mpsc::channel(16);
1574 let token2 = CancellationToken::new();
1575 let ctx2 = ConsumerContext::new(tx2, token2.clone());
1576
1577 tokio::spawn(async move {
1578 let _ = consumer2.start(ctx2).await;
1579 });
1580
1581 let exchange2 = tokio::time::timeout(Duration::from_secs(5), async {
1582 rx2.recv().await.unwrap().exchange
1583 })
1584 .await
1585 .expect("Should receive exchange");
1586 token2.cancel();
1587
1588 if let Body::Stream(stream_body) = exchange2.input.body {
1589 let mut stream_lock = stream_body.stream.lock().await;
1590 let mut stream = stream_lock.take().unwrap();
1591
1592 if let Some(chunk_result) = stream.next().await {
1594 let chunk = chunk_result.unwrap();
1595 assert!(!chunk.is_empty());
1596 assert!(chunk.len() < file_size);
1597 }
1599 }
1600 }
1601
1602 #[tokio::test]
1607 async fn test_producer_writes_stream_body() {
1608 let dir = tempfile::tempdir().unwrap();
1609 let dir_path = dir.path().to_str().unwrap();
1610 let uri = format!("file:{dir_path}?fileName=out.txt");
1611
1612 let component = FileComponent::new();
1613 let ctx = NoOpComponentContext;
1614 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1615 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1616
1617 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1618 Ok(Bytes::from("hello ")),
1619 Ok(Bytes::from("streaming ")),
1620 Ok(Bytes::from("world")),
1621 ];
1622 let stream = futures::stream::iter(chunks);
1623 let body = Body::Stream(StreamBody {
1624 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1625 metadata: StreamMetadata {
1626 size_hint: None,
1627 content_type: None,
1628 origin: None,
1629 },
1630 });
1631
1632 let exchange = Exchange::new(Message::new(body));
1633 tower::ServiceExt::oneshot(producer, exchange)
1634 .await
1635 .unwrap();
1636
1637 let content = tokio::fs::read_to_string(format!("{dir_path}/out.txt"))
1638 .await
1639 .unwrap();
1640 assert_eq!(content, "hello streaming world");
1641 }
1642
1643 #[tokio::test]
1644 async fn test_producer_stream_atomic_no_partial_on_error() {
1645 let dir = tempfile::tempdir().unwrap();
1647 let dir_path = dir.path().to_str().unwrap();
1648 let uri = format!("file:{dir_path}?fileName=out.txt");
1649
1650 let component = FileComponent::new();
1651 let ctx = NoOpComponentContext;
1652 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1653 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1654
1655 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1656 Ok(Bytes::from("partial")),
1657 Err(CamelError::ProcessorError(
1658 "simulated stream error".to_string(),
1659 )),
1660 ];
1661 let stream = futures::stream::iter(chunks);
1662 let body = Body::Stream(StreamBody {
1663 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1664 metadata: StreamMetadata {
1665 size_hint: None,
1666 content_type: None,
1667 origin: None,
1668 },
1669 });
1670
1671 let exchange = Exchange::new(Message::new(body));
1672 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1673 assert!(
1674 result.is_err(),
1675 "expected error when stream fails mid-write"
1676 );
1677
1678 assert!(
1680 !std::path::Path::new(&format!("{dir_path}/out.txt")).exists(),
1681 "partial file must not exist after failed write"
1682 );
1683
1684 assert!(
1686 !std::path::Path::new(&format!("{dir_path}/.tmp.out.txt")).exists(),
1687 "temp file must be cleaned up after failed write"
1688 );
1689 }
1690
1691 #[tokio::test]
1692 async fn test_producer_stream_append() {
1693 let dir = tempfile::tempdir().unwrap();
1694 let dir_path = dir.path().to_str().unwrap();
1695 let target = format!("{dir_path}/out.txt");
1696
1697 tokio::fs::write(&target, b"line1\n").await.unwrap();
1699
1700 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1701 let component = FileComponent::new();
1702 let ctx = NoOpComponentContext;
1703 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1704 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1705
1706 let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from("line2\n"))];
1707 let stream = futures::stream::iter(chunks);
1708 let body = Body::Stream(StreamBody {
1709 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1710 metadata: StreamMetadata {
1711 size_hint: None,
1712 content_type: None,
1713 origin: None,
1714 },
1715 });
1716
1717 let exchange = Exchange::new(Message::new(body));
1718 tower::ServiceExt::oneshot(producer, exchange)
1719 .await
1720 .unwrap();
1721
1722 let content = tokio::fs::read_to_string(&target).await.unwrap();
1723 assert_eq!(content, "line1\nline2\n");
1724 }
1725
1726 #[tokio::test]
1727 async fn test_producer_stream_append_partial_on_error() {
1728 let dir = tempfile::tempdir().unwrap();
1731 let dir_path = dir.path().to_str().unwrap();
1732 let target = format!("{dir_path}/out.txt");
1733
1734 tokio::fs::write(&target, b"initial\n").await.unwrap();
1736
1737 let uri = format!("file:{dir_path}?fileName=out.txt&fileExist=Append");
1738 let component = FileComponent::new();
1739 let ctx = NoOpComponentContext;
1740 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1741 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1742
1743 let chunks: Vec<Result<Bytes, CamelError>> = vec![
1745 Ok(Bytes::from("partial-")), Err(CamelError::ProcessorError("stream error".to_string())), Ok(Bytes::from("never-written")), ];
1749 let stream = futures::stream::iter(chunks);
1750 let body = Body::Stream(StreamBody {
1751 stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream)))),
1752 metadata: StreamMetadata {
1753 size_hint: None,
1754 content_type: None,
1755 origin: None,
1756 },
1757 });
1758
1759 let exchange = Exchange::new(Message::new(body));
1760 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1761
1762 assert!(
1764 result.is_err(),
1765 "expected error when stream fails during append"
1766 );
1767
1768 let content = tokio::fs::read_to_string(&target).await.unwrap();
1770 assert_eq!(
1771 content, "initial\npartial-",
1772 "append leaves partial data on stream error (non-atomic by nature)"
1773 );
1774 }
1775
1776 #[tokio::test]
1777 async fn test_producer_stream_already_consumed_errors() {
1778 let dir = tempfile::tempdir().unwrap();
1779 let dir_path = dir.path().to_str().unwrap();
1780 let uri = format!("file:{dir_path}?fileName=out.txt");
1781
1782 let component = FileComponent::new();
1783 let ctx = NoOpComponentContext;
1784 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
1785 let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
1786
1787 type MaybeStream = std::sync::Arc<
1789 tokio::sync::Mutex<
1790 Option<
1791 std::pin::Pin<
1792 Box<dyn futures::Stream<Item = Result<Bytes, CamelError>> + Send>,
1793 >,
1794 >,
1795 >,
1796 >;
1797 let arc: MaybeStream = std::sync::Arc::new(tokio::sync::Mutex::new(None));
1798 let body = Body::Stream(StreamBody {
1799 stream: arc,
1800 metadata: StreamMetadata {
1801 size_hint: None,
1802 content_type: None,
1803 origin: None,
1804 },
1805 });
1806
1807 let exchange = Exchange::new(Message::new(body));
1808 let result = tower::ServiceExt::oneshot(producer, exchange).await;
1809 assert!(
1810 result.is_err(),
1811 "expected error for already-consumed stream"
1812 );
1813 }
1814
/// Builds a component from a customized `FileGlobalConfig`, checks that an
/// endpoint can be created from it, and checks that applying the same global
/// config to a `FileConfig` parsed from a URI without timing options yields
/// the global delay and timeout values.
#[test]
fn test_global_config_applied_to_endpoint() {
    let global = FileGlobalConfig::default()
        .with_delay_ms(2000)
        .with_initial_delay_ms(5000)
        .with_read_timeout_ms(60_000)
        .with_write_timeout_ms(45_000);

    // `with_config` takes the config by value; hand it a clone so the very
    // same value can be reused below instead of rebuilding an identical copy.
    let component = FileComponent::with_config(global.clone());
    let ctx = NoOpComponentContext;
    // Only the successful creation is checked; the endpoint is not inspected.
    let _endpoint = component.create_endpoint("file:/tmp/inbox", &ctx).unwrap();

    let mut config = FileConfig::from_uri("file:/tmp/inbox").unwrap();
    config.apply_global_defaults(&global);

    assert_eq!(config.delay, Duration::from_millis(2000));
    assert_eq!(config.initial_delay, Duration::from_millis(5000));
    assert_eq!(config.read_timeout, Duration::from_millis(60_000));
    assert_eq!(config.write_timeout, Duration::from_millis(45_000));
}
1847
/// `delay` and `initialDelay` given in the URI keep their URI values after a
/// global config with different values is applied; `read_timeout`, which this
/// test sets nowhere, ends up at 30 seconds.
#[test]
fn test_uri_param_wins_over_global_config() {
    let global = FileGlobalConfig::default()
        .with_delay_ms(3000)
        .with_initial_delay_ms(4000);

    let uri = "file:/tmp/inbox?delay=1000&initialDelay=2000";
    let mut config = FileConfig::from_uri(uri).unwrap();
    config.apply_global_defaults(&global);

    // URI values (1 s / 2 s) must survive, not the global 3 s / 4 s.
    assert_eq!(config.delay, Duration::from_secs(1));
    assert_eq!(config.initial_delay, Duration::from_secs(2));
    assert_eq!(config.read_timeout, Duration::from_secs(30));
}
1866}