1pub mod bundle;
7pub mod health;
8
9pub use bundle::ContainerBundle;
10pub use health::ContainerHealthCheck;
11
12use std::collections::{HashMap, HashSet};
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17
18use async_trait::async_trait;
19use bollard::Docker;
20use bollard::models::{
21 ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
22};
23use bollard::query_parameters::{
24 CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
25 ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
26 StartContainerOptions,
27};
28use bollard::service::{HostConfig, PortBinding};
29use camel_component_api::NetworkRetryPolicy;
30use camel_component_api::parse_uri;
31use camel_component_api::retry_async_cancelable;
32use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
33use camel_component_api::{
34 Component, Consumer, ConsumerContext, Endpoint, ProducerContext, RuntimeObservability,
35};
36use tower::Service;
37
/// Process-wide set of container IDs that this module has started.
///
/// IDs are inserted by `track_container`, removed by `untrack_container`, and
/// swept by `cleanup_tracked_containers`. The set is created lazily on first use.
static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
42
43fn track_container(id: String) {
45 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
46 tracker.insert(id);
47 }
48}
49
50fn untrack_container(id: &str) {
52 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
53 tracker.remove(id);
54 }
55}
56
57pub async fn cleanup_tracked_containers() {
59 let ids: Vec<String> = {
60 match CONTAINER_TRACKER.lock() {
61 Ok(tracker) => tracker.iter().cloned().collect(),
62 Err(_) => return,
63 }
64 };
65
66 if ids.is_empty() {
67 return;
68 }
69
70 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
71
72 let docker = match Docker::connect_with_local_defaults() {
73 Ok(d) => d,
74 Err(e) => {
75 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
77 return;
78 }
79 };
80
81 for id in ids {
82 match docker
83 .remove_container(
84 &id,
85 Some(RemoveContainerOptions {
86 force: true,
87 ..Default::default()
88 }),
89 )
90 .await
91 {
92 Ok(_) => {
93 tracing::debug!("Cleaned up container {}", id);
94 untrack_container(&id);
95 }
96 Err(e) => {
97 tracing::warn!("Failed to cleanup container {}: {}", id, e);
98 }
99 }
100 }
101}
102
/// Timeout, in seconds, handed to `Docker::connect_with_socket` when a client is built.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
107
/// Header name for the container action.
// NOTE(review): not referenced in this part of the file; presumably used to
// select the operation per exchange — confirm against the producer dispatch.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header read by the `run` operation for the image; takes precedence over the URI `image` parameter.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header carrying a container ID: read by the lifecycle, `exec` and network
/// connect/disconnect operations, and written by `run` and `exec`.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header name for the log stream of a log message.
// NOTE(review): not referenced in this part of the file; presumably set by the
// log consumer (stdout/stderr) — confirm.
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header name for the timestamp of a log message.
// NOTE(review): not referenced in this part of the file; presumably set by the
// log consumer — confirm.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header read for a name: the container name for `run`, the network name for
/// `network-create`. Takes precedence over the URI `name` parameter.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header set to `"success"` by every producer operation that completes without error.
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Header read by `exec` for the command; takes precedence over the URI `cmd` parameter.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Header carrying a network: read by `network-connect`, `network-disconnect`
/// and `network-remove`; written by `network-create` with the new network's ID.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Header set by a non-detached `exec` to the exit code of the executed command.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Header read by `run` for the volume specification; takes precedence over the URI `volumes` parameter.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Header set by a detached `exec` to the ID of the created exec instance.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
143
144fn container_reconnect_default() -> NetworkRetryPolicy {
152 NetworkRetryPolicy {
153 enabled: true,
154 max_attempts: 0, initial_delay: std::time::Duration::from_secs(5),
156 multiplier: 1.0, max_delay: std::time::Duration::from_secs(5),
158 jitter_factor: 0.0,
159 }
160}
161
/// Component-wide settings for the container component.
///
/// Deserializable with serde; fields missing from the input take the values
/// from the `Default` implementation.
#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
#[serde(default)]
pub struct ContainerGlobalConfig {
    /// Docker endpoint applied to endpoints whose URI does not set `host`.
    pub docker_host: String,
    /// Reconnect policy copied onto every endpoint configuration; defaults to
    /// `container_reconnect_default()` when absent from the input.
    #[serde(default = "container_reconnect_default")]
    pub reconnect: NetworkRetryPolicy,
}
174
175impl Default for ContainerGlobalConfig {
176 fn default() -> Self {
177 Self {
178 docker_host: "unix:///var/run/docker.sock".to_string(),
179 reconnect: container_reconnect_default(),
180 }
181 }
182}
183
184impl ContainerGlobalConfig {
185 pub fn new() -> Self {
186 Self::default()
187 }
188
189 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
190 self.docker_host = v.into();
191 self
192 }
193}
194
/// Per-endpoint configuration parsed from a `container:<operation>?...` URI.
///
/// Unless noted otherwise, each field is read from the URI query parameter
/// named in its description and is `None` when that parameter is absent.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// Operation name, taken from the URI path.
    pub operation: String,
    /// `image`: image used by the `run` operation.
    pub image: Option<String>,
    /// `name`: container name for `run`, network name for `network-create`.
    pub name: Option<String>,
    /// `host`: Docker endpoint; filled from the global configuration when unset.
    pub host: Option<String>,
    /// `cmd`: command line, split on whitespace by `run` and `exec`.
    pub cmd: Option<String>,
    /// `ports`: comma-separated `hostPort:containerPort[/protocol]` mappings.
    pub ports: Option<String>,
    /// `env`: comma-separated environment entries.
    pub env: Option<String>,
    /// `network`: network mode for `run`; target network for the network operations.
    pub network: Option<String>,
    /// `containerId`: fallback container for `exec` and network connect/disconnect
    /// when the container ID header is absent.
    pub container_id: Option<String>,
    /// `follow`: defaults to `true`.
    // NOTE(review): not read in this part of the file; presumably controls log following — confirm.
    pub follow: bool,
    /// `timestamps`: defaults to `false`.
    // NOTE(review): not read in this part of the file; presumably adds timestamps to log lines — confirm.
    pub timestamps: bool,
    /// `tail`.
    // NOTE(review): not read in this part of the file; presumably the number of log lines to replay — confirm.
    pub tail: Option<String>,
    /// `autoPull`: pull the image for `run` when it is not available locally; defaults to `true`.
    pub auto_pull: bool,
    /// `autoRemove`: passed to the container's host config by `run`; defaults to `true`.
    pub auto_remove: bool,
    /// `volumes`: comma-separated volume entries, parsed by `parse_volume_str`.
    pub volumes: Option<String>,
    /// `user`: user for `exec`.
    pub user: Option<String>,
    /// `workdir`: working directory for `exec`.
    pub workdir: Option<String>,
    /// `detach`: start `exec` without waiting for its output; defaults to `false`.
    pub detach: bool,
    /// `driver`: network driver for `network-create` (`bridge` when unset).
    pub driver: Option<String>,
    /// `force`: passed to `network-disconnect`; defaults to `false`.
    pub force: bool,
    /// Reconnect policy. `from_uri` sets `NetworkRetryPolicy::default()`;
    /// `apply_global_defaults` overwrites it with the global policy.
    pub reconnect: NetworkRetryPolicy,
}
248
249impl ContainerConfig {
250 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
258 let parts = parse_uri(uri)?;
259 if parts.scheme != "container" {
260 return Err(CamelError::InvalidUri(format!(
261 "expected scheme 'container', got '{}'",
262 parts.scheme
263 )));
264 }
265
266 let image = parts.params.get("image").cloned();
267 let name = parts.params.get("name").cloned();
268 let cmd = parts.params.get("cmd").cloned();
269 let ports = parts.params.get("ports").cloned();
270 let env = parts.params.get("env").cloned();
271 let network = parts.params.get("network").cloned();
272 let container_id = parts.params.get("containerId").cloned();
273 let follow = parts
274 .params
275 .get("follow")
276 .map(|v| v.eq_ignore_ascii_case("true"))
277 .unwrap_or(true);
278 let timestamps = parts
279 .params
280 .get("timestamps")
281 .map(|v| v.eq_ignore_ascii_case("true"))
282 .unwrap_or(false);
283 let tail = parts.params.get("tail").cloned();
284 let auto_pull = parts
285 .params
286 .get("autoPull")
287 .map(|v| v.eq_ignore_ascii_case("true"))
288 .unwrap_or(true);
289 let auto_remove = parts
290 .params
291 .get("autoRemove")
292 .map(|v| v.eq_ignore_ascii_case("true"))
293 .unwrap_or(true);
294 let host = parts.params.get("host").cloned();
296 let volumes = parts.params.get("volumes").cloned();
297 let user = parts.params.get("user").cloned();
298 let workdir = parts.params.get("workdir").cloned();
299 let detach = parts
300 .params
301 .get("detach")
302 .map(|v| v.eq_ignore_ascii_case("true"))
303 .unwrap_or(false);
304 let driver = parts.params.get("driver").cloned();
305 let force = parts
306 .params
307 .get("force")
308 .map(|v| v.eq_ignore_ascii_case("true"))
309 .unwrap_or(false);
310
311 Ok(Self {
312 operation: parts.path,
313 image,
314 name,
315 host,
316 cmd,
317 ports,
318 env,
319 network,
320 container_id,
321 follow,
322 timestamps,
323 tail,
324 auto_pull,
325 auto_remove,
326 volumes,
327 user,
328 workdir,
329 detach,
330 driver,
331 force,
332 reconnect: NetworkRetryPolicy::default(),
333 })
334 }
335
336 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
339 if self.host.is_none() {
340 self.host = Some(global.docker_host.clone());
341 }
342 self.reconnect = global.reconnect.clone();
343 }
344
345 fn docker_socket_path(&self) -> Result<&str, CamelError> {
346 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
347 "npipe:////./pipe/docker_engine"
348 } else {
349 "unix:///var/run/docker.sock"
350 });
351
352 if host.starts_with("unix://") || host.starts_with("npipe://") {
353 return Ok(host);
354 }
355
356 if host.contains("://") {
357 return Err(CamelError::ProcessorError(format!(
358 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
359 host
360 )));
361 }
362
363 Ok(host)
364 }
365
366 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
367 let socket_path = self.docker_socket_path()?;
368 Docker::connect_with_socket(
369 socket_path,
370 DOCKER_CONNECT_TIMEOUT_SECS,
371 bollard::API_DEFAULT_VERSION,
372 )
373 .map_err(|e| {
374 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
375 })
376 }
377
378 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
386 let docker = self.connect_docker_client()?;
387 docker
388 .ping()
389 .await
390 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
391 Ok(docker)
392 }
393
394 #[allow(clippy::type_complexity)]
395 fn parse_ports(
396 &self,
397 ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
398 let ports_str = match self.ports.as_ref() {
399 Some(s) => s,
400 None => return Ok((Vec::new(), HashMap::new())),
401 };
402
403 let mut exposed_ports: Vec<String> = Vec::new();
404 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
405
406 for mapping in ports_str.split(',') {
407 let mapping = mapping.trim();
408 if mapping.is_empty() {
409 continue;
410 }
411
412 let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
413 CamelError::ProcessorError(format!(
414 "malformed port mapping '{}': expected hostPort:containerPort",
415 mapping
416 ))
417 })?;
418
419 let (container_port, protocol) = if container_spec.contains('/') {
420 let parts: Vec<&str> = container_spec.split('/').collect();
421 (parts[0], parts[1])
422 } else {
423 (container_spec, "tcp")
424 };
425
426 let container_key = format!("{}/{}", container_port, protocol);
427
428 exposed_ports.push(container_key.clone());
429
430 port_bindings.insert(
431 container_key,
432 Some(vec![PortBinding {
433 host_ip: None,
434 host_port: Some(host_port.to_string()),
435 }]),
436 );
437 }
438
439 Ok((exposed_ports, port_bindings))
440 }
441
442 fn parse_env(&self) -> Option<Vec<String>> {
443 let env_str = self.env.as_ref()?;
444
445 let env_vars: Vec<String> = env_str
446 .split(',')
447 .map(|s| s.trim().to_string())
448 .filter(|s| !s.is_empty())
449 .collect();
450
451 if env_vars.is_empty() {
452 None
453 } else {
454 Some(env_vars)
455 }
456 }
457
458 #[cfg(test)]
459 #[allow(clippy::type_complexity)]
460 fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
461 self.volumes.as_deref().and_then(parse_volume_str)
462 }
463}
464
/// Parses a comma-separated volume specification into `(binds, anonymous_volumes)`.
///
/// Each trimmed, non-empty entry is split on `:` and classified by its
/// number of segments:
///
/// * `path` — anonymous volume `path`.
/// * `path:ro` / `path:rw` — anonymous volume `path` (the mode is dropped).
/// * `source:target` — bind `source:target`.
/// * `source:target:ro` / `source:target:rw` — bind with that mode.
///
/// Three-segment entries with any other mode, and entries with more than
/// three segments, are skipped. Returns `None` when nothing was collected.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
    fn is_mode(segment: &str) -> bool {
        matches!(segment, "ro" | "rw")
    }

    let mut binds: Vec<String> = Vec::new();
    let mut anonymous_volumes: Vec<String> = Vec::new();

    let entries = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        let segments: Vec<&str> = entry.split(':').collect();

        match segments.as_slice() {
            [path] => anonymous_volumes.push(path.to_string()),
            [path, mode] if is_mode(*mode) => anonymous_volumes.push(path.to_string()),
            [source, target] => binds.push(format!("{}:{}", source, target)),
            [source, target, mode] if is_mode(*mode) => {
                binds.push(format!("{}:{}:{}", source, target, mode));
            }
            _ => {}
        }
    }

    if binds.is_empty() && anonymous_volumes.is_empty() {
        return None;
    }
    Some((binds, anonymous_volumes))
}
514
/// Operations the producer can perform, as parsed from the URI operation name
/// by `parse_producer_operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// `list`
    List,
    /// `run`
    Run,
    /// `start`
    Start,
    /// `stop`
    Stop,
    /// `remove`
    Remove,
    /// `exec`
    Exec,
    /// `network-create`
    NetworkCreate,
    /// `network-connect`
    NetworkConnect,
    /// `network-disconnect`
    NetworkDisconnect,
    /// `network-remove`
    NetworkRemove,
    /// `network-list`
    NetworkList,
}
529
530fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
531 match operation {
532 "list" => Ok(ProducerOperation::List),
533 "run" => Ok(ProducerOperation::Run),
534 "start" => Ok(ProducerOperation::Start),
535 "stop" => Ok(ProducerOperation::Stop),
536 "remove" => Ok(ProducerOperation::Remove),
537 "exec" => Ok(ProducerOperation::Exec),
538 "network-create" => Ok(ProducerOperation::NetworkCreate),
539 "network-connect" => Ok(ProducerOperation::NetworkConnect),
540 "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
541 "network-remove" => Ok(ProducerOperation::NetworkRemove),
542 "network-list" => Ok(ProducerOperation::NetworkList),
543 _ => Err(CamelError::ProcessorError(format!(
544 "Unknown container operation: {}",
545 operation
546 ))),
547 }
548}
549
550fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
551 exchange
552 .input
553 .header(HEADER_CONTAINER_NAME)
554 .and_then(|v| v.as_str().map(|s| s.to_string()))
555 .or_else(|| config.name.clone())
556}
557
558async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
559 let images = docker
560 .list_images(None::<ListImagesOptions>)
561 .await
562 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
563
564 Ok(images.iter().any(|img| {
565 img.repo_tags
566 .iter()
567 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
568 }))
569}
570
571async fn pull_image_with_progress(
572 docker: &Docker,
573 image: &str,
574 timeout_secs: u64,
575) -> Result<(), CamelError> {
576 use futures::StreamExt;
577
578 tracing::info!("Pulling image: {}", image);
579
580 let mut stream = docker.create_image(
581 Some(CreateImageOptions {
582 from_image: Some(image.to_string()),
583 ..Default::default()
584 }),
585 None,
586 None,
587 );
588
589 let start = std::time::Instant::now();
590 let mut last_progress = std::time::Instant::now();
591
592 while let Some(item) = stream.next().await {
593 if start.elapsed().as_secs() > timeout_secs {
594 return Err(CamelError::ProcessorError(format!(
595 "Image pull timeout after {}s. Try manually: docker pull {}",
596 timeout_secs, image
597 )));
598 }
599
600 match item {
601 Ok(update) => {
602 if last_progress.elapsed().as_secs() >= 2 {
604 if let Some(status) = update.status {
605 tracing::debug!("Pull progress: {}", status);
606 }
607 last_progress = std::time::Instant::now();
608 }
609 }
610 Err(e) => {
611 let err_str = e.to_string().to_lowercase();
612 if err_str.contains("unauthorized") || err_str.contains("401") {
613 return Err(CamelError::ProcessorError(format!(
614 "Authentication required for image '{}'. Configure Docker credentials: docker login",
615 image
616 )));
617 }
618 if err_str.contains("not found") || err_str.contains("404") {
619 return Err(CamelError::ProcessorError(format!(
620 "Image '{}' not found in registry. Check the image name and tag",
621 image
622 )));
623 }
624 return Err(CamelError::ProcessorError(format!(
625 "Failed to pull image '{}': {}",
626 image, e
627 )));
628 }
629 }
630 }
631
632 tracing::info!("Successfully pulled image: {}", image);
633 Ok(())
634}
635
636async fn ensure_image_available(
637 docker: &Docker,
638 image: &str,
639 auto_pull: bool,
640 timeout_secs: u64,
641) -> Result<(), CamelError> {
642 if image_exists_locally(docker, image).await? {
643 tracing::debug!("Image '{}' already available locally", image);
644 return Ok(());
645 }
646
647 if !auto_pull {
648 return Err(CamelError::ProcessorError(format!(
649 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
650 image, image
651 )));
652 }
653
654 pull_image_with_progress(docker, image, timeout_secs).await
655}
656
657fn format_docker_event(event: &bollard::models::EventMessage) -> String {
658 let action = event.action.as_deref().unwrap_or("unknown");
659 let actor = event.actor.as_ref();
660
661 let container_name = actor
662 .and_then(|a| a.attributes.as_ref())
663 .and_then(|attrs| attrs.get("name"))
664 .map(|s| s.as_str())
665 .unwrap_or("unknown");
666
667 let image = actor
668 .and_then(|a| a.attributes.as_ref())
669 .and_then(|attrs| attrs.get("image"))
670 .map(|s| s.as_str())
671 .unwrap_or("");
672
673 let exit_code = actor
674 .and_then(|a| a.attributes.as_ref())
675 .and_then(|attrs| attrs.get("exitCode"))
676 .map(|s| s.as_str());
677
678 match action {
679 "create" => {
680 if image.is_empty() {
681 format!("[CREATE] Container {}", container_name)
682 } else {
683 format!("[CREATE] Container {} ({})", container_name, image)
684 }
685 }
686 "start" => format!("[START] Container {}", container_name),
687 "die" => {
688 if let Some(code) = exit_code {
689 format!("[DIE] Container {} (exit: {})", container_name, code)
690 } else {
691 format!("[DIE] Container {}", container_name)
692 }
693 }
694 "destroy" => format!("[DESTROY] Container {}", container_name),
695 "stop" => format!("[STOP] Container {}", container_name),
696 "pause" => format!("[PAUSE] Container {}", container_name),
697 "unpause" => format!("[UNPAUSE] Container {}", container_name),
698 "restart" => format!("[RESTART] Container {}", container_name),
699 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
700 }
701}
702
703async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
704 create: CreateFn,
705 start: StartFn,
706 remove: RemoveFn,
707) -> Result<String, CamelError>
708where
709 CreateFn: FnOnce() -> CreateFut,
710 CreateFut: Future<Output = Result<String, CamelError>>,
711 StartFn: FnOnce(String) -> StartFut,
712 StartFut: Future<Output = Result<(), CamelError>>,
713 RemoveFn: FnOnce(String) -> RemoveFut,
714 RemoveFut: Future<Output = Result<(), CamelError>>,
715{
716 let container_id = create().await?;
717 if let Err(start_err) = start(container_id.clone()).await {
718 if let Err(remove_err) = remove(container_id.clone()).await {
719 return Err(CamelError::ProcessorError(format!(
720 "Failed to start container: {}. Cleanup failed: {}",
721 start_err, remove_err
722 )));
723 }
724 return Err(start_err);
725 }
726
727 Ok(container_id)
728}
729
730async fn handle_list(
731 docker: Docker,
732 _config: ContainerConfig,
733 exchange: &mut Exchange,
734) -> Result<(), CamelError> {
735 let containers = docker
736 .list_containers(None::<ListContainersOptions>)
737 .await
738 .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
739
740 let json_value = serde_json::to_value(&containers).map_err(|e| {
741 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
742 })?;
743
744 exchange.input.body = Body::Json(json_value);
745 exchange.input.set_header(
746 HEADER_ACTION_RESULT,
747 serde_json::Value::String("success".to_string()),
748 );
749 Ok(())
750}
751
752async fn handle_run(
753 docker: Docker,
754 config: ContainerConfig,
755 exchange: &mut Exchange,
756) -> Result<(), CamelError> {
757 let image = exchange
758 .input
759 .header(HEADER_IMAGE)
760 .and_then(|v| v.as_str().map(|s| s.to_string()))
761 .or(config.image.clone())
762 .ok_or_else(|| {
763 CamelError::ProcessorError(
764 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
765 )
766 })?;
767
768 let image = if !image.contains(':') && !image.contains('@') {
769 format!("{}:latest", image)
770 } else {
771 image
772 };
773
774 let pull_timeout = 300;
775 ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
776 .await
777 .map_err(|e| {
778 CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
779 })?;
780
781 let container_name = resolve_container_name(exchange, &config);
782 let container_name_ref = container_name.as_deref().unwrap_or("");
783 let cmd_parts: Option<Vec<String>> = config
784 .cmd
785 .as_ref()
786 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
787 let auto_remove = config.auto_remove;
788 let (exposed_ports, port_bindings) = config.parse_ports()?;
789 let env_vars = config.parse_env();
790 let network_mode = config.network.clone();
791
792 let volumes_str = exchange
793 .input
794 .header(HEADER_VOLUMES)
795 .and_then(|v| v.as_str().map(|s| s.to_string()))
796 .or(config.volumes.clone());
797 let (binds, anon_volumes) = volumes_str
798 .as_deref()
799 .and_then(parse_volume_str)
800 .unwrap_or_default();
801
802 let docker_create = docker.clone();
803 let docker_start = docker.clone();
804 let docker_remove = docker.clone();
805
806 let container_id = run_container_with_cleanup(
807 move || async move {
808 let create_options = CreateContainerOptions {
809 name: Some(container_name_ref.to_string()),
810 ..Default::default()
811 };
812 let container_config = ContainerCreateBody {
813 image: Some(image.clone()),
814 cmd: cmd_parts,
815 env: env_vars,
816 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
817 volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
818 host_config: Some(HostConfig {
819 auto_remove: Some(auto_remove),
820 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
821 network_mode,
822 binds: if binds.is_empty() { None } else { Some(binds) },
823 ..Default::default()
824 }),
825 ..Default::default()
826 };
827
828 let create_response = docker_create
829 .create_container(Some(create_options), container_config)
830 .await
831 .map_err(|e| {
832 let err_str = e.to_string().to_lowercase();
833 if err_str.contains("409") || err_str.contains("conflict") {
834 CamelError::ProcessorError(format!(
835 "Container name '{}' already exists. Use a unique name or remove the existing container first",
836 container_name_ref
837 ))
838 } else {
839 CamelError::ProcessorError(format!(
840 "Failed to create container: {}",
841 e
842 ))
843 }
844 })?;
845
846 Ok(create_response.id)
847 },
848 move |container_id| async move {
849 docker_start
850 .start_container(&container_id, None::<StartContainerOptions>)
851 .await
852 .map_err(|e| {
853 CamelError::ProcessorError(format!(
854 "Failed to start container: {}",
855 e
856 ))
857 })
858 },
859 move |container_id| async move {
860 docker_remove
861 .remove_container(&container_id, None)
862 .await
863 .map_err(|e| {
864 CamelError::ProcessorError(format!(
865 "Failed to remove container after start failure: {}",
866 e
867 ))
868 })
869 },
870 )
871 .await?;
872
873 track_container(container_id.clone());
874
875 exchange
876 .input
877 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
878 exchange.input.set_header(
879 HEADER_ACTION_RESULT,
880 serde_json::Value::String("success".to_string()),
881 );
882 Ok(())
883}
884
885async fn handle_lifecycle(
886 docker: Docker,
887 _config: ContainerConfig,
888 exchange: &mut Exchange,
889 operation: ProducerOperation,
890 operation_name: &str,
891) -> Result<(), CamelError> {
892 let container_id = exchange
893 .input
894 .header(HEADER_CONTAINER_ID)
895 .and_then(|v| v.as_str().map(|s| s.to_string()))
896 .ok_or_else(|| {
897 CamelError::ProcessorError(format!(
898 "{} header is required for {} operation",
899 HEADER_CONTAINER_ID, operation_name
900 ))
901 })?;
902
903 match operation {
904 ProducerOperation::Start => {
905 docker
906 .start_container(&container_id, None::<StartContainerOptions>)
907 .await
908 .map_err(|e| {
909 CamelError::ProcessorError(format!("Failed to start container: {}", e))
910 })?;
911 }
912 ProducerOperation::Stop => {
913 docker
914 .stop_container(&container_id, None)
915 .await
916 .map_err(|e| {
917 CamelError::ProcessorError(format!("Failed to stop container: {}", e))
918 })?;
919 }
920 ProducerOperation::Remove => {
921 docker
922 .remove_container(&container_id, None)
923 .await
924 .map_err(|e| {
925 CamelError::ProcessorError(format!("Failed to remove container: {}", e))
926 })?;
927 untrack_container(&container_id);
928 }
929 _ => {}
930 }
931
932 exchange.input.set_header(
933 HEADER_ACTION_RESULT,
934 serde_json::Value::String("success".to_string()),
935 );
936 Ok(())
937}
938
939async fn handle_exec(
940 docker: Docker,
941 config: ContainerConfig,
942 exchange: &mut Exchange,
943) -> Result<(), CamelError> {
944 let container_id = exchange
945 .input
946 .header(HEADER_CONTAINER_ID)
947 .and_then(|v| v.as_str().map(|s| s.to_string()))
948 .or(config.container_id.clone())
949 .ok_or_else(|| {
950 CamelError::ProcessorError(format!(
951 "{} header or containerId param is required for exec operation",
952 HEADER_CONTAINER_ID
953 ))
954 })?;
955
956 let cmd = exchange
957 .input
958 .header(HEADER_CMD)
959 .and_then(|v| v.as_str().map(|s| s.to_string()))
960 .or(config.cmd.clone())
961 .ok_or_else(|| {
962 CamelError::ProcessorError(
963 "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
964 )
965 })?;
966
967 let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
968 let env_vars = config.parse_env();
969
970 let exec_config = bollard::exec::CreateExecOptions {
971 cmd: Some(cmd_parts),
972 env: env_vars,
973 user: config.user.clone(),
974 working_dir: config.workdir.clone(),
975 attach_stdout: Some(true),
976 attach_stderr: Some(true),
977 ..Default::default()
978 };
979
980 let create_result = docker
981 .create_exec(&container_id, exec_config)
982 .await
983 .map_err(|e| {
984 let err_str = e.to_string().to_lowercase();
985 if err_str.contains("404") || err_str.contains("no such") {
986 CamelError::ProcessorError(format!(
987 "Container '{}' not found for exec",
988 container_id
989 ))
990 } else {
991 CamelError::ProcessorError(format!("Failed to create exec: {}", e))
992 }
993 })?;
994
995 let exec_id = create_result.id;
996
997 if config.detach {
998 docker
999 .start_exec(
1000 &exec_id,
1001 Some(bollard::exec::StartExecOptions {
1002 detach: true,
1003 ..Default::default()
1004 }),
1005 )
1006 .await
1007 .map_err(|e| {
1008 CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
1009 })?;
1010
1011 exchange
1012 .input
1013 .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
1014 exchange
1015 .input
1016 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1017 } else {
1018 let start_result = docker
1019 .start_exec(&exec_id, None)
1020 .await
1021 .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
1022
1023 let mut output = String::new();
1024
1025 match start_result {
1026 bollard::exec::StartExecResults::Attached {
1027 output: mut stream, ..
1028 } => {
1029 use futures::StreamExt;
1030 while let Some(msg) = stream.next().await {
1031 match msg {
1032 Ok(bollard::container::LogOutput::StdOut { message }) => {
1033 output.push_str(&String::from_utf8_lossy(&message));
1034 }
1035 Ok(bollard::container::LogOutput::StdErr { message }) => {
1036 output.push_str(&String::from_utf8_lossy(&message));
1037 }
1038 Ok(_) => {}
1039 Err(e) => {
1040 output.push_str(&format!("[error reading stream: {}]", e));
1041 }
1042 }
1043 }
1044 }
1045 bollard::exec::StartExecResults::Detached => {}
1046 }
1047
1048 let inspect = docker
1049 .inspect_exec(&exec_id)
1050 .await
1051 .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1052
1053 let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1054 CamelError::ProcessorError("container exec returned no exit code".into())
1055 })?;
1056
1057 let output = output.trim_end().to_string();
1058 exchange.input.body = Body::Text(output);
1059 exchange.input.set_header(
1060 HEADER_EXIT_CODE,
1061 serde_json::Value::Number(exit_code.into()),
1062 );
1063 exchange
1064 .input
1065 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1066 }
1067
1068 exchange.input.set_header(
1069 HEADER_ACTION_RESULT,
1070 serde_json::Value::String("success".to_string()),
1071 );
1072 Ok(())
1073}
1074
1075async fn handle_network_create(
1076 docker: Docker,
1077 config: ContainerConfig,
1078 exchange: &mut Exchange,
1079) -> Result<(), CamelError> {
1080 let network_name = exchange
1081 .input
1082 .header(HEADER_CONTAINER_NAME)
1083 .and_then(|v| v.as_str().map(|s| s.to_string()))
1084 .or(config.name.clone())
1085 .ok_or_else(|| {
1086 CamelError::ProcessorError(
1087 "CamelContainerName header or name param is required for network-create"
1088 .to_string(),
1089 )
1090 })?;
1091
1092 let driver = config.driver.as_deref().unwrap_or("bridge");
1093
1094 let options = NetworkCreateRequest {
1095 name: network_name.clone(),
1096 driver: Some(driver.to_string()),
1097 ..Default::default()
1098 };
1099
1100 let result = docker.create_network(options).await.map_err(|e| {
1101 let err_str = e.to_string().to_lowercase();
1102 if err_str.contains("409") || err_str.contains("already exists") {
1103 CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1104 } else {
1105 CamelError::ProcessorError(format!("Failed to create network: {}", e))
1106 }
1107 })?;
1108
1109 let network_id = result.id.clone();
1110 let json_value = serde_json::to_value(&result).map_err(|e| {
1111 CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1112 })?;
1113
1114 exchange.input.body = Body::Json(json_value);
1115 exchange
1116 .input
1117 .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1118 exchange.input.set_header(
1119 HEADER_ACTION_RESULT,
1120 serde_json::Value::String("success".to_string()),
1121 );
1122 Ok(())
1123}
1124
1125async fn handle_network_connect(
1126 docker: Docker,
1127 config: ContainerConfig,
1128 exchange: &mut Exchange,
1129) -> Result<(), CamelError> {
1130 let network = exchange
1131 .input
1132 .header(HEADER_NETWORK)
1133 .and_then(|v| v.as_str().map(|s| s.to_string()))
1134 .or(config.network.clone())
1135 .ok_or_else(|| {
1136 CamelError::ProcessorError(
1137 "CamelContainerNetwork header or network param is required for network-connect"
1138 .to_string(),
1139 )
1140 })?;
1141
1142 let container = exchange
1143 .input
1144 .header(HEADER_CONTAINER_ID)
1145 .and_then(|v| v.as_str().map(|s| s.to_string()))
1146 .or(config.container_id.clone())
1147 .ok_or_else(|| {
1148 CamelError::ProcessorError(
1149 "CamelContainerId header or container param is required for network-connect"
1150 .to_string(),
1151 )
1152 })?;
1153
1154 docker
1155 .connect_network(
1156 &network,
1157 NetworkConnectRequest {
1158 container,
1159 ..Default::default()
1160 },
1161 )
1162 .await
1163 .map_err(|e| {
1164 let err_str = e.to_string().to_lowercase();
1165 if err_str.contains("404") || err_str.contains("not found") {
1166 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1167 } else {
1168 CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1169 }
1170 })?;
1171
1172 exchange.input.set_header(
1173 HEADER_ACTION_RESULT,
1174 serde_json::Value::String("success".to_string()),
1175 );
1176 Ok(())
1177}
1178
1179async fn handle_network_disconnect(
1180 docker: Docker,
1181 config: ContainerConfig,
1182 exchange: &mut Exchange,
1183) -> Result<(), CamelError> {
1184 let network = exchange
1185 .input
1186 .header(HEADER_NETWORK)
1187 .and_then(|v| v.as_str().map(|s| s.to_string()))
1188 .or(config.network.clone())
1189 .ok_or_else(|| {
1190 CamelError::ProcessorError(
1191 "CamelContainerNetwork header or network param is required for network-disconnect"
1192 .to_string(),
1193 )
1194 })?;
1195
1196 let container = exchange
1197 .input
1198 .header(HEADER_CONTAINER_ID)
1199 .and_then(|v| v.as_str().map(|s| s.to_string()))
1200 .or(config.container_id.clone())
1201 .ok_or_else(|| {
1202 CamelError::ProcessorError(
1203 "CamelContainerId header or container param is required for network-disconnect"
1204 .to_string(),
1205 )
1206 })?;
1207
1208 docker
1209 .disconnect_network(
1210 &network,
1211 NetworkDisconnectRequest {
1212 container,
1213 force: Some(config.force),
1214 },
1215 )
1216 .await
1217 .map_err(|e| {
1218 let err_str = e.to_string().to_lowercase();
1219 if err_str.contains("404") || err_str.contains("not found") {
1220 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1221 } else {
1222 CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1223 }
1224 })?;
1225
1226 exchange.input.set_header(
1227 HEADER_ACTION_RESULT,
1228 serde_json::Value::String("success".to_string()),
1229 );
1230 Ok(())
1231}
1232
1233async fn handle_network_remove(
1234 docker: Docker,
1235 config: ContainerConfig,
1236 exchange: &mut Exchange,
1237) -> Result<(), CamelError> {
1238 let network = exchange
1239 .input
1240 .header(HEADER_NETWORK)
1241 .and_then(|v| v.as_str().map(|s| s.to_string()))
1242 .or(config.network.clone())
1243 .ok_or_else(|| {
1244 CamelError::ProcessorError(
1245 "CamelContainerNetwork header or network param is required for network-remove"
1246 .to_string(),
1247 )
1248 })?;
1249
1250 docker.remove_network(&network).await.map_err(|e| {
1251 let err_str = e.to_string().to_lowercase();
1252 if err_str.contains("404") || err_str.contains("not found") {
1253 CamelError::ProcessorError(format!("Network '{}' not found", network))
1254 } else if err_str.contains("409") || err_str.contains("in use") {
1255 CamelError::ProcessorError(format!(
1256 "Network '{}' is in use and cannot be removed",
1257 network
1258 ))
1259 } else {
1260 CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1261 }
1262 })?;
1263
1264 exchange.input.set_header(
1265 HEADER_ACTION_RESULT,
1266 serde_json::Value::String("success".to_string()),
1267 );
1268 Ok(())
1269}
1270
1271async fn handle_network_list(
1272 docker: Docker,
1273 _config: ContainerConfig,
1274 exchange: &mut Exchange,
1275) -> Result<(), CamelError> {
1276 let networks = docker
1277 .list_networks(None::<ListNetworksOptions>)
1278 .await
1279 .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1280
1281 let json_value = serde_json::to_value(&networks)
1282 .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1283
1284 exchange.input.body = Body::Json(json_value);
1285 exchange.input.set_header(
1286 HEADER_ACTION_RESULT,
1287 serde_json::Value::String("success".to_string()),
1288 );
1289 Ok(())
1290}
1291
1292#[derive(Clone)]
1297pub struct ContainerProducer {
1298 config: ContainerConfig,
1299 docker: Docker,
1300}
1301
1302impl Service<Exchange> for ContainerProducer {
1303 type Response = Exchange;
1304 type Error = CamelError;
1305 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1306
1307 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1308 Poll::Ready(Ok(()))
1309 }
1310
1311 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1312 let config = self.config.clone();
1313 let docker = self.docker.clone();
1314 Box::pin(async move {
1315 let operation_name = exchange
1316 .input
1317 .header(HEADER_ACTION)
1318 .and_then(|v| v.as_str().map(|s| s.to_string()))
1319 .unwrap_or_else(|| config.operation.clone());
1320
1321 let operation = parse_producer_operation(&operation_name)?;
1322
1323 match operation {
1324 ProducerOperation::List => {
1325 handle_list(docker, config, &mut exchange).await?;
1326 }
1327 ProducerOperation::Run => {
1328 handle_run(docker, config, &mut exchange).await?;
1329 }
1330 ProducerOperation::Start => {
1331 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1332 .await?;
1333 }
1334 ProducerOperation::Stop => {
1335 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1336 .await?;
1337 }
1338 ProducerOperation::Remove => {
1339 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1340 .await?;
1341 }
1342 ProducerOperation::Exec => {
1343 handle_exec(docker, config, &mut exchange).await?;
1344 }
1345 ProducerOperation::NetworkCreate => {
1346 handle_network_create(docker, config, &mut exchange).await?;
1347 }
1348 ProducerOperation::NetworkConnect => {
1349 handle_network_connect(docker, config, &mut exchange).await?;
1350 }
1351 ProducerOperation::NetworkDisconnect => {
1352 handle_network_disconnect(docker, config, &mut exchange).await?;
1353 }
1354 ProducerOperation::NetworkRemove => {
1355 handle_network_remove(docker, config, &mut exchange).await?;
1356 }
1357 ProducerOperation::NetworkList => {
1358 handle_network_list(docker, config, &mut exchange).await?;
1359 }
1360 }
1361
1362 Ok(exchange)
1363 })
1364 }
1365}
1366
1367pub struct ContainerConsumer {
1372 config: ContainerConfig,
1373 runtime: Arc<dyn RuntimeObservability>,
1376}
1377
1378impl ContainerConsumer {
1379 pub fn new(config: ContainerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
1380 Self { config, runtime }
1381 }
1382}
1383
1384#[async_trait]
1385impl Consumer for ContainerConsumer {
1386 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1387 match self.config.operation.as_str() {
1388 "events" => self.start_events_consumer(context).await,
1389 "logs" => self.start_logs_consumer(context).await,
1390 _ => Err(CamelError::EndpointCreationFailed(format!(
1391 "Consumer only supports 'events' or 'logs' operations, got '{}'",
1392 self.config.operation
1393 ))),
1394 }
1395 }
1396
1397 async fn stop(&mut self) -> Result<(), CamelError> {
1398 Ok(())
1399 }
1400
1401 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1402 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1403 }
1404}
1405
1406impl ContainerConsumer {
1407 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1408 use futures::StreamExt;
1409
1410 let cancel = context.cancel_token();
1411
1412 loop {
1416 let docker = match retry_async_cancelable(
1417 &self.config.reconnect,
1418 Some("container-events"),
1419 || async { self.config.connect_docker().await },
1420 |_| true,
1421 &cancel,
1422 )
1423 .await
1424 {
1425 Ok(d) => d,
1426 Err(_) if context.is_cancelled() => {
1427 tracing::info!("Container events consumer shutting down");
1428 return Ok(());
1429 }
1430 Err(e) => {
1431 self.runtime
1432 .metrics()
1433 .increment_errors(context.route_id(), "e:container:events-connect");
1434 tracing::error!(error = %e, "Container events consumer exhausted reconnect attempts");
1436 return Err(e);
1437 }
1438 };
1439
1440 let mut event_stream = docker.events(None::<EventsOptions>);
1441
1442 loop {
1443 tokio::select! {
1444 _ = context.cancelled() => {
1445 tracing::info!("Container events consumer shutting down");
1446 return Ok(());
1447 }
1448
1449 msg = event_stream.next() => {
1450 match msg {
1451 Some(Ok(event)) => {
1452 let formatted = format_docker_event(&event);
1453 let message = Message::new(Body::Text(formatted));
1454 let exchange = Exchange::new(message);
1455
1456 if let Err(e) = context.send(exchange).await {
1457 tracing::error!("Failed to send exchange: {:?}", e);
1459 break;
1460 }
1461 }
1462 Some(Err(e)) => {
1463 self.runtime.metrics().increment_errors(context.route_id(), "e:container:events-stream");
1464 tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1466 break;
1467 }
1468 None => {
1469 tracing::info!("Docker event stream ended. Reconnecting...");
1470 break;
1471 }
1472 }
1473 }
1474 }
1475 }
1476
1477 tokio::select! {
1478 _ = context.cancelled() => {
1479 tracing::info!("Container events consumer shutting down");
1480 return Ok(());
1481 }
1482 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1483 }
1484 }
1485 }
1486
1487 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1488 use futures::StreamExt;
1489
1490 let container_id = self.config.container_id.clone().ok_or_else(|| {
1491 CamelError::EndpointCreationFailed(
1492 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1493 .to_string(),
1494 )
1495 })?;
1496
1497 let cancel = context.cancel_token();
1498
1499 loop {
1503 let docker = match retry_async_cancelable(
1504 &self.config.reconnect,
1505 Some("container-logs"),
1506 || async { self.config.connect_docker().await },
1507 |_| true,
1508 &cancel,
1509 )
1510 .await
1511 {
1512 Ok(d) => d,
1513 Err(_) if context.is_cancelled() => {
1514 tracing::info!("Container logs consumer shutting down");
1515 return Ok(());
1516 }
1517 Err(e) => {
1518 self.runtime
1519 .metrics()
1520 .increment_errors(context.route_id(), "e:container:logs-connect");
1521 tracing::error!(error = %e, "Container logs consumer exhausted reconnect attempts");
1523 return Err(e);
1524 }
1525 };
1526
1527 let tail = self
1528 .config
1529 .tail
1530 .clone()
1531 .unwrap_or_else(|| "all".to_string());
1532
1533 let options = LogsOptions {
1534 follow: self.config.follow,
1535 stdout: true,
1536 stderr: true,
1537 timestamps: self.config.timestamps,
1538 tail,
1539 ..Default::default()
1540 };
1541
1542 let mut log_stream = docker.logs(&container_id, Some(options));
1543 let container_id_header = container_id.clone();
1544
1545 loop {
1546 tokio::select! {
1547 _ = context.cancelled() => {
1548 tracing::info!("Container logs consumer shutting down");
1549 return Ok(());
1550 }
1551
1552 msg = log_stream.next() => {
1553 match msg {
1554 Some(Ok(log_output)) => {
1555 let (stream_type, content) = match log_output {
1556 bollard::container::LogOutput::StdOut { message } => {
1557 ("stdout", String::from_utf8_lossy(&message).into_owned())
1558 }
1559 bollard::container::LogOutput::StdErr { message } => {
1560 ("stderr", String::from_utf8_lossy(&message).into_owned())
1561 }
1562 bollard::container::LogOutput::Console { message } => {
1563 ("console", String::from_utf8_lossy(&message).into_owned())
1564 }
1565 bollard::container::LogOutput::StdIn { message } => {
1566 ("stdin", String::from_utf8_lossy(&message).into_owned())
1567 }
1568 };
1569
1570 let content = content.trim_end();
1571 if content.is_empty() {
1572 continue;
1573 }
1574
1575 let mut message = Message::new(Body::Text(content.to_string()));
1576 message.set_header(
1577 HEADER_CONTAINER_ID,
1578 serde_json::Value::String(container_id_header.clone()),
1579 );
1580 message.set_header(
1581 HEADER_LOG_STREAM,
1582 serde_json::Value::String(stream_type.to_string()),
1583 );
1584
1585 if self.config.timestamps
1586 && let Some(ts) = extract_timestamp(content) {
1587 message.set_header(
1588 HEADER_LOG_TIMESTAMP,
1589 serde_json::Value::String(ts),
1590 );
1591 }
1592
1593 let exchange = Exchange::new(message);
1594
1595 if let Err(e) = context.send(exchange).await {
1596 tracing::error!("Failed to send log exchange: {:?}", e);
1598 break;
1599 }
1600 }
1601 Some(Err(e)) => {
1602 self.runtime.metrics().increment_errors(context.route_id(), "e:container:logs-stream");
1603 tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1605 break;
1606 }
1607 None => {
1608 if self.config.follow {
1609 tracing::info!("Docker log stream ended. Reconnecting...");
1610 break;
1611 } else {
1612 tracing::info!("Container logs consumer finished (follow=false)");
1613 return Ok(());
1614 }
1615 }
1616 }
1617 }
1618 }
1619 }
1620
1621 tokio::select! {
1622 _ = context.cancelled() => {
1623 tracing::info!("Container logs consumer shutting down");
1624 return Ok(());
1625 }
1626 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1627 }
1628 }
1629 }
1630}
1631
/// Returns the leading token of `log_line` when it looks like a timestamp.
///
/// The line is split at its first space. The text before that space is
/// returned when it contains the letter `T`. Lines without a space, or whose
/// first token has no `T`, yield `None`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _rest) = log_line.split_once(' ')?;
    if candidate.contains('T') {
        Some(candidate.to_string())
    } else {
        None
    }
}
1640
1641pub struct ContainerComponent {
1649 config: Option<ContainerGlobalConfig>,
1650}
1651
1652impl ContainerComponent {
1653 pub fn new() -> Self {
1655 Self { config: None }
1656 }
1657
1658 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1660 Self {
1661 config: Some(config),
1662 }
1663 }
1664
1665 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1667 Self { config }
1668 }
1669}
1670
1671impl Default for ContainerComponent {
1672 fn default() -> Self {
1673 Self::new()
1674 }
1675}
1676
1677impl Component for ContainerComponent {
1678 fn scheme(&self) -> &str {
1679 "container"
1680 }
1681
1682 fn create_endpoint(
1683 &self,
1684 uri: &str,
1685 ctx: &dyn camel_component_api::ComponentContext,
1686 ) -> Result<Box<dyn Endpoint>, CamelError> {
1687 let mut config = ContainerConfig::from_uri(uri)?;
1688 if let Some(ref global) = self.config {
1690 config.apply_global_defaults(global);
1691 }
1692 let health_check = ContainerHealthCheck::new(&config);
1693 ctx.register_current_route_health_check(Arc::new(health_check));
1694 Ok(Box::new(ContainerEndpoint {
1695 uri: uri.to_string(),
1696 config,
1697 }))
1698 }
1699}
1700
1701pub struct ContainerEndpoint {
1708 uri: String,
1709 config: ContainerConfig,
1710}
1711
1712impl ContainerEndpoint {
1713 pub fn docker_host(&self) -> Option<&str> {
1716 self.config.host.as_deref()
1717 }
1718}
1719
1720impl Endpoint for ContainerEndpoint {
1721 fn uri(&self) -> &str {
1722 &self.uri
1723 }
1724
1725 fn create_consumer(
1726 &self,
1727 rt: Arc<dyn camel_component_api::RuntimeObservability>,
1728 ) -> Result<Box<dyn Consumer>, CamelError> {
1729 Ok(Box::new(ContainerConsumer::new(self.config.clone(), rt)))
1730 }
1731
1732 fn create_producer(
1733 &self,
1734 _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1735 _ctx: &ProducerContext,
1736 ) -> Result<BoxProcessor, CamelError> {
1737 let docker = self.config.connect_docker_client()?;
1738 Ok(BoxProcessor::new(ContainerProducer {
1739 config: self.config.clone(),
1740 docker,
1741 }))
1742 }
1743}
1744
1745#[cfg(test)]
1746mod tests {
1747 use camel_component_api::test_support::PanicRuntimeObservability;
1748 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1749 std::sync::Arc::new(PanicRuntimeObservability)
1750 }
1751 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1752 std::sync::Arc::new(PanicRuntimeObservability)
1753 }
1754
1755 use super::*;
1756 use camel_api::MetricsCollector;
1757 use camel_component_api::HealthCheckRegistry;
1758 use camel_component_api::NoOpComponentContext;
1759
1760 #[test]
1761 fn test_container_config() {
1762 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1763 assert_eq!(config.operation, "run");
1764 assert_eq!(config.image.as_deref(), Some("alpine"));
1765 assert!(config.host.is_none());
1767 }
1768
1769 #[test]
1770 fn test_global_config_applied_to_endpoint() {
1771 let global =
1774 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1775 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1776 assert!(
1777 config.host.is_none(),
1778 "URI without ?host= should leave host as None"
1779 );
1780 config.apply_global_defaults(&global);
1781 assert_eq!(
1782 config.host.as_deref(),
1783 Some("unix:///custom/docker.sock"),
1784 "global docker_host must be applied when URI did not set host"
1785 );
1786 }
1787
1788 #[test]
1789 fn test_uri_param_wins_over_global_config() {
1790 let global =
1792 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1793 let mut config =
1794 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1795 .unwrap();
1796 assert_eq!(
1797 config.host.as_deref(),
1798 Some("unix:///override.sock"),
1799 "URI-set host should be parsed correctly"
1800 );
1801 config.apply_global_defaults(&global);
1802 assert_eq!(
1803 config.host.as_deref(),
1804 Some("unix:///override.sock"),
1805 "global config must NOT override a host already set by URI"
1806 );
1807 }
1808
1809 #[test]
1810 fn test_container_config_parses_name() {
1811 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1812 assert_eq!(config.name.as_deref(), Some("my-container"));
1813 }
1814
1815 #[test]
1816 fn test_parse_producer_operation_known() {
1817 assert_eq!(
1818 parse_producer_operation("list").unwrap(),
1819 ProducerOperation::List
1820 );
1821 assert_eq!(
1822 parse_producer_operation("run").unwrap(),
1823 ProducerOperation::Run
1824 );
1825 assert_eq!(
1826 parse_producer_operation("start").unwrap(),
1827 ProducerOperation::Start
1828 );
1829 assert_eq!(
1830 parse_producer_operation("stop").unwrap(),
1831 ProducerOperation::Stop
1832 );
1833 assert_eq!(
1834 parse_producer_operation("remove").unwrap(),
1835 ProducerOperation::Remove
1836 );
1837 }
1838
1839 #[test]
1840 fn test_parse_producer_operation_unknown() {
1841 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1842 match err {
1843 CamelError::ProcessorError(msg) => {
1844 assert!(
1845 msg.contains("Unknown container operation"),
1846 "Unexpected error message: {}",
1847 msg
1848 );
1849 }
1850 _ => panic!("Expected ProcessorError for unknown operation"),
1851 }
1852 }
1853
1854 #[test]
1855 fn test_parse_producer_operation_new_variants() {
1856 assert_eq!(
1857 parse_producer_operation("exec").unwrap(),
1858 ProducerOperation::Exec
1859 );
1860 assert_eq!(
1861 parse_producer_operation("network-create").unwrap(),
1862 ProducerOperation::NetworkCreate
1863 );
1864 assert_eq!(
1865 parse_producer_operation("network-connect").unwrap(),
1866 ProducerOperation::NetworkConnect
1867 );
1868 assert_eq!(
1869 parse_producer_operation("network-disconnect").unwrap(),
1870 ProducerOperation::NetworkDisconnect
1871 );
1872 assert_eq!(
1873 parse_producer_operation("network-remove").unwrap(),
1874 ProducerOperation::NetworkRemove
1875 );
1876 assert_eq!(
1877 parse_producer_operation("network-list").unwrap(),
1878 ProducerOperation::NetworkList
1879 );
1880 }
1881
1882 #[test]
1883 fn test_resolve_container_name_header_overrides_config() {
1884 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1885 let mut exchange = Exchange::new(Message::new(""));
1886 exchange.input.set_header(
1887 HEADER_CONTAINER_NAME,
1888 serde_json::Value::String("header-name".to_string()),
1889 );
1890
1891 let resolved = resolve_container_name(&exchange, &config);
1892 assert_eq!(resolved.as_deref(), Some("header-name"));
1893 }
1894
1895 #[test]
1896 fn test_container_config_rejects_tcp_host() {
1897 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1898 let err = config.connect_docker_client().unwrap_err();
1899 match err {
1900 CamelError::ProcessorError(msg) => {
1901 assert!(
1902 msg.to_lowercase().contains("tcp"),
1903 "Expected TCP scheme error, got: {}",
1904 msg
1905 );
1906 }
1907 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1908 }
1909 }
1910
1911 #[tokio::test]
1912 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1913 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1914 let remove_called_clone = remove_called.clone();
1915
1916 let result = run_container_with_cleanup(
1917 || async { Ok("container-123".to_string()) },
1918 |_id| async move {
1919 Err(CamelError::ProcessorError(
1920 "Failed to start container".to_string(),
1921 ))
1922 },
1923 move |_id| {
1924 let remove_called_inner = remove_called_clone.clone();
1925 async move {
1926 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1927 Ok(())
1928 }
1929 },
1930 )
1931 .await;
1932
1933 assert!(result.is_err(), "Expected start failure to bubble up");
1934 assert!(
1935 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1936 "Expected cleanup to remove container"
1937 );
1938 }
1939
1940 #[test]
1941 fn test_container_component_creates_endpoint() {
1942 let component = ContainerComponent::new();
1943 assert_eq!(component.scheme(), "container");
1944 let ctx = NoOpComponentContext;
1945 let endpoint = component
1946 .create_endpoint("container:run?image=alpine", &ctx)
1947 .unwrap();
1948 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1949 }
1950
1951 #[test]
1952 fn test_container_config_parses_ports() {
1953 let config =
1954 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1955 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1956 }
1957
1958 #[test]
1959 fn test_container_config_parses_env() {
1960 let config =
1961 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1962 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1963 }
1964
/// Checks that the logs-related query parameters (`containerId`, `follow`,
/// `timestamps`, `tail`) are all picked up from the endpoint URI.
#[test]
fn test_container_config_parses_logs_options() {
    // Each parameter is separated by a literal `&`. The `timestamps` key must
    // be spelled out in full so that `config.timestamps` is actually set.
    let config = ContainerConfig::from_uri(
        "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
    )
    .unwrap();
    assert_eq!(config.operation, "logs");
    assert_eq!(config.container_id.as_deref(), Some("my-app"));
    assert!(config.follow);
    assert!(config.timestamps);
    assert_eq!(config.tail.as_deref(), Some("100"));
}
1977
1978 #[test]
1979 fn test_container_config_logs_defaults() {
1980 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1981 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1985
1986 #[test]
1987 fn test_parse_ports_single() {
1988 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1989 let (exposed, bindings) = config.parse_ports().unwrap();
1990
1991 assert!(exposed.contains(&"80/tcp".to_string()));
1992 assert!(bindings.contains_key("80/tcp"));
1993
1994 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1995 assert_eq!(binding.len(), 1);
1996 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1997 }
1998
1999 #[test]
2000 fn test_parse_ports_multiple() {
2001 let config =
2002 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
2003 let (exposed, bindings) = config.parse_ports().unwrap();
2004
2005 assert!(exposed.contains(&"80/tcp".to_string()));
2006 assert!(exposed.contains(&"443/tcp".to_string()));
2007 assert_eq!(bindings.len(), 2);
2008 }
2009
2010 #[test]
2011 fn test_parse_ports_with_protocol() {
2012 let config =
2013 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
2014 .unwrap();
2015 let (exposed, _bindings) = config.parse_ports().unwrap();
2016
2017 assert!(exposed.contains(&"80/tcp".to_string()));
2018 assert!(exposed.contains(&"53/udp".to_string()));
2019 }
2020
2021 #[test]
2022 fn test_parse_ports_none() {
2023 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2024 let (exposed, bindings) = config.parse_ports().unwrap();
2025 assert!(exposed.is_empty());
2026 assert!(bindings.is_empty());
2027 }
2028
2029 #[test]
2030 fn test_parse_env_single() {
2031 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
2032 let env = config.parse_env().unwrap();
2033
2034 assert_eq!(env.len(), 1);
2035 assert_eq!(env[0], "FOO=bar");
2036 }
2037
2038 #[test]
2039 fn test_parse_env_multiple() {
2040 let config =
2041 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
2042 .unwrap();
2043 let env = config.parse_env().unwrap();
2044
2045 assert_eq!(env.len(), 3);
2046 assert!(env.contains(&"FOO=bar".to_string()));
2047 assert!(env.contains(&"BAZ=qux".to_string()));
2048 assert!(env.contains(&"NUM=123".to_string()));
2049 }
2050
2051 #[test]
2052 fn test_parse_env_none() {
2053 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2054 assert!(config.parse_env().is_none());
2055 }
2056
2057 use camel_component_api::Message;
2058 use std::sync::Arc;
2059
2060 #[tokio::test]
2061 async fn test_container_producer_resolves_operation_from_header() {
2062 let docker = match Docker::connect_with_local_defaults() {
2064 Ok(d) => d,
2065 Err(_) => {
2066 eprintln!("Skipping test: Could not connect to Docker daemon");
2067 return;
2068 }
2069 };
2070
2071 if docker.ping().await.is_err() {
2072 eprintln!("Skipping test: Docker daemon not responding to ping");
2073 return;
2074 }
2075
2076 let component = ContainerComponent::new();
2077 let ctx = NoOpComponentContext;
2078 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2079
2080 let ctx = ProducerContext::new();
2081 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2082
2083 let mut exchange = Exchange::new(Message::new(""));
2084 exchange
2085 .input
2086 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2087
2088 use tower::ServiceExt;
2089 let result = producer
2090 .ready()
2091 .await
2092 .unwrap()
2093 .call(exchange)
2094 .await
2095 .unwrap();
2096
2097 assert_eq!(
2099 result
2100 .input
2101 .header(HEADER_ACTION_RESULT)
2102 .map(|v| v.as_str().unwrap()),
2103 Some("success")
2104 );
2105 }
2106
2107 #[tokio::test]
2108 async fn test_container_producer_connection_error_on_invalid_host() {
2109 let component = ContainerComponent::new();
2111 let ctx = NoOpComponentContext;
2112 let endpoint = component
2113 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2114 .unwrap();
2115
2116 let ctx = ProducerContext::new();
2117 let result = endpoint.create_producer(rt(), &ctx);
2118
2119 assert!(
2121 result.is_err(),
2122 "Expected error when connecting to invalid host"
2123 );
2124 let err = result.unwrap_err();
2125 match &err {
2126 CamelError::ProcessorError(msg) => {
2127 assert!(
2128 msg.to_lowercase().contains("connection")
2129 || msg.to_lowercase().contains("connect")
2130 || msg.to_lowercase().contains("socket")
2131 || msg.contains("docker"),
2132 "Error message should indicate connection failure, got: {}",
2133 msg
2134 );
2135 }
2136 _ => panic!("Expected ProcessorError, got: {:?}", err),
2137 }
2138 }
2139
2140 #[tokio::test]
2142 async fn test_container_producer_lifecycle_operations_missing_id() {
2143 let docker = match Docker::connect_with_local_defaults() {
2145 Ok(d) => d,
2146 Err(_) => {
2147 eprintln!("Skipping test: Could not connect to Docker daemon");
2148 return;
2149 }
2150 };
2151
2152 if docker.ping().await.is_err() {
2153 eprintln!("Skipping test: Docker daemon not responding to ping");
2154 return;
2155 }
2156
2157 let component = ContainerComponent::new();
2158 let ctx = NoOpComponentContext;
2159 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2160 let ctx = ProducerContext::new();
2161 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2162
2163 for operation in ["start", "stop", "remove"] {
2165 let mut exchange = Exchange::new(Message::new(""));
2166 exchange.input.set_header(
2167 HEADER_ACTION,
2168 serde_json::Value::String(operation.to_string()),
2169 );
2170 use tower::ServiceExt;
2173 let result = producer.ready().await.unwrap().call(exchange).await;
2174
2175 assert!(
2176 result.is_err(),
2177 "Expected error for {} operation without CamelContainerId",
2178 operation
2179 );
2180 let err = result.unwrap_err();
2181 match &err {
2182 CamelError::ProcessorError(msg) => {
2183 assert!(
2184 msg.contains(HEADER_CONTAINER_ID),
2185 "Error message should mention {}, got: {}",
2186 HEADER_CONTAINER_ID,
2187 msg
2188 );
2189 }
2190 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2191 }
2192 }
2193 }
2194
2195 #[tokio::test]
2197 async fn test_container_producer_stop_nonexistent() {
2198 let docker = match Docker::connect_with_local_defaults() {
2200 Ok(d) => d,
2201 Err(_) => {
2202 eprintln!("Skipping test: Could not connect to Docker daemon");
2203 return;
2204 }
2205 };
2206
2207 if docker.ping().await.is_err() {
2208 eprintln!("Skipping test: Docker daemon not responding to ping");
2209 return;
2210 }
2211
2212 let component = ContainerComponent::new();
2213 let ctx = NoOpComponentContext;
2214 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2215 let ctx = ProducerContext::new();
2216 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2217
2218 let mut exchange = Exchange::new(Message::new(""));
2219 exchange
2220 .input
2221 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2222 exchange.input.set_header(
2223 HEADER_CONTAINER_ID,
2224 serde_json::Value::String("nonexistent-container-123".into()),
2225 );
2226
2227 use tower::ServiceExt;
2228 let result = producer.ready().await.unwrap().call(exchange).await;
2229
2230 assert!(
2231 result.is_err(),
2232 "Expected error when stopping nonexistent container"
2233 );
2234 let err = result.unwrap_err();
2235 match &err {
2236 CamelError::ProcessorError(msg) => {
2237 assert!(
2239 msg.to_lowercase().contains("no such container")
2240 || msg.to_lowercase().contains("not found")
2241 || msg.contains("404"),
2242 "Error message should indicate container not found, got: {}",
2243 msg
2244 );
2245 }
2246 _ => panic!("Expected ProcessorError, got: {:?}", err),
2247 }
2248 }
2249
2250 #[tokio::test]
2252 async fn test_container_producer_run_missing_image() {
2253 let docker = match Docker::connect_with_local_defaults() {
2255 Ok(d) => d,
2256 Err(_) => {
2257 eprintln!("Skipping test: Could not connect to Docker daemon");
2258 return;
2259 }
2260 };
2261
2262 if docker.ping().await.is_err() {
2263 eprintln!("Skipping test: Docker daemon not responding to ping");
2264 return;
2265 }
2266
2267 let component = ContainerComponent::new();
2269 let ctx = NoOpComponentContext;
2270 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2271 let ctx = ProducerContext::new();
2272 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2273
2274 let mut exchange = Exchange::new(Message::new(""));
2275 exchange
2276 .input
2277 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2278 use tower::ServiceExt;
2281 let result = producer.ready().await.unwrap().call(exchange).await;
2282
2283 assert!(
2284 result.is_err(),
2285 "Expected error for run operation without image"
2286 );
2287 let err = result.unwrap_err();
2288 match &err {
2289 CamelError::ProcessorError(msg) => {
2290 assert!(
2291 msg.to_lowercase().contains("image"),
2292 "Error message should mention 'image', got: {}",
2293 msg
2294 );
2295 }
2296 _ => panic!("Expected ProcessorError, got: {:?}", err),
2297 }
2298 }
2299
2300 #[tokio::test]
2302 async fn test_container_producer_run_image_from_header() {
2303 let docker = match Docker::connect_with_local_defaults() {
2305 Ok(d) => d,
2306 Err(_) => {
2307 eprintln!("Skipping test: Could not connect to Docker daemon");
2308 return;
2309 }
2310 };
2311
2312 if docker.ping().await.is_err() {
2313 eprintln!("Skipping test: Docker daemon not responding to ping");
2314 return;
2315 }
2316
2317 let component = ContainerComponent::new();
2319 let ctx = NoOpComponentContext;
2320 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2321 let ctx = ProducerContext::new();
2322 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2323
2324 let mut exchange = Exchange::new(Message::new(""));
2325 exchange
2326 .input
2327 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2328 exchange.input.set_header(
2330 HEADER_IMAGE,
2331 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2332 );
2333
2334 use tower::ServiceExt;
2335 let result = producer.ready().await.unwrap().call(exchange).await;
2336
2337 assert!(
2339 result.is_err(),
2340 "Expected error when running container with nonexistent image"
2341 );
2342 let err = result.unwrap_err();
2343 match &err {
2344 CamelError::ProcessorError(msg) => {
2345 assert!(
2347 msg.to_lowercase().contains("no such image")
2348 || msg.to_lowercase().contains("not found")
2349 || msg.to_lowercase().contains("image")
2350 || msg.to_lowercase().contains("pull")
2351 || msg.contains("404"),
2352 "Error message should indicate image issue, got: {}",
2353 msg
2354 );
2355 }
2356 _ => panic!("Expected ProcessorError, got: {:?}", err),
2357 }
2358 }
2359
2360 #[tokio::test]
2363 async fn test_container_producer_run_alpine_container() {
2364 let docker = match Docker::connect_with_local_defaults() {
2365 Ok(d) => d,
2366 Err(_) => {
2367 eprintln!("Skipping test: Could not connect to Docker daemon");
2368 return;
2369 }
2370 };
2371
2372 if docker.ping().await.is_err() {
2373 eprintln!("Skipping test: Docker daemon not responding to ping");
2374 return;
2375 }
2376
2377 let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2379 let has_alpine = images
2380 .iter()
2381 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2382
2383 if !has_alpine {
2384 eprintln!("Pulling alpine:latest image...");
2385 let mut stream = docker.create_image(
2386 Some(CreateImageOptions {
2387 from_image: Some("alpine:latest".to_string()),
2388 ..Default::default()
2389 }),
2390 None,
2391 None,
2392 );
2393
2394 use futures::StreamExt;
2395 while let Some(_item) = stream.next().await {
2396 }
2398 eprintln!("Image pulled successfully");
2399 }
2400
2401 let component = ContainerComponent::new();
2403 let ctx = NoOpComponentContext;
2404 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2405 let ctx = ProducerContext::new();
2406 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2407
2408 let timestamp = std::time::SystemTime::now()
2410 .duration_since(std::time::UNIX_EPOCH)
2411 .unwrap()
2412 .as_millis();
2413 let container_name = format!("test-rust-camel-{}", timestamp);
2414 let mut exchange = Exchange::new(Message::new(""));
2415 exchange.input.set_header(
2416 HEADER_IMAGE,
2417 serde_json::Value::String("alpine:latest".into()),
2418 );
2419 exchange.input.set_header(
2420 HEADER_CONTAINER_NAME,
2421 serde_json::Value::String(container_name.clone()),
2422 );
2423
2424 use tower::ServiceExt;
2425 let result = producer
2426 .ready()
2427 .await
2428 .unwrap()
2429 .call(exchange)
2430 .await
2431 .expect("Container run should succeed");
2432
2433 let container_id = result
2435 .input
2436 .header(HEADER_CONTAINER_ID)
2437 .and_then(|v| v.as_str().map(|s| s.to_string()))
2438 .expect("Expected container ID header");
2439 assert!(!container_id.is_empty(), "Container ID should not be empty");
2440
2441 assert_eq!(
2443 result
2444 .input
2445 .header(HEADER_ACTION_RESULT)
2446 .and_then(|v| v.as_str()),
2447 Some("success")
2448 );
2449
2450 let inspect = docker
2452 .inspect_container(&container_id, None)
2453 .await
2454 .expect("Container should exist");
2455 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2456
2457 docker
2459 .remove_container(
2460 &container_id,
2461 Some(RemoveContainerOptions {
2462 force: true,
2463 ..Default::default()
2464 }),
2465 )
2466 .await
2467 .ok();
2468
2469 eprintln!("✅ Container {} created and cleaned up", container_id);
2470 }
2471
2472 #[tokio::test]
2474 async fn test_container_consumer_unsupported_operation() {
2475 use tokio::sync::mpsc;
2476
2477 let component = ContainerComponent::new();
2478 let ctx = NoOpComponentContext;
2479 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2480 let mut consumer = endpoint.create_consumer(rt()).unwrap();
2481
2482 let (tx, _rx) = mpsc::channel(16);
2484 let cancel_token = tokio_util::sync::CancellationToken::new();
2485 let context = ConsumerContext::new(tx, cancel_token, "container-test-route".to_string());
2486
2487 let result = consumer.start(context).await;
2488
2489 assert!(
2491 result.is_err(),
2492 "Expected error for unsupported consumer operation"
2493 );
2494 let err = result.unwrap_err();
2495 match &err {
2496 CamelError::EndpointCreationFailed(msg) => {
2497 assert!(
2498 msg.contains("Consumer only supports 'events' or 'logs'"),
2499 "Error message should mention events or logs support, got: {}",
2500 msg
2501 );
2502 }
2503 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2504 }
2505 }
2506
2507 #[test]
2508 fn test_container_consumer_concurrency_model_is_concurrent() {
2509 let consumer = ContainerConsumer {
2510 config: ContainerConfig::from_uri("container:events").unwrap(),
2511 runtime: test_rt(),
2512 };
2513
2514 assert_eq!(
2515 consumer.concurrency_model(),
2516 camel_component_api::ConcurrencyModel::Concurrent { max: None }
2517 );
2518 }
2519
2520 #[tokio::test]
2524 async fn test_container_consumer_cancellation() {
2525 use std::sync::atomic::{AtomicBool, Ordering};
2526 use tokio::sync::mpsc;
2527
2528 let docker = match Docker::connect_with_local_defaults() {
2530 Ok(d) => d,
2531 Err(_) => {
2532 eprintln!("Skipping test: Could not connect to Docker daemon");
2533 return;
2534 }
2535 };
2536
2537 if docker.ping().await.is_err() {
2538 eprintln!("Skipping test: Docker daemon not responding to ping");
2539 return;
2540 }
2541
2542 let component = ContainerComponent::new();
2543 let ctx = NoOpComponentContext;
2544 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2545 let mut consumer = endpoint.create_consumer(rt()).unwrap();
2546
2547 let (tx, _rx) = mpsc::channel(16);
2549 let cancel_token = tokio_util::sync::CancellationToken::new();
2550 let context =
2551 ConsumerContext::new(tx, cancel_token.clone(), "container-test-route".to_string());
2552
2553 let completed = Arc::new(AtomicBool::new(false));
2555 let completed_clone = completed.clone();
2556
2557 let handle = tokio::spawn(async move {
2559 let result = consumer.start(context).await;
2560 completed_clone.store(true, Ordering::SeqCst);
2562 result
2563 });
2564
2565 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2567
2568 assert!(
2570 !completed.load(Ordering::SeqCst),
2571 "Consumer should still be running before cancellation"
2572 );
2573
2574 cancel_token.cancel();
2576
2577 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2579
2580 assert!(
2582 result.is_ok(),
2583 "Consumer should gracefully shut down after cancellation"
2584 );
2585
2586 assert!(
2588 completed.load(Ordering::SeqCst),
2589 "Consumer should have completed after cancellation"
2590 );
2591 }
2592
2593 #[tokio::test]
2597 async fn test_container_producer_list_containers() {
2598 let docker = match Docker::connect_with_local_defaults() {
2601 Ok(d) => d,
2602 Err(_) => {
2603 eprintln!("Skipping test: Could not connect to Docker daemon");
2604 return;
2605 }
2606 };
2607
2608 if docker.ping().await.is_err() {
2609 eprintln!("Skipping test: Docker daemon not responding to ping");
2610 return;
2611 }
2612
2613 let component = ContainerComponent::new();
2615 let ctx = NoOpComponentContext;
2616 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2617
2618 let ctx = ProducerContext::new();
2619 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2620
2621 let mut exchange = Exchange::new(Message::new(""));
2623 exchange
2624 .input
2625 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2626
2627 use tower::ServiceExt;
2629 let result = producer
2630 .ready()
2631 .await
2632 .unwrap()
2633 .call(exchange)
2634 .await
2635 .expect("Producer should succeed when Docker is available");
2636
2637 match &result.input.body {
2640 camel_component_api::Body::Json(json_value) => {
2641 assert!(
2642 json_value.is_array(),
2643 "Expected input body to be a JSON array, got: {:?}",
2644 json_value
2645 );
2646 }
2647 other => panic!("Expected Body::Json with array, got: {:?}", other),
2648 }
2649 }
2650
2651 #[test]
2652 fn test_container_config_parses_volumes() {
2653 let config = ContainerConfig::from_uri(
2654 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2655 )
2656 .unwrap();
2657 assert_eq!(
2658 config.volumes.as_deref(),
2659 Some("./html:/usr/share/nginx/html:ro")
2660 );
2661 }
2662
2663 #[test]
2664 fn test_container_config_parses_exec_params() {
2665 let config = ContainerConfig::from_uri(
2666 "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2667 )
2668 .unwrap();
2669 assert_eq!(config.operation, "exec");
2670 assert_eq!(config.container_id.as_deref(), Some("my-app"));
2671 assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2672 assert_eq!(config.user.as_deref(), Some("root"));
2673 assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2674 assert!(config.detach);
2675 }
2676
2677 #[test]
2678 fn test_container_config_parses_network_create_params() {
2679 let config =
2680 ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2681 .unwrap();
2682 assert_eq!(config.operation, "network-create");
2683 assert_eq!(config.name.as_deref(), Some("my-net"));
2684 assert_eq!(config.driver.as_deref(), Some("bridge"));
2685 }
2686
2687 #[test]
2688 fn test_container_config_defaults_new_fields() {
2689 let config = ContainerConfig::from_uri("container:list").unwrap();
2690 assert!(config.volumes.is_none());
2691 assert!(config.user.is_none());
2692 assert!(config.workdir.is_none());
2693 assert!(!config.detach);
2694 assert!(config.driver.is_none());
2695 assert!(!config.force);
2696 }
2697
2698 #[test]
2699 fn test_parse_volumes_bind_mount() {
2700 let config = ContainerConfig::from_uri(
2701 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2702 )
2703 .unwrap();
2704 let (binds, anon) = config.parse_volumes().unwrap();
2705 assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2706 assert!(anon.is_empty());
2707 }
2708
2709 #[test]
2710 fn test_parse_volumes_named_volume() {
2711 let config =
2712 ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2713 .unwrap();
2714 let (binds, anon) = config.parse_volumes().unwrap();
2715 assert_eq!(binds, vec!["data:/var/lib/data"]);
2716 assert!(anon.is_empty());
2717 }
2718
2719 #[test]
2720 fn test_parse_volumes_anonymous() {
2721 let config =
2722 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2723 let (binds, anon) = config.parse_volumes().unwrap();
2724 assert!(binds.is_empty());
2725 assert!(anon.contains(&"/tmp/app-data".to_string()));
2726 }
2727
2728 #[test]
2729 fn test_parse_volumes_anonymous_with_mode() {
2730 let config =
2731 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2732 .unwrap();
2733 let (binds, anon) = config.parse_volumes().unwrap();
2734 assert!(binds.is_empty());
2735 assert!(anon.contains(&"/tmp/app-data".to_string()));
2736 }
2737
2738 #[test]
2739 fn test_parse_volumes_multiple() {
2740 let config = ContainerConfig::from_uri(
2741 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2742 )
2743 .unwrap();
2744 let (binds, anon) = config.parse_volumes().unwrap();
2745 assert_eq!(binds.len(), 2);
2746 assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2747 assert!(binds.contains(&"data:/var/log/app".to_string()));
2748 assert!(anon.is_empty());
2749 }
2750
2751 #[test]
2752 fn test_parse_volumes_mixed() {
2753 let config = ContainerConfig::from_uri(
2754 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2755 )
2756 .unwrap();
2757 let (binds, anon) = config.parse_volumes().unwrap();
2758 assert_eq!(binds.len(), 1);
2759 assert!(anon.contains(&"/tmp/cache".to_string()));
2760 }
2761
2762 #[test]
2763 fn test_parse_volumes_none() {
2764 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2765 assert!(config.parse_volumes().is_none());
2766 }
2767
2768 #[test]
2769 fn test_parse_volumes_empty_entry_skipped() {
2770 let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2771 assert!(config.parse_volumes().is_none());
2772 }
2773
2774 #[test]
2775 fn test_parse_volumes_rw_mode() {
2776 let config =
2777 ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2778 .unwrap();
2779 let (binds, _) = config.parse_volumes().unwrap();
2780 assert_eq!(binds, vec!["./data:/app/data:rw"]);
2781 }
2782
2783 #[test]
2784 fn test_container_config_from_uri_parses_false_flags() {
2785 let config = ContainerConfig::from_uri(
2786 "container:logs?containerId=a&follow=false×tamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
2787 )
2788 .unwrap();
2789 assert!(!config.follow);
2790 assert!(!config.timestamps);
2791 assert!(!config.auto_pull);
2792 assert!(!config.auto_remove);
2793 assert!(config.detach);
2794 assert!(config.force);
2795 }
2796
2797 #[test]
2798 fn test_docker_socket_path_validation() {
2799 let unix_cfg =
2800 ContainerConfig::from_uri("container:list?host=unix:///tmp/docker.sock").unwrap();
2801 assert_eq!(
2802 unix_cfg.docker_socket_path().unwrap(),
2803 "unix:///tmp/docker.sock"
2804 );
2805
2806 let npipe_cfg =
2807 ContainerConfig::from_uri("container:list?host=npipe:////./pipe/docker_engine")
2808 .unwrap();
2809 assert_eq!(
2810 npipe_cfg.docker_socket_path().unwrap(),
2811 "npipe:////./pipe/docker_engine"
2812 );
2813
2814 let plain_cfg =
2815 ContainerConfig::from_uri("container:list?host=/var/run/docker.sock").unwrap();
2816 assert_eq!(
2817 plain_cfg.docker_socket_path().unwrap(),
2818 "/var/run/docker.sock"
2819 );
2820
2821 let bad_cfg =
2822 ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
2823 assert!(bad_cfg.docker_socket_path().is_err());
2824 }
2825
2826 #[test]
2827 fn test_parse_ports_invalid_and_whitespace_entries() {
2828 let cfg = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
2830 let err = cfg.parse_ports().unwrap_err();
2831 assert!(
2832 err.to_string().contains("malformed port mapping"),
2833 "expected malformed port error, got: {}",
2834 err
2835 );
2836
2837 let cfg =
2838 ContainerConfig::from_uri("container:run?ports= 8080:80 , 5353:53/udp ").unwrap();
2839 let (exposed, bindings) = cfg.parse_ports().unwrap();
2840 assert!(exposed.contains(&"80/tcp".to_string()));
2841 assert!(exposed.contains(&"53/udp".to_string()));
2842 assert_eq!(bindings.len(), 2);
2843 }
2844
2845 #[test]
2846 fn test_parse_ports_fail_fast_on_malformed() {
2847 let cfg = ContainerConfig::from_uri("container:run?ports=8080:80,badentry").unwrap();
2849 let err = cfg.parse_ports().unwrap_err();
2850 assert!(
2851 err.to_string()
2852 .contains("malformed port mapping 'badentry'"),
2853 "expected fail-fast on 'badentry', got: {}",
2854 err
2855 );
2856 }
2857
2858 #[test]
2859 fn test_parse_env_trims_and_filters_empty_items() {
2860 let cfg = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
2861 let env = cfg.parse_env().unwrap();
2862 assert_eq!(env, vec!["FOO=bar".to_string(), "BAZ=qux".to_string()]);
2863
2864 let cfg = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
2865 assert!(cfg.parse_env().is_none());
2866 }
2867
2868 #[test]
2869 fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
2870 assert!(parse_volume_str("/host:/ctr:badmode").is_none());
2871 let (binds, anon) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
2872 assert!(binds.contains(&"a:/b:rw".to_string()));
2873 assert!(anon.contains(&"/tmp/cache".to_string()));
2874 assert!(anon.contains(&"/tmp/logs".to_string()));
2875 }
2876
2877 #[test]
2878 fn test_format_docker_event_variants_and_timestamp_extraction() {
2879 let mut attrs = std::collections::HashMap::new();
2880 attrs.insert("name".to_string(), "demo".to_string());
2881 attrs.insert("image".to_string(), "alpine:latest".to_string());
2882 attrs.insert("exitCode".to_string(), "137".to_string());
2883 let actor = bollard::models::EventActor {
2884 id: None,
2885 attributes: Some(attrs),
2886 };
2887
2888 let create_event = bollard::models::EventMessage {
2889 action: Some("create".to_string()),
2890 actor: Some(actor.clone()),
2891 ..Default::default()
2892 };
2893 assert_eq!(
2894 format_docker_event(&create_event),
2895 "[CREATE] Container demo (alpine:latest)"
2896 );
2897
2898 let die_event = bollard::models::EventMessage {
2899 action: Some("die".to_string()),
2900 actor: Some(actor),
2901 ..Default::default()
2902 };
2903 assert_eq!(
2904 format_docker_event(&die_event),
2905 "[DIE] Container demo (exit: 137)"
2906 );
2907
2908 let other_event = bollard::models::EventMessage {
2909 action: Some("oom".to_string()),
2910 actor: None,
2911 ..Default::default()
2912 };
2913 assert_eq!(format_docker_event(&other_event), "[OOM] Container unknown");
2914
2915 assert_eq!(
2916 extract_timestamp("2024-01-01T00:00:00Z hello"),
2917 Some("2024-01-01T00:00:00Z".to_string())
2918 );
2919 assert_eq!(extract_timestamp("hello world"), None);
2920 }
2921
2922 #[tokio::test]
2923 async fn test_run_container_with_cleanup_error_paths() {
2924 let create_fail = run_container_with_cleanup(
2925 || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2926 |_id| async move { Ok(()) },
2927 |_id| async move { Ok(()) },
2928 )
2929 .await;
2930 assert!(
2931 matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2932 );
2933
2934 let cleanup_fail = run_container_with_cleanup(
2935 || async { Ok("cid-1".to_string()) },
2936 |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2937 |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2938 )
2939 .await;
2940 match cleanup_fail {
2941 Err(CamelError::ProcessorError(msg)) => {
2942 assert!(msg.contains("Failed to start container"));
2943 assert!(msg.contains("Cleanup failed"));
2944 }
2945 other => panic!("unexpected result: {:?}", other),
2946 }
2947 }
2948
2949 #[tokio::test]
2950 async fn test_container_producer_network_lifecycle() {
2951 let docker = match Docker::connect_with_local_defaults() {
2952 Ok(d) => d,
2953 Err(_) => {
2954 eprintln!("Skipping test: Could not connect to Docker daemon");
2955 return;
2956 }
2957 };
2958 if docker.ping().await.is_err() {
2959 eprintln!("Skipping test: Docker daemon not responding to ping");
2960 return;
2961 }
2962
2963 let network_name = format!("camel-test-{}", std::process::id());
2964
2965 let component = ContainerComponent::new();
2966 let component_ctx = NoOpComponentContext;
2967
2968 let endpoint = component
2970 .create_endpoint(
2971 &format!("container:network-create?name={}", network_name),
2972 &component_ctx,
2973 )
2974 .unwrap();
2975 let ctx = ProducerContext::new();
2976 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2977
2978 let mut exchange = Exchange::new(Message::new(""));
2979 exchange.input.set_header(
2980 HEADER_ACTION,
2981 serde_json::Value::String("network-create".into()),
2982 );
2983
2984 use tower::ServiceExt;
2985 let result = producer
2986 .ready()
2987 .await
2988 .unwrap()
2989 .call(exchange)
2990 .await
2991 .expect("Network create should succeed");
2992
2993 let network_id = result
2994 .input
2995 .header(HEADER_NETWORK)
2996 .and_then(|v| v.as_str().map(|s| s.to_string()))
2997 .expect("Should have network ID");
2998
2999 assert!(!network_id.is_empty());
3000
3001 let endpoint = component
3003 .create_endpoint("container:network-list", &component_ctx)
3004 .unwrap();
3005 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
3006
3007 let mut exchange = Exchange::new(Message::new(""));
3008 exchange.input.set_header(
3009 HEADER_ACTION,
3010 serde_json::Value::String("network-list".into()),
3011 );
3012
3013 let list_result = producer
3014 .ready()
3015 .await
3016 .unwrap()
3017 .call(exchange)
3018 .await
3019 .expect("Network list should succeed");
3020
3021 match &list_result.input.body {
3022 Body::Json(json_value) => {
3023 assert!(json_value.is_array(), "Expected JSON array");
3024 }
3025 other => panic!("Expected Body::Json, got: {:?}", other),
3026 }
3027
3028 let endpoint = component
3030 .create_endpoint(
3031 &format!("container:network-remove?network={}", network_name),
3032 &component_ctx,
3033 )
3034 .unwrap();
3035 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
3036
3037 let mut exchange = Exchange::new(Message::new(""));
3038 exchange.input.set_header(
3039 HEADER_ACTION,
3040 serde_json::Value::String("network-remove".into()),
3041 );
3042
3043 let remove_result = producer
3044 .ready()
3045 .await
3046 .unwrap()
3047 .call(exchange)
3048 .await
3049 .expect("Network remove should succeed");
3050
3051 let action_result = remove_result
3052 .input
3053 .header(HEADER_ACTION_RESULT)
3054 .and_then(|v| v.as_str());
3055 assert_eq!(action_result, Some("success"));
3056 }
3057
3058 #[tokio::test]
3059 async fn test_logs_consumer_requires_container_id() {
3060 use tokio::sync::mpsc;
3061
3062 let mut consumer = ContainerConsumer {
3063 config: ContainerConfig::from_uri("container:logs").unwrap(),
3064 runtime: test_rt(),
3065 };
3066 let (tx, _rx) = mpsc::channel(4);
3067 let context = ConsumerContext::new(
3068 tx,
3069 tokio_util::sync::CancellationToken::new(),
3070 "container-test-route".to_string(),
3071 );
3072
3073 let err = consumer.start(context).await.unwrap_err();
3074 match err {
3075 CamelError::EndpointCreationFailed(msg) => {
3076 assert!(msg.contains("containerId is required for logs consumer"));
3077 }
3078 other => panic!("unexpected error: {:?}", other),
3079 }
3080 }
3081
3082 #[tokio::test]
3083 async fn test_events_consumer_stops_immediately_when_cancelled() {
3084 use tokio::sync::mpsc;
3085
3086 let mut consumer = ContainerConsumer {
3087 config: ContainerConfig::from_uri("container:events").unwrap(),
3088 runtime: test_rt(),
3089 };
3090
3091 let (tx, _rx) = mpsc::channel(4);
3092 let cancel = tokio_util::sync::CancellationToken::new();
3093 cancel.cancel();
3094 let context = ConsumerContext::new(tx, cancel, "container-test-route".to_string());
3095
3096 let result = consumer.start(context).await;
3097 assert!(result.is_ok());
3098 }
3099
3100 #[test]
3101 fn test_global_config_constructors_and_endpoint_docker_host() {
3102 let global = ContainerGlobalConfig::new().with_docker_host("unix:///tmp/docker.sock");
3103 let mut cfg = ContainerConfig::from_uri("container:list").unwrap();
3104 cfg.apply_global_defaults(&global);
3105
3106 let endpoint = ContainerEndpoint {
3107 uri: "container:list".to_string(),
3108 config: cfg,
3109 };
3110
3111 assert_eq!(endpoint.docker_host(), Some("unix:///tmp/docker.sock"));
3112
3113 let component = ContainerComponent::with_config(global);
3114 assert_eq!(component.scheme(), "container");
3115 }
3116
3117 #[test]
3118 fn container_global_config_has_reconnect_policy() {
3119 let cfg = ContainerGlobalConfig::default();
3120 assert_eq!(cfg.reconnect.max_attempts, 0); assert!(cfg.reconnect.enabled);
3122 }
3123
3124 #[tokio::test]
3129 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
3130 use std::sync::Arc;
3131 use std::sync::atomic::{AtomicU32, Ordering};
3132 use std::time::Duration;
3133
3134 let policy = NetworkRetryPolicy {
3135 max_attempts: 3,
3136 initial_delay: Duration::from_millis(1),
3137 max_delay: Duration::from_millis(1),
3138 multiplier: 1.0,
3139 ..NetworkRetryPolicy::default()
3140 };
3141
3142 let calls = Arc::new(AtomicU32::new(0));
3143 let calls_clone = Arc::clone(&calls);
3144 let mut attempt: u32 = 0;
3145
3146 loop {
3147 calls_clone.fetch_add(1, Ordering::SeqCst);
3148 let result: Result<(), ()> = Err(());
3149 match result {
3150 Ok(_) => {
3151 break;
3152 }
3153 Err(_) => {
3154 attempt += 1;
3155 if !policy.should_retry(attempt) {
3156 break;
3157 }
3158 let delay = policy.delay_for(attempt - 1);
3159 tokio::time::sleep(delay).await;
3160 continue;
3161 }
3162 }
3163 }
3164
3165 assert_eq!(
3166 calls.load(Ordering::SeqCst),
3167 3,
3168 "max_attempts=3 must yield exactly 3 invocations"
3169 );
3170 }
3171
3172 #[tokio::test]
3181 async fn events_connect_error_increments_metrics() {
3182 use std::sync::Mutex;
3183 use std::time::Duration;
3184
3185 struct RecordingMetrics(Mutex<Vec<(String, String)>>);
3186
3187 impl MetricsCollector for RecordingMetrics {
3188 fn record_exchange_duration(&self, _: &str, _: Duration) {}
3189 fn increment_errors(&self, route_id: &str, error_type: &str) {
3190 self.0
3191 .lock()
3192 .unwrap()
3193 .push((route_id.to_string(), error_type.to_string()));
3194 }
3195 fn increment_exchanges(&self, _: &str) {}
3196 fn set_queue_depth(&self, _: &str, _: usize) {}
3197 fn record_circuit_breaker_change(&self, _: &str, _: &str, _: &str) {}
3198 }
3199
3200 struct RecordingRuntime {
3201 metrics: Arc<RecordingMetrics>,
3202 }
3203
3204 impl HealthCheckRegistry for RecordingRuntime {
3205 fn force_unhealthy_for_route(&self, _: &str, _: &str, _: &str) {}
3206 }
3207
3208 impl RuntimeObservability for RecordingRuntime {
3209 fn metrics(&self) -> Arc<dyn MetricsCollector> {
3210 self.metrics.clone()
3211 }
3212 fn health(&self) -> Arc<dyn HealthCheckRegistry> {
3213 Arc::new(camel_component_api::NoOpHealthCheckRegistry)
3214 }
3215 }
3216
3217 let recording = Arc::new(RecordingMetrics(Mutex::new(Vec::new())));
3218 let rt: Arc<dyn RuntimeObservability> = Arc::new(RecordingRuntime {
3219 metrics: recording.clone(),
3220 });
3221
3222 let mut config = ContainerConfig::from_uri("container:events").unwrap();
3226 config.host = Some("tcp://192.0.2.1:2375".to_string());
3227 config.reconnect = NetworkRetryPolicy::disabled();
3228
3229 let mut consumer = ContainerConsumer {
3230 config,
3231 runtime: rt,
3232 };
3233
3234 let (tx, _rx) = tokio::sync::mpsc::channel(4);
3235 let cancel = tokio_util::sync::CancellationToken::new();
3236 let context = ConsumerContext::new(tx, cancel, "events-test-route".to_string());
3237
3238 let result = consumer.start(context).await;
3239 assert!(result.is_err(), "expected Docker connection error");
3240
3241 let errors = recording.0.lock().unwrap();
3242 assert_eq!(
3243 errors.len(),
3244 1,
3245 "expected exactly one increment_errors call"
3246 );
3247 assert_eq!(errors[0].0, "events-test-route");
3248 assert_eq!(errors[0].1, "e:container:events-connect");
3249 }
3250}