1pub mod bundle;
7pub mod health;
8
9pub use bundle::ContainerBundle;
10pub use health::ContainerHealthCheck;
11
12use std::collections::{HashMap, HashSet};
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17
18use async_trait::async_trait;
19use bollard::Docker;
20use bollard::models::{
21 ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
22};
23use bollard::query_parameters::{
24 CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
25 ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
26 StartContainerOptions,
27};
28use bollard::service::{HostConfig, PortBinding};
29use camel_component_api::NetworkRetryPolicy;
30use camel_component_api::parse_uri;
31use camel_component_api::retry_async_cancelable;
32use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
33use camel_component_api::{
34 Component, Consumer, ConsumerContext, Endpoint, ProducerContext, RuntimeObservability,
35};
36use tower::Service;
37
38static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
41 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
42
43fn track_container(id: String) {
45 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
46 tracker.insert(id);
47 }
48}
49
50fn untrack_container(id: &str) {
52 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
53 tracker.remove(id);
54 }
55}
56
57pub async fn cleanup_tracked_containers() {
59 let ids: Vec<String> = {
60 match CONTAINER_TRACKER.lock() {
61 Ok(tracker) => tracker.iter().cloned().collect(),
62 Err(_) => return,
63 }
64 };
65
66 if ids.is_empty() {
67 return;
68 }
69
70 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
71
72 let docker = match Docker::connect_with_local_defaults() {
73 Ok(d) => d,
74 Err(e) => {
75 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
77 return;
78 }
79 };
80
81 for id in ids {
82 match docker
83 .remove_container(
84 &id,
85 Some(RemoveContainerOptions {
86 force: true,
87 ..Default::default()
88 }),
89 )
90 .await
91 {
92 Ok(_) => {
93 tracing::debug!("Cleaned up container {}", id);
94 untrack_container(&id);
95 }
96 Err(e) => {
97 tracing::warn!("Failed to cleanup container {}: {}", id, e);
98 }
99 }
100 }
101}
102
103const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
107
108pub const HEADER_ACTION: &str = "CamelContainerAction";
110
111pub const HEADER_IMAGE: &str = "CamelContainerImage";
113
114pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
116
117pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
119
120pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
122
123pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
125
126pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
128
129pub const HEADER_CMD: &str = "CamelContainerCmd";
131
132pub const HEADER_NETWORK: &str = "CamelContainerNetwork";
134
135pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";
137
138pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";
140
141pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
143
144fn container_reconnect_default() -> NetworkRetryPolicy {
152 NetworkRetryPolicy {
153 enabled: true,
154 max_attempts: 0, initial_delay: std::time::Duration::from_secs(5),
156 multiplier: 1.0, max_delay: std::time::Duration::from_secs(5),
158 jitter_factor: 0.0,
159 }
160}
161
162#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
166#[serde(default)]
167pub struct ContainerGlobalConfig {
168 pub docker_host: String,
170 #[serde(default = "container_reconnect_default")]
172 pub reconnect: NetworkRetryPolicy,
173}
174
175impl Default for ContainerGlobalConfig {
176 fn default() -> Self {
177 Self {
178 docker_host: "unix:///var/run/docker.sock".to_string(),
179 reconnect: container_reconnect_default(),
180 }
181 }
182}
183
184impl ContainerGlobalConfig {
185 pub fn new() -> Self {
186 Self::default()
187 }
188
189 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
190 self.docker_host = v.into();
191 self
192 }
193}
194
195#[derive(Debug, Clone)]
204pub struct ContainerConfig {
205 pub operation: String,
207 pub image: Option<String>,
209 pub name: Option<String>,
211 pub host: Option<String>,
213 pub cmd: Option<String>,
215 pub ports: Option<String>,
217 pub env: Option<String>,
219 pub network: Option<String>,
221 pub container_id: Option<String>,
223 pub follow: bool,
225 pub timestamps: bool,
227 pub tail: Option<String>,
229 pub auto_pull: bool,
231 pub auto_remove: bool,
233 pub volumes: Option<String>,
235 pub user: Option<String>,
237 pub workdir: Option<String>,
239 pub detach: bool,
241 pub driver: Option<String>,
243 pub force: bool,
245 pub reconnect: NetworkRetryPolicy,
247}
248
249impl ContainerConfig {
250 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
258 let parts = parse_uri(uri)?;
259 if parts.scheme != "container" {
260 return Err(CamelError::InvalidUri(format!(
261 "expected scheme 'container', got '{}'",
262 parts.scheme
263 )));
264 }
265
266 let image = parts.params.get("image").cloned();
267 let name = parts.params.get("name").cloned();
268 let cmd = parts.params.get("cmd").cloned();
269 let ports = parts.params.get("ports").cloned();
270 let env = parts.params.get("env").cloned();
271 let network = parts.params.get("network").cloned();
272 let container_id = parts.params.get("containerId").cloned();
273 let follow = parts
274 .params
275 .get("follow")
276 .map(|v| v.eq_ignore_ascii_case("true"))
277 .unwrap_or(true);
278 let timestamps = parts
279 .params
280 .get("timestamps")
281 .map(|v| v.eq_ignore_ascii_case("true"))
282 .unwrap_or(false);
283 let tail = parts.params.get("tail").cloned();
284 let auto_pull = parts
285 .params
286 .get("autoPull")
287 .map(|v| v.eq_ignore_ascii_case("true"))
288 .unwrap_or(true);
289 let auto_remove = parts
290 .params
291 .get("autoRemove")
292 .map(|v| v.eq_ignore_ascii_case("true"))
293 .unwrap_or(true);
294 let host = parts.params.get("host").cloned();
296 let volumes = parts.params.get("volumes").cloned();
297 let user = parts.params.get("user").cloned();
298 let workdir = parts.params.get("workdir").cloned();
299 let detach = parts
300 .params
301 .get("detach")
302 .map(|v| v.eq_ignore_ascii_case("true"))
303 .unwrap_or(false);
304 let driver = parts.params.get("driver").cloned();
305 let force = parts
306 .params
307 .get("force")
308 .map(|v| v.eq_ignore_ascii_case("true"))
309 .unwrap_or(false);
310
311 Ok(Self {
312 operation: parts.path,
313 image,
314 name,
315 host,
316 cmd,
317 ports,
318 env,
319 network,
320 container_id,
321 follow,
322 timestamps,
323 tail,
324 auto_pull,
325 auto_remove,
326 volumes,
327 user,
328 workdir,
329 detach,
330 driver,
331 force,
332 reconnect: NetworkRetryPolicy::default(),
333 })
334 }
335
336 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
339 if self.host.is_none() {
340 self.host = Some(global.docker_host.clone());
341 }
342 self.reconnect = global.reconnect.clone();
343 }
344
345 fn docker_socket_path(&self) -> Result<&str, CamelError> {
346 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
347 "npipe:////./pipe/docker_engine"
348 } else {
349 "unix:///var/run/docker.sock"
350 });
351
352 if host.starts_with("unix://") || host.starts_with("npipe://") {
353 return Ok(host);
354 }
355
356 if host.contains("://") {
357 return Err(CamelError::ProcessorError(format!(
358 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
359 host
360 )));
361 }
362
363 Ok(host)
364 }
365
366 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
367 let socket_path = self.docker_socket_path()?;
368 Docker::connect_with_socket(
369 socket_path,
370 DOCKER_CONNECT_TIMEOUT_SECS,
371 bollard::API_DEFAULT_VERSION,
372 )
373 .map_err(|e| {
374 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
375 })
376 }
377
378 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
386 let docker = self.connect_docker_client()?;
387 docker
388 .ping()
389 .await
390 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
391 Ok(docker)
392 }
393
394 #[allow(clippy::type_complexity)]
395 fn parse_ports(
396 &self,
397 ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
398 let ports_str = match self.ports.as_ref() {
399 Some(s) => s,
400 None => return Ok((Vec::new(), HashMap::new())),
401 };
402
403 let mut exposed_ports: Vec<String> = Vec::new();
404 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
405
406 for mapping in ports_str.split(',') {
407 let mapping = mapping.trim();
408 if mapping.is_empty() {
409 continue;
410 }
411
412 let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
413 CamelError::ProcessorError(format!(
414 "malformed port mapping '{}': expected hostPort:containerPort",
415 mapping
416 ))
417 })?;
418
419 let (container_port, protocol) = if container_spec.contains('/') {
420 let parts: Vec<&str> = container_spec.split('/').collect();
421 (parts[0], parts[1])
422 } else {
423 (container_spec, "tcp")
424 };
425
426 let container_key = format!("{}/{}", container_port, protocol);
427
428 exposed_ports.push(container_key.clone());
429
430 port_bindings.insert(
431 container_key,
432 Some(vec![PortBinding {
433 host_ip: None,
434 host_port: Some(host_port.to_string()),
435 }]),
436 );
437 }
438
439 Ok((exposed_ports, port_bindings))
440 }
441
442 fn parse_env(&self) -> Option<Vec<String>> {
443 let env_str = self.env.as_ref()?;
444
445 let env_vars: Vec<String> = env_str
446 .split(',')
447 .map(|s| s.trim().to_string())
448 .filter(|s| !s.is_empty())
449 .collect();
450
451 if env_vars.is_empty() {
452 None
453 } else {
454 Some(env_vars)
455 }
456 }
457
458 #[cfg(test)]
459 #[allow(clippy::type_complexity)]
460 fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
461 self.volumes.as_deref().and_then(parse_volume_str)
462 }
463}
464
465#[allow(clippy::type_complexity)]
470fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
471 let mut binds: Vec<String> = Vec::new();
472 let mut anonymous_volumes: Vec<String> = Vec::new();
473
474 for entry in volumes_str.split(',') {
475 let entry = entry.trim();
476 if entry.is_empty() {
477 continue;
478 }
479
480 let segments: Vec<&str> = entry.split(':').collect();
481
482 match segments.len() {
483 3 => {
484 let source = segments[0];
485 let target = segments[1];
486 let mode = segments[2];
487 if mode != "ro" && mode != "rw" {
488 continue;
489 }
490 binds.push(format!("{}:{}:{}", source, target, mode));
491 }
492 2 => {
493 let a = segments[0];
494 let b = segments[1];
495 if b == "ro" || b == "rw" {
496 anonymous_volumes.push(a.to_string());
497 } else {
498 binds.push(format!("{}:{}", a, b));
499 }
500 }
501 1 => {
502 anonymous_volumes.push(segments[0].to_string());
503 }
504 _ => continue,
505 }
506 }
507
508 if binds.is_empty() && anonymous_volumes.is_empty() {
509 None
510 } else {
511 Some((binds, anonymous_volumes))
512 }
513}
514
515#[derive(Debug, Clone, Copy, PartialEq, Eq)]
516enum ProducerOperation {
517 List,
518 Run,
519 Start,
520 Stop,
521 Remove,
522 Exec,
523 NetworkCreate,
524 NetworkConnect,
525 NetworkDisconnect,
526 NetworkRemove,
527 NetworkList,
528}
529
530fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
531 match operation {
532 "list" => Ok(ProducerOperation::List),
533 "run" => Ok(ProducerOperation::Run),
534 "start" => Ok(ProducerOperation::Start),
535 "stop" => Ok(ProducerOperation::Stop),
536 "remove" => Ok(ProducerOperation::Remove),
537 "exec" => Ok(ProducerOperation::Exec),
538 "network-create" => Ok(ProducerOperation::NetworkCreate),
539 "network-connect" => Ok(ProducerOperation::NetworkConnect),
540 "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
541 "network-remove" => Ok(ProducerOperation::NetworkRemove),
542 "network-list" => Ok(ProducerOperation::NetworkList),
543 _ => Err(CamelError::ProcessorError(format!(
544 "Unknown container operation: {}",
545 operation
546 ))),
547 }
548}
549
550fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
551 exchange
552 .input
553 .header(HEADER_CONTAINER_NAME)
554 .and_then(|v| v.as_str().map(|s| s.to_string()))
555 .or_else(|| config.name.clone())
556}
557
558async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
559 let images = docker
560 .list_images(None::<ListImagesOptions>)
561 .await
562 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
563
564 Ok(images.iter().any(|img| {
565 img.repo_tags
566 .iter()
567 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
568 }))
569}
570
571async fn pull_image_with_progress(
572 docker: &Docker,
573 image: &str,
574 timeout_secs: u64,
575) -> Result<(), CamelError> {
576 use futures::StreamExt;
577
578 tracing::info!("Pulling image: {}", image);
579
580 let mut stream = docker.create_image(
581 Some(CreateImageOptions {
582 from_image: Some(image.to_string()),
583 ..Default::default()
584 }),
585 None,
586 None,
587 );
588
589 let start = std::time::Instant::now();
590 let mut last_progress = std::time::Instant::now();
591
592 while let Some(item) = stream.next().await {
593 if start.elapsed().as_secs() > timeout_secs {
594 return Err(CamelError::ProcessorError(format!(
595 "Image pull timeout after {}s. Try manually: docker pull {}",
596 timeout_secs, image
597 )));
598 }
599
600 match item {
601 Ok(update) => {
602 if last_progress.elapsed().as_secs() >= 2 {
604 if let Some(status) = update.status {
605 tracing::debug!("Pull progress: {}", status);
606 }
607 last_progress = std::time::Instant::now();
608 }
609 }
610 Err(e) => {
611 let err_str = e.to_string().to_lowercase();
612 if err_str.contains("unauthorized") || err_str.contains("401") {
613 return Err(CamelError::ProcessorError(format!(
614 "Authentication required for image '{}'. Configure Docker credentials: docker login",
615 image
616 )));
617 }
618 if err_str.contains("not found") || err_str.contains("404") {
619 return Err(CamelError::ProcessorError(format!(
620 "Image '{}' not found in registry. Check the image name and tag",
621 image
622 )));
623 }
624 return Err(CamelError::ProcessorError(format!(
625 "Failed to pull image '{}': {}",
626 image, e
627 )));
628 }
629 }
630 }
631
632 tracing::info!("Successfully pulled image: {}", image);
633 Ok(())
634}
635
636async fn ensure_image_available(
637 docker: &Docker,
638 image: &str,
639 auto_pull: bool,
640 timeout_secs: u64,
641) -> Result<(), CamelError> {
642 if image_exists_locally(docker, image).await? {
643 tracing::debug!("Image '{}' already available locally", image);
644 return Ok(());
645 }
646
647 if !auto_pull {
648 return Err(CamelError::ProcessorError(format!(
649 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
650 image, image
651 )));
652 }
653
654 pull_image_with_progress(docker, image, timeout_secs).await
655}
656
657fn format_docker_event(event: &bollard::models::EventMessage) -> String {
658 let action = event.action.as_deref().unwrap_or("unknown");
659 let actor = event.actor.as_ref();
660
661 let container_name = actor
662 .and_then(|a| a.attributes.as_ref())
663 .and_then(|attrs| attrs.get("name"))
664 .map(|s| s.as_str())
665 .unwrap_or("unknown");
666
667 let image = actor
668 .and_then(|a| a.attributes.as_ref())
669 .and_then(|attrs| attrs.get("image"))
670 .map(|s| s.as_str())
671 .unwrap_or("");
672
673 let exit_code = actor
674 .and_then(|a| a.attributes.as_ref())
675 .and_then(|attrs| attrs.get("exitCode"))
676 .map(|s| s.as_str());
677
678 match action {
679 "create" => {
680 if image.is_empty() {
681 format!("[CREATE] Container {}", container_name)
682 } else {
683 format!("[CREATE] Container {} ({})", container_name, image)
684 }
685 }
686 "start" => format!("[START] Container {}", container_name),
687 "die" => {
688 if let Some(code) = exit_code {
689 format!("[DIE] Container {} (exit: {})", container_name, code)
690 } else {
691 format!("[DIE] Container {}", container_name)
692 }
693 }
694 "destroy" => format!("[DESTROY] Container {}", container_name),
695 "stop" => format!("[STOP] Container {}", container_name),
696 "pause" => format!("[PAUSE] Container {}", container_name),
697 "unpause" => format!("[UNPAUSE] Container {}", container_name),
698 "restart" => format!("[RESTART] Container {}", container_name),
699 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
700 }
701}
702
703async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
704 create: CreateFn,
705 start: StartFn,
706 remove: RemoveFn,
707) -> Result<String, CamelError>
708where
709 CreateFn: FnOnce() -> CreateFut,
710 CreateFut: Future<Output = Result<String, CamelError>>,
711 StartFn: FnOnce(String) -> StartFut,
712 StartFut: Future<Output = Result<(), CamelError>>,
713 RemoveFn: FnOnce(String) -> RemoveFut,
714 RemoveFut: Future<Output = Result<(), CamelError>>,
715{
716 let container_id = create().await?;
717 if let Err(start_err) = start(container_id.clone()).await {
718 if let Err(remove_err) = remove(container_id.clone()).await {
719 return Err(CamelError::ProcessorError(format!(
720 "Failed to start container: {}. Cleanup failed: {}",
721 start_err, remove_err
722 )));
723 }
724 return Err(start_err);
725 }
726
727 Ok(container_id)
728}
729
730async fn handle_list(
731 docker: Docker,
732 _config: ContainerConfig,
733 exchange: &mut Exchange,
734) -> Result<(), CamelError> {
735 let containers = docker
736 .list_containers(None::<ListContainersOptions>)
737 .await
738 .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
739
740 let json_value = serde_json::to_value(&containers).map_err(|e| {
741 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
742 })?;
743
744 exchange.input.body = Body::Json(json_value);
745 exchange.input.set_header(
746 HEADER_ACTION_RESULT,
747 serde_json::Value::String("success".to_string()),
748 );
749 Ok(())
750}
751
752async fn handle_run(
753 docker: Docker,
754 config: ContainerConfig,
755 exchange: &mut Exchange,
756) -> Result<(), CamelError> {
757 let image = exchange
758 .input
759 .header(HEADER_IMAGE)
760 .and_then(|v| v.as_str().map(|s| s.to_string()))
761 .or(config.image.clone())
762 .ok_or_else(|| {
763 CamelError::ProcessorError(
764 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
765 )
766 })?;
767
768 let image = if !image.contains(':') && !image.contains('@') {
769 format!("{}:latest", image)
770 } else {
771 image
772 };
773
774 let pull_timeout = 300;
775 ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
776 .await
777 .map_err(|e| {
778 CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
779 })?;
780
781 let container_name = resolve_container_name(exchange, &config);
782 let container_name_ref = container_name.as_deref().unwrap_or("");
783 let cmd_parts: Option<Vec<String>> = config
784 .cmd
785 .as_ref()
786 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
787 let auto_remove = config.auto_remove;
788 let (exposed_ports, port_bindings) = config.parse_ports()?;
789 let env_vars = config.parse_env();
790 let network_mode = config.network.clone();
791
792 let volumes_str = exchange
793 .input
794 .header(HEADER_VOLUMES)
795 .and_then(|v| v.as_str().map(|s| s.to_string()))
796 .or(config.volumes.clone());
797 let (binds, anon_volumes) = volumes_str
798 .as_deref()
799 .and_then(parse_volume_str)
800 .unwrap_or_default();
801
802 let docker_create = docker.clone();
803 let docker_start = docker.clone();
804 let docker_remove = docker.clone();
805
806 let container_id = run_container_with_cleanup(
807 move || async move {
808 let create_options = CreateContainerOptions {
809 name: Some(container_name_ref.to_string()),
810 ..Default::default()
811 };
812 let container_config = ContainerCreateBody {
813 image: Some(image.clone()),
814 cmd: cmd_parts,
815 env: env_vars,
816 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
817 volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
818 host_config: Some(HostConfig {
819 auto_remove: Some(auto_remove),
820 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
821 network_mode,
822 binds: if binds.is_empty() { None } else { Some(binds) },
823 ..Default::default()
824 }),
825 ..Default::default()
826 };
827
828 let create_response = docker_create
829 .create_container(Some(create_options), container_config)
830 .await
831 .map_err(|e| {
832 let err_str = e.to_string().to_lowercase();
833 if err_str.contains("409") || err_str.contains("conflict") {
834 CamelError::ProcessorError(format!(
835 "Container name '{}' already exists. Use a unique name or remove the existing container first",
836 container_name_ref
837 ))
838 } else {
839 CamelError::ProcessorError(format!(
840 "Failed to create container: {}",
841 e
842 ))
843 }
844 })?;
845
846 Ok(create_response.id)
847 },
848 move |container_id| async move {
849 docker_start
850 .start_container(&container_id, None::<StartContainerOptions>)
851 .await
852 .map_err(|e| {
853 CamelError::ProcessorError(format!(
854 "Failed to start container: {}",
855 e
856 ))
857 })
858 },
859 move |container_id| async move {
860 docker_remove
861 .remove_container(&container_id, None)
862 .await
863 .map_err(|e| {
864 CamelError::ProcessorError(format!(
865 "Failed to remove container after start failure: {}",
866 e
867 ))
868 })
869 },
870 )
871 .await?;
872
873 track_container(container_id.clone());
874
875 exchange
876 .input
877 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
878 exchange.input.set_header(
879 HEADER_ACTION_RESULT,
880 serde_json::Value::String("success".to_string()),
881 );
882 Ok(())
883}
884
885async fn handle_lifecycle(
886 docker: Docker,
887 _config: ContainerConfig,
888 exchange: &mut Exchange,
889 operation: ProducerOperation,
890 operation_name: &str,
891) -> Result<(), CamelError> {
892 let container_id = exchange
893 .input
894 .header(HEADER_CONTAINER_ID)
895 .and_then(|v| v.as_str().map(|s| s.to_string()))
896 .ok_or_else(|| {
897 CamelError::ProcessorError(format!(
898 "{} header is required for {} operation",
899 HEADER_CONTAINER_ID, operation_name
900 ))
901 })?;
902
903 match operation {
904 ProducerOperation::Start => {
905 docker
906 .start_container(&container_id, None::<StartContainerOptions>)
907 .await
908 .map_err(|e| {
909 CamelError::ProcessorError(format!("Failed to start container: {}", e))
910 })?;
911 }
912 ProducerOperation::Stop => {
913 docker
914 .stop_container(&container_id, None)
915 .await
916 .map_err(|e| {
917 CamelError::ProcessorError(format!("Failed to stop container: {}", e))
918 })?;
919 }
920 ProducerOperation::Remove => {
921 docker
922 .remove_container(&container_id, None)
923 .await
924 .map_err(|e| {
925 CamelError::ProcessorError(format!("Failed to remove container: {}", e))
926 })?;
927 untrack_container(&container_id);
928 }
929 _ => {}
930 }
931
932 exchange.input.set_header(
933 HEADER_ACTION_RESULT,
934 serde_json::Value::String("success".to_string()),
935 );
936 Ok(())
937}
938
939async fn handle_exec(
940 docker: Docker,
941 config: ContainerConfig,
942 exchange: &mut Exchange,
943) -> Result<(), CamelError> {
944 let container_id = exchange
945 .input
946 .header(HEADER_CONTAINER_ID)
947 .and_then(|v| v.as_str().map(|s| s.to_string()))
948 .or(config.container_id.clone())
949 .ok_or_else(|| {
950 CamelError::ProcessorError(format!(
951 "{} header or containerId param is required for exec operation",
952 HEADER_CONTAINER_ID
953 ))
954 })?;
955
956 let cmd = exchange
957 .input
958 .header(HEADER_CMD)
959 .and_then(|v| v.as_str().map(|s| s.to_string()))
960 .or(config.cmd.clone())
961 .ok_or_else(|| {
962 CamelError::ProcessorError(
963 "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
964 )
965 })?;
966
967 let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
968 let env_vars = config.parse_env();
969
970 let exec_config = bollard::exec::CreateExecOptions {
971 cmd: Some(cmd_parts),
972 env: env_vars,
973 user: config.user.clone(),
974 working_dir: config.workdir.clone(),
975 attach_stdout: Some(true),
976 attach_stderr: Some(true),
977 ..Default::default()
978 };
979
980 let create_result = docker
981 .create_exec(&container_id, exec_config)
982 .await
983 .map_err(|e| {
984 let err_str = e.to_string().to_lowercase();
985 if err_str.contains("404") || err_str.contains("no such") {
986 CamelError::ProcessorError(format!(
987 "Container '{}' not found for exec",
988 container_id
989 ))
990 } else {
991 CamelError::ProcessorError(format!("Failed to create exec: {}", e))
992 }
993 })?;
994
995 let exec_id = create_result.id;
996
997 if config.detach {
998 docker
999 .start_exec(
1000 &exec_id,
1001 Some(bollard::exec::StartExecOptions {
1002 detach: true,
1003 ..Default::default()
1004 }),
1005 )
1006 .await
1007 .map_err(|e| {
1008 CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
1009 })?;
1010
1011 exchange
1012 .input
1013 .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
1014 exchange
1015 .input
1016 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1017 } else {
1018 let start_result = docker
1019 .start_exec(&exec_id, None)
1020 .await
1021 .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
1022
1023 let mut output = String::new();
1024
1025 match start_result {
1026 bollard::exec::StartExecResults::Attached {
1027 output: mut stream, ..
1028 } => {
1029 use futures::StreamExt;
1030 while let Some(msg) = stream.next().await {
1031 match msg {
1032 Ok(bollard::container::LogOutput::StdOut { message }) => {
1033 output.push_str(&String::from_utf8_lossy(&message));
1034 }
1035 Ok(bollard::container::LogOutput::StdErr { message }) => {
1036 output.push_str(&String::from_utf8_lossy(&message));
1037 }
1038 Ok(_) => {}
1039 Err(e) => {
1040 output.push_str(&format!("[error reading stream: {}]", e));
1041 }
1042 }
1043 }
1044 }
1045 bollard::exec::StartExecResults::Detached => {}
1046 }
1047
1048 let inspect = docker
1049 .inspect_exec(&exec_id)
1050 .await
1051 .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1052
1053 let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1054 CamelError::ProcessorError("container exec returned no exit code".into())
1055 })?;
1056
1057 let output = output.trim_end().to_string();
1058 exchange.input.body = Body::Text(output);
1059 exchange.input.set_header(
1060 HEADER_EXIT_CODE,
1061 serde_json::Value::Number(exit_code.into()),
1062 );
1063 exchange
1064 .input
1065 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1066 }
1067
1068 exchange.input.set_header(
1069 HEADER_ACTION_RESULT,
1070 serde_json::Value::String("success".to_string()),
1071 );
1072 Ok(())
1073}
1074
1075async fn handle_network_create(
1076 docker: Docker,
1077 config: ContainerConfig,
1078 exchange: &mut Exchange,
1079) -> Result<(), CamelError> {
1080 let network_name = exchange
1081 .input
1082 .header(HEADER_CONTAINER_NAME)
1083 .and_then(|v| v.as_str().map(|s| s.to_string()))
1084 .or(config.name.clone())
1085 .ok_or_else(|| {
1086 CamelError::ProcessorError(
1087 "CamelContainerName header or name param is required for network-create"
1088 .to_string(),
1089 )
1090 })?;
1091
1092 let driver = config.driver.as_deref().unwrap_or("bridge");
1093
1094 let options = NetworkCreateRequest {
1095 name: network_name.clone(),
1096 driver: Some(driver.to_string()),
1097 ..Default::default()
1098 };
1099
1100 let result = docker.create_network(options).await.map_err(|e| {
1101 let err_str = e.to_string().to_lowercase();
1102 if err_str.contains("409") || err_str.contains("already exists") {
1103 CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1104 } else {
1105 CamelError::ProcessorError(format!("Failed to create network: {}", e))
1106 }
1107 })?;
1108
1109 let network_id = result.id.clone();
1110 let json_value = serde_json::to_value(&result).map_err(|e| {
1111 CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1112 })?;
1113
1114 exchange.input.body = Body::Json(json_value);
1115 exchange
1116 .input
1117 .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1118 exchange.input.set_header(
1119 HEADER_ACTION_RESULT,
1120 serde_json::Value::String("success".to_string()),
1121 );
1122 Ok(())
1123}
1124
1125async fn handle_network_connect(
1126 docker: Docker,
1127 config: ContainerConfig,
1128 exchange: &mut Exchange,
1129) -> Result<(), CamelError> {
1130 let network = exchange
1131 .input
1132 .header(HEADER_NETWORK)
1133 .and_then(|v| v.as_str().map(|s| s.to_string()))
1134 .or(config.network.clone())
1135 .ok_or_else(|| {
1136 CamelError::ProcessorError(
1137 "CamelContainerNetwork header or network param is required for network-connect"
1138 .to_string(),
1139 )
1140 })?;
1141
1142 let container = exchange
1143 .input
1144 .header(HEADER_CONTAINER_ID)
1145 .and_then(|v| v.as_str().map(|s| s.to_string()))
1146 .or(config.container_id.clone())
1147 .ok_or_else(|| {
1148 CamelError::ProcessorError(
1149 "CamelContainerId header or container param is required for network-connect"
1150 .to_string(),
1151 )
1152 })?;
1153
1154 docker
1155 .connect_network(
1156 &network,
1157 NetworkConnectRequest {
1158 container,
1159 ..Default::default()
1160 },
1161 )
1162 .await
1163 .map_err(|e| {
1164 let err_str = e.to_string().to_lowercase();
1165 if err_str.contains("404") || err_str.contains("not found") {
1166 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1167 } else {
1168 CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1169 }
1170 })?;
1171
1172 exchange.input.set_header(
1173 HEADER_ACTION_RESULT,
1174 serde_json::Value::String("success".to_string()),
1175 );
1176 Ok(())
1177}
1178
1179async fn handle_network_disconnect(
1180 docker: Docker,
1181 config: ContainerConfig,
1182 exchange: &mut Exchange,
1183) -> Result<(), CamelError> {
1184 let network = exchange
1185 .input
1186 .header(HEADER_NETWORK)
1187 .and_then(|v| v.as_str().map(|s| s.to_string()))
1188 .or(config.network.clone())
1189 .ok_or_else(|| {
1190 CamelError::ProcessorError(
1191 "CamelContainerNetwork header or network param is required for network-disconnect"
1192 .to_string(),
1193 )
1194 })?;
1195
1196 let container = exchange
1197 .input
1198 .header(HEADER_CONTAINER_ID)
1199 .and_then(|v| v.as_str().map(|s| s.to_string()))
1200 .or(config.container_id.clone())
1201 .ok_or_else(|| {
1202 CamelError::ProcessorError(
1203 "CamelContainerId header or container param is required for network-disconnect"
1204 .to_string(),
1205 )
1206 })?;
1207
1208 docker
1209 .disconnect_network(
1210 &network,
1211 NetworkDisconnectRequest {
1212 container,
1213 force: Some(config.force),
1214 },
1215 )
1216 .await
1217 .map_err(|e| {
1218 let err_str = e.to_string().to_lowercase();
1219 if err_str.contains("404") || err_str.contains("not found") {
1220 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1221 } else {
1222 CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1223 }
1224 })?;
1225
1226 exchange.input.set_header(
1227 HEADER_ACTION_RESULT,
1228 serde_json::Value::String("success".to_string()),
1229 );
1230 Ok(())
1231}
1232
1233async fn handle_network_remove(
1234 docker: Docker,
1235 config: ContainerConfig,
1236 exchange: &mut Exchange,
1237) -> Result<(), CamelError> {
1238 let network = exchange
1239 .input
1240 .header(HEADER_NETWORK)
1241 .and_then(|v| v.as_str().map(|s| s.to_string()))
1242 .or(config.network.clone())
1243 .ok_or_else(|| {
1244 CamelError::ProcessorError(
1245 "CamelContainerNetwork header or network param is required for network-remove"
1246 .to_string(),
1247 )
1248 })?;
1249
1250 docker.remove_network(&network).await.map_err(|e| {
1251 let err_str = e.to_string().to_lowercase();
1252 if err_str.contains("404") || err_str.contains("not found") {
1253 CamelError::ProcessorError(format!("Network '{}' not found", network))
1254 } else if err_str.contains("409") || err_str.contains("in use") {
1255 CamelError::ProcessorError(format!(
1256 "Network '{}' is in use and cannot be removed",
1257 network
1258 ))
1259 } else {
1260 CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1261 }
1262 })?;
1263
1264 exchange.input.set_header(
1265 HEADER_ACTION_RESULT,
1266 serde_json::Value::String("success".to_string()),
1267 );
1268 Ok(())
1269}
1270
1271async fn handle_network_list(
1272 docker: Docker,
1273 _config: ContainerConfig,
1274 exchange: &mut Exchange,
1275) -> Result<(), CamelError> {
1276 let networks = docker
1277 .list_networks(None::<ListNetworksOptions>)
1278 .await
1279 .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1280
1281 let json_value = serde_json::to_value(&networks)
1282 .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1283
1284 exchange.input.body = Body::Json(json_value);
1285 exchange.input.set_header(
1286 HEADER_ACTION_RESULT,
1287 serde_json::Value::String("success".to_string()),
1288 );
1289 Ok(())
1290}
1291
1292#[derive(Clone)]
1297pub struct ContainerProducer {
1298 config: ContainerConfig,
1299 docker: Docker,
1300}
1301
1302impl Service<Exchange> for ContainerProducer {
1303 type Response = Exchange;
1304 type Error = CamelError;
1305 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1306
1307 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1308 Poll::Ready(Ok(()))
1309 }
1310
1311 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1312 let config = self.config.clone();
1313 let docker = self.docker.clone();
1314 Box::pin(async move {
1315 let operation_name = exchange
1316 .input
1317 .header(HEADER_ACTION)
1318 .and_then(|v| v.as_str().map(|s| s.to_string()))
1319 .unwrap_or_else(|| config.operation.clone());
1320
1321 let operation = parse_producer_operation(&operation_name)?;
1322
1323 match operation {
1324 ProducerOperation::List => {
1325 handle_list(docker, config, &mut exchange).await?;
1326 }
1327 ProducerOperation::Run => {
1328 handle_run(docker, config, &mut exchange).await?;
1329 }
1330 ProducerOperation::Start => {
1331 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1332 .await?;
1333 }
1334 ProducerOperation::Stop => {
1335 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1336 .await?;
1337 }
1338 ProducerOperation::Remove => {
1339 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1340 .await?;
1341 }
1342 ProducerOperation::Exec => {
1343 handle_exec(docker, config, &mut exchange).await?;
1344 }
1345 ProducerOperation::NetworkCreate => {
1346 handle_network_create(docker, config, &mut exchange).await?;
1347 }
1348 ProducerOperation::NetworkConnect => {
1349 handle_network_connect(docker, config, &mut exchange).await?;
1350 }
1351 ProducerOperation::NetworkDisconnect => {
1352 handle_network_disconnect(docker, config, &mut exchange).await?;
1353 }
1354 ProducerOperation::NetworkRemove => {
1355 handle_network_remove(docker, config, &mut exchange).await?;
1356 }
1357 ProducerOperation::NetworkList => {
1358 handle_network_list(docker, config, &mut exchange).await?;
1359 }
1360 }
1361
1362 Ok(exchange)
1363 })
1364 }
1365}
1366
1367pub struct ContainerConsumer {
1372 config: ContainerConfig,
1373 #[allow(dead_code)]
1376 runtime: Arc<dyn RuntimeObservability>,
1377}
1378
1379impl ContainerConsumer {
1380 pub fn new(config: ContainerConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
1381 Self { config, runtime }
1382 }
1383}
1384
1385#[async_trait]
1386impl Consumer for ContainerConsumer {
1387 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1388 match self.config.operation.as_str() {
1389 "events" => self.start_events_consumer(context).await,
1390 "logs" => self.start_logs_consumer(context).await,
1391 _ => Err(CamelError::EndpointCreationFailed(format!(
1392 "Consumer only supports 'events' or 'logs' operations, got '{}'",
1393 self.config.operation
1394 ))),
1395 }
1396 }
1397
1398 async fn stop(&mut self) -> Result<(), CamelError> {
1399 Ok(())
1400 }
1401
1402 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1403 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1404 }
1405}
1406
1407impl ContainerConsumer {
1408 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1409 use futures::StreamExt;
1410
1411 let cancel = context.cancel_token();
1412
1413 loop {
1417 let docker = match retry_async_cancelable(
1418 &self.config.reconnect,
1419 Some("container-events"),
1420 || async { self.config.connect_docker().await },
1421 |_| true,
1422 &cancel,
1423 )
1424 .await
1425 {
1426 Ok(d) => d,
1427 Err(_) if context.is_cancelled() => {
1428 tracing::info!("Container events consumer shutting down");
1429 return Ok(());
1430 }
1431 Err(e) => {
1432 tracing::error!(error = %e, "Container events consumer exhausted reconnect attempts"); return Err(e);
1435 }
1436 };
1437
1438 let mut event_stream = docker.events(None::<EventsOptions>);
1439
1440 loop {
1441 tokio::select! {
1442 _ = context.cancelled() => {
1443 tracing::info!("Container events consumer shutting down");
1444 return Ok(());
1445 }
1446
1447 msg = event_stream.next() => {
1448 match msg {
1449 Some(Ok(event)) => {
1450 let formatted = format_docker_event(&event);
1451 let message = Message::new(Body::Text(formatted));
1452 let exchange = Exchange::new(message);
1453
1454 if let Err(e) = context.send(exchange).await {
1455 tracing::error!("Failed to send exchange: {:?}", e);
1457 break;
1458 }
1459 }
1460 Some(Err(e)) => {
1461 tracing::error!("Docker event stream error: {}. Reconnecting...", e); break;
1464 }
1465 None => {
1466 tracing::info!("Docker event stream ended. Reconnecting...");
1467 break;
1468 }
1469 }
1470 }
1471 }
1472 }
1473
1474 tokio::select! {
1475 _ = context.cancelled() => {
1476 tracing::info!("Container events consumer shutting down");
1477 return Ok(());
1478 }
1479 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1480 }
1481 }
1482 }
1483
1484 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1485 use futures::StreamExt;
1486
1487 let container_id = self.config.container_id.clone().ok_or_else(|| {
1488 CamelError::EndpointCreationFailed(
1489 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1490 .to_string(),
1491 )
1492 })?;
1493
1494 let cancel = context.cancel_token();
1495
1496 loop {
1500 let docker = match retry_async_cancelable(
1501 &self.config.reconnect,
1502 Some("container-logs"),
1503 || async { self.config.connect_docker().await },
1504 |_| true,
1505 &cancel,
1506 )
1507 .await
1508 {
1509 Ok(d) => d,
1510 Err(_) if context.is_cancelled() => {
1511 tracing::info!("Container logs consumer shutting down");
1512 return Ok(());
1513 }
1514 Err(e) => {
1515 tracing::error!(error = %e, "Container logs consumer exhausted reconnect attempts"); return Err(e);
1518 }
1519 };
1520
1521 let tail = self
1522 .config
1523 .tail
1524 .clone()
1525 .unwrap_or_else(|| "all".to_string());
1526
1527 let options = LogsOptions {
1528 follow: self.config.follow,
1529 stdout: true,
1530 stderr: true,
1531 timestamps: self.config.timestamps,
1532 tail,
1533 ..Default::default()
1534 };
1535
1536 let mut log_stream = docker.logs(&container_id, Some(options));
1537 let container_id_header = container_id.clone();
1538
1539 loop {
1540 tokio::select! {
1541 _ = context.cancelled() => {
1542 tracing::info!("Container logs consumer shutting down");
1543 return Ok(());
1544 }
1545
1546 msg = log_stream.next() => {
1547 match msg {
1548 Some(Ok(log_output)) => {
1549 let (stream_type, content) = match log_output {
1550 bollard::container::LogOutput::StdOut { message } => {
1551 ("stdout", String::from_utf8_lossy(&message).into_owned())
1552 }
1553 bollard::container::LogOutput::StdErr { message } => {
1554 ("stderr", String::from_utf8_lossy(&message).into_owned())
1555 }
1556 bollard::container::LogOutput::Console { message } => {
1557 ("console", String::from_utf8_lossy(&message).into_owned())
1558 }
1559 bollard::container::LogOutput::StdIn { message } => {
1560 ("stdin", String::from_utf8_lossy(&message).into_owned())
1561 }
1562 };
1563
1564 let content = content.trim_end();
1565 if content.is_empty() {
1566 continue;
1567 }
1568
1569 let mut message = Message::new(Body::Text(content.to_string()));
1570 message.set_header(
1571 HEADER_CONTAINER_ID,
1572 serde_json::Value::String(container_id_header.clone()),
1573 );
1574 message.set_header(
1575 HEADER_LOG_STREAM,
1576 serde_json::Value::String(stream_type.to_string()),
1577 );
1578
1579 if self.config.timestamps
1580 && let Some(ts) = extract_timestamp(content) {
1581 message.set_header(
1582 HEADER_LOG_TIMESTAMP,
1583 serde_json::Value::String(ts),
1584 );
1585 }
1586
1587 let exchange = Exchange::new(message);
1588
1589 if let Err(e) = context.send(exchange).await {
1590 tracing::error!("Failed to send log exchange: {:?}", e);
1592 break;
1593 }
1594 }
1595 Some(Err(e)) => {
1596 tracing::error!("Docker log stream error: {}. Reconnecting...", e); break;
1599 }
1600 None => {
1601 if self.config.follow {
1602 tracing::info!("Docker log stream ended. Reconnecting...");
1603 break;
1604 } else {
1605 tracing::info!("Container logs consumer finished (follow=false)");
1606 return Ok(());
1607 }
1608 }
1609 }
1610 }
1611 }
1612 }
1613
1614 tokio::select! {
1615 _ = context.cancelled() => {
1616 tracing::info!("Container logs consumer shutting down");
1617 return Ok(());
1618 }
1619 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1620 }
1621 }
1622 }
1623}
1624
1625fn extract_timestamp(log_line: &str) -> Option<String> {
1626 let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
1627 if parts.len() > 1 && parts[0].contains('T') {
1628 Some(parts[0].to_string())
1629 } else {
1630 None
1631 }
1632}
1633
1634pub struct ContainerComponent {
1642 config: Option<ContainerGlobalConfig>,
1643}
1644
1645impl ContainerComponent {
1646 pub fn new() -> Self {
1648 Self { config: None }
1649 }
1650
1651 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1653 Self {
1654 config: Some(config),
1655 }
1656 }
1657
1658 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1660 Self { config }
1661 }
1662}
1663
1664impl Default for ContainerComponent {
1665 fn default() -> Self {
1666 Self::new()
1667 }
1668}
1669
1670impl Component for ContainerComponent {
1671 fn scheme(&self) -> &str {
1672 "container"
1673 }
1674
1675 fn create_endpoint(
1676 &self,
1677 uri: &str,
1678 ctx: &dyn camel_component_api::ComponentContext,
1679 ) -> Result<Box<dyn Endpoint>, CamelError> {
1680 let mut config = ContainerConfig::from_uri(uri)?;
1681 if let Some(ref global) = self.config {
1683 config.apply_global_defaults(global);
1684 }
1685 let health_check = ContainerHealthCheck::new(&config);
1686 ctx.register_current_route_health_check(Arc::new(health_check));
1687 Ok(Box::new(ContainerEndpoint {
1688 uri: uri.to_string(),
1689 config,
1690 }))
1691 }
1692}
1693
1694pub struct ContainerEndpoint {
1701 uri: String,
1702 config: ContainerConfig,
1703}
1704
1705impl ContainerEndpoint {
1706 pub fn docker_host(&self) -> Option<&str> {
1709 self.config.host.as_deref()
1710 }
1711}
1712
1713impl Endpoint for ContainerEndpoint {
1714 fn uri(&self) -> &str {
1715 &self.uri
1716 }
1717
1718 fn create_consumer(
1719 &self,
1720 rt: Arc<dyn camel_component_api::RuntimeObservability>,
1721 ) -> Result<Box<dyn Consumer>, CamelError> {
1722 Ok(Box::new(ContainerConsumer::new(self.config.clone(), rt)))
1723 }
1724
1725 fn create_producer(
1726 &self,
1727 _rt: Arc<dyn camel_component_api::RuntimeObservability>,
1728 _ctx: &ProducerContext,
1729 ) -> Result<BoxProcessor, CamelError> {
1730 let docker = self.config.connect_docker_client()?;
1731 Ok(BoxProcessor::new(ContainerProducer {
1732 config: self.config.clone(),
1733 docker,
1734 }))
1735 }
1736}
1737
1738#[cfg(test)]
1739mod tests {
1740 use camel_component_api::test_support::PanicRuntimeObservability;
1741 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1742 std::sync::Arc::new(PanicRuntimeObservability)
1743 }
1744 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
1745 std::sync::Arc::new(PanicRuntimeObservability)
1746 }
1747
1748 use super::*;
1749 use camel_component_api::NoOpComponentContext;
1750
1751 #[test]
1752 fn test_container_config() {
1753 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1754 assert_eq!(config.operation, "run");
1755 assert_eq!(config.image.as_deref(), Some("alpine"));
1756 assert!(config.host.is_none());
1758 }
1759
1760 #[test]
1761 fn test_global_config_applied_to_endpoint() {
1762 let global =
1765 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1766 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1767 assert!(
1768 config.host.is_none(),
1769 "URI without ?host= should leave host as None"
1770 );
1771 config.apply_global_defaults(&global);
1772 assert_eq!(
1773 config.host.as_deref(),
1774 Some("unix:///custom/docker.sock"),
1775 "global docker_host must be applied when URI did not set host"
1776 );
1777 }
1778
1779 #[test]
1780 fn test_uri_param_wins_over_global_config() {
1781 let global =
1783 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1784 let mut config =
1785 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1786 .unwrap();
1787 assert_eq!(
1788 config.host.as_deref(),
1789 Some("unix:///override.sock"),
1790 "URI-set host should be parsed correctly"
1791 );
1792 config.apply_global_defaults(&global);
1793 assert_eq!(
1794 config.host.as_deref(),
1795 Some("unix:///override.sock"),
1796 "global config must NOT override a host already set by URI"
1797 );
1798 }
1799
1800 #[test]
1801 fn test_container_config_parses_name() {
1802 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1803 assert_eq!(config.name.as_deref(), Some("my-container"));
1804 }
1805
1806 #[test]
1807 fn test_parse_producer_operation_known() {
1808 assert_eq!(
1809 parse_producer_operation("list").unwrap(),
1810 ProducerOperation::List
1811 );
1812 assert_eq!(
1813 parse_producer_operation("run").unwrap(),
1814 ProducerOperation::Run
1815 );
1816 assert_eq!(
1817 parse_producer_operation("start").unwrap(),
1818 ProducerOperation::Start
1819 );
1820 assert_eq!(
1821 parse_producer_operation("stop").unwrap(),
1822 ProducerOperation::Stop
1823 );
1824 assert_eq!(
1825 parse_producer_operation("remove").unwrap(),
1826 ProducerOperation::Remove
1827 );
1828 }
1829
1830 #[test]
1831 fn test_parse_producer_operation_unknown() {
1832 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1833 match err {
1834 CamelError::ProcessorError(msg) => {
1835 assert!(
1836 msg.contains("Unknown container operation"),
1837 "Unexpected error message: {}",
1838 msg
1839 );
1840 }
1841 _ => panic!("Expected ProcessorError for unknown operation"),
1842 }
1843 }
1844
1845 #[test]
1846 fn test_parse_producer_operation_new_variants() {
1847 assert_eq!(
1848 parse_producer_operation("exec").unwrap(),
1849 ProducerOperation::Exec
1850 );
1851 assert_eq!(
1852 parse_producer_operation("network-create").unwrap(),
1853 ProducerOperation::NetworkCreate
1854 );
1855 assert_eq!(
1856 parse_producer_operation("network-connect").unwrap(),
1857 ProducerOperation::NetworkConnect
1858 );
1859 assert_eq!(
1860 parse_producer_operation("network-disconnect").unwrap(),
1861 ProducerOperation::NetworkDisconnect
1862 );
1863 assert_eq!(
1864 parse_producer_operation("network-remove").unwrap(),
1865 ProducerOperation::NetworkRemove
1866 );
1867 assert_eq!(
1868 parse_producer_operation("network-list").unwrap(),
1869 ProducerOperation::NetworkList
1870 );
1871 }
1872
1873 #[test]
1874 fn test_resolve_container_name_header_overrides_config() {
1875 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1876 let mut exchange = Exchange::new(Message::new(""));
1877 exchange.input.set_header(
1878 HEADER_CONTAINER_NAME,
1879 serde_json::Value::String("header-name".to_string()),
1880 );
1881
1882 let resolved = resolve_container_name(&exchange, &config);
1883 assert_eq!(resolved.as_deref(), Some("header-name"));
1884 }
1885
1886 #[test]
1887 fn test_container_config_rejects_tcp_host() {
1888 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1889 let err = config.connect_docker_client().unwrap_err();
1890 match err {
1891 CamelError::ProcessorError(msg) => {
1892 assert!(
1893 msg.to_lowercase().contains("tcp"),
1894 "Expected TCP scheme error, got: {}",
1895 msg
1896 );
1897 }
1898 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1899 }
1900 }
1901
1902 #[tokio::test]
1903 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1904 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1905 let remove_called_clone = remove_called.clone();
1906
1907 let result = run_container_with_cleanup(
1908 || async { Ok("container-123".to_string()) },
1909 |_id| async move {
1910 Err(CamelError::ProcessorError(
1911 "Failed to start container".to_string(),
1912 ))
1913 },
1914 move |_id| {
1915 let remove_called_inner = remove_called_clone.clone();
1916 async move {
1917 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1918 Ok(())
1919 }
1920 },
1921 )
1922 .await;
1923
1924 assert!(result.is_err(), "Expected start failure to bubble up");
1925 assert!(
1926 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1927 "Expected cleanup to remove container"
1928 );
1929 }
1930
1931 #[test]
1932 fn test_container_component_creates_endpoint() {
1933 let component = ContainerComponent::new();
1934 assert_eq!(component.scheme(), "container");
1935 let ctx = NoOpComponentContext;
1936 let endpoint = component
1937 .create_endpoint("container:run?image=alpine", &ctx)
1938 .unwrap();
1939 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1940 }
1941
1942 #[test]
1943 fn test_container_config_parses_ports() {
1944 let config =
1945 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1946 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1947 }
1948
1949 #[test]
1950 fn test_container_config_parses_env() {
1951 let config =
1952 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1953 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1954 }
1955
1956 #[test]
1957 fn test_container_config_parses_logs_options() {
1958 let config = ContainerConfig::from_uri(
1959 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1960 )
1961 .unwrap();
1962 assert_eq!(config.operation, "logs");
1963 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1964 assert!(config.follow);
1965 assert!(config.timestamps);
1966 assert_eq!(config.tail.as_deref(), Some("100"));
1967 }
1968
1969 #[test]
1970 fn test_container_config_logs_defaults() {
1971 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1972 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1976
1977 #[test]
1978 fn test_parse_ports_single() {
1979 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1980 let (exposed, bindings) = config.parse_ports().unwrap();
1981
1982 assert!(exposed.contains(&"80/tcp".to_string()));
1983 assert!(bindings.contains_key("80/tcp"));
1984
1985 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1986 assert_eq!(binding.len(), 1);
1987 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1988 }
1989
1990 #[test]
1991 fn test_parse_ports_multiple() {
1992 let config =
1993 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1994 let (exposed, bindings) = config.parse_ports().unwrap();
1995
1996 assert!(exposed.contains(&"80/tcp".to_string()));
1997 assert!(exposed.contains(&"443/tcp".to_string()));
1998 assert_eq!(bindings.len(), 2);
1999 }
2000
2001 #[test]
2002 fn test_parse_ports_with_protocol() {
2003 let config =
2004 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
2005 .unwrap();
2006 let (exposed, _bindings) = config.parse_ports().unwrap();
2007
2008 assert!(exposed.contains(&"80/tcp".to_string()));
2009 assert!(exposed.contains(&"53/udp".to_string()));
2010 }
2011
2012 #[test]
2013 fn test_parse_ports_none() {
2014 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2015 let (exposed, bindings) = config.parse_ports().unwrap();
2016 assert!(exposed.is_empty());
2017 assert!(bindings.is_empty());
2018 }
2019
2020 #[test]
2021 fn test_parse_env_single() {
2022 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
2023 let env = config.parse_env().unwrap();
2024
2025 assert_eq!(env.len(), 1);
2026 assert_eq!(env[0], "FOO=bar");
2027 }
2028
2029 #[test]
2030 fn test_parse_env_multiple() {
2031 let config =
2032 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
2033 .unwrap();
2034 let env = config.parse_env().unwrap();
2035
2036 assert_eq!(env.len(), 3);
2037 assert!(env.contains(&"FOO=bar".to_string()));
2038 assert!(env.contains(&"BAZ=qux".to_string()));
2039 assert!(env.contains(&"NUM=123".to_string()));
2040 }
2041
2042 #[test]
2043 fn test_parse_env_none() {
2044 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2045 assert!(config.parse_env().is_none());
2046 }
2047
2048 use camel_component_api::Message;
2049 use std::sync::Arc;
2050
2051 #[tokio::test]
2052 async fn test_container_producer_resolves_operation_from_header() {
2053 let docker = match Docker::connect_with_local_defaults() {
2055 Ok(d) => d,
2056 Err(_) => {
2057 eprintln!("Skipping test: Could not connect to Docker daemon");
2058 return;
2059 }
2060 };
2061
2062 if docker.ping().await.is_err() {
2063 eprintln!("Skipping test: Docker daemon not responding to ping");
2064 return;
2065 }
2066
2067 let component = ContainerComponent::new();
2068 let ctx = NoOpComponentContext;
2069 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2070
2071 let ctx = ProducerContext::new();
2072 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2073
2074 let mut exchange = Exchange::new(Message::new(""));
2075 exchange
2076 .input
2077 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2078
2079 use tower::ServiceExt;
2080 let result = producer
2081 .ready()
2082 .await
2083 .unwrap()
2084 .call(exchange)
2085 .await
2086 .unwrap();
2087
2088 assert_eq!(
2090 result
2091 .input
2092 .header(HEADER_ACTION_RESULT)
2093 .map(|v| v.as_str().unwrap()),
2094 Some("success")
2095 );
2096 }
2097
2098 #[tokio::test]
2099 async fn test_container_producer_connection_error_on_invalid_host() {
2100 let component = ContainerComponent::new();
2102 let ctx = NoOpComponentContext;
2103 let endpoint = component
2104 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2105 .unwrap();
2106
2107 let ctx = ProducerContext::new();
2108 let result = endpoint.create_producer(rt(), &ctx);
2109
2110 assert!(
2112 result.is_err(),
2113 "Expected error when connecting to invalid host"
2114 );
2115 let err = result.unwrap_err();
2116 match &err {
2117 CamelError::ProcessorError(msg) => {
2118 assert!(
2119 msg.to_lowercase().contains("connection")
2120 || msg.to_lowercase().contains("connect")
2121 || msg.to_lowercase().contains("socket")
2122 || msg.contains("docker"),
2123 "Error message should indicate connection failure, got: {}",
2124 msg
2125 );
2126 }
2127 _ => panic!("Expected ProcessorError, got: {:?}", err),
2128 }
2129 }
2130
2131 #[tokio::test]
2133 async fn test_container_producer_lifecycle_operations_missing_id() {
2134 let docker = match Docker::connect_with_local_defaults() {
2136 Ok(d) => d,
2137 Err(_) => {
2138 eprintln!("Skipping test: Could not connect to Docker daemon");
2139 return;
2140 }
2141 };
2142
2143 if docker.ping().await.is_err() {
2144 eprintln!("Skipping test: Docker daemon not responding to ping");
2145 return;
2146 }
2147
2148 let component = ContainerComponent::new();
2149 let ctx = NoOpComponentContext;
2150 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2151 let ctx = ProducerContext::new();
2152 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2153
2154 for operation in ["start", "stop", "remove"] {
2156 let mut exchange = Exchange::new(Message::new(""));
2157 exchange.input.set_header(
2158 HEADER_ACTION,
2159 serde_json::Value::String(operation.to_string()),
2160 );
2161 use tower::ServiceExt;
2164 let result = producer.ready().await.unwrap().call(exchange).await;
2165
2166 assert!(
2167 result.is_err(),
2168 "Expected error for {} operation without CamelContainerId",
2169 operation
2170 );
2171 let err = result.unwrap_err();
2172 match &err {
2173 CamelError::ProcessorError(msg) => {
2174 assert!(
2175 msg.contains(HEADER_CONTAINER_ID),
2176 "Error message should mention {}, got: {}",
2177 HEADER_CONTAINER_ID,
2178 msg
2179 );
2180 }
2181 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2182 }
2183 }
2184 }
2185
2186 #[tokio::test]
2188 async fn test_container_producer_stop_nonexistent() {
2189 let docker = match Docker::connect_with_local_defaults() {
2191 Ok(d) => d,
2192 Err(_) => {
2193 eprintln!("Skipping test: Could not connect to Docker daemon");
2194 return;
2195 }
2196 };
2197
2198 if docker.ping().await.is_err() {
2199 eprintln!("Skipping test: Docker daemon not responding to ping");
2200 return;
2201 }
2202
2203 let component = ContainerComponent::new();
2204 let ctx = NoOpComponentContext;
2205 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2206 let ctx = ProducerContext::new();
2207 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2208
2209 let mut exchange = Exchange::new(Message::new(""));
2210 exchange
2211 .input
2212 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2213 exchange.input.set_header(
2214 HEADER_CONTAINER_ID,
2215 serde_json::Value::String("nonexistent-container-123".into()),
2216 );
2217
2218 use tower::ServiceExt;
2219 let result = producer.ready().await.unwrap().call(exchange).await;
2220
2221 assert!(
2222 result.is_err(),
2223 "Expected error when stopping nonexistent container"
2224 );
2225 let err = result.unwrap_err();
2226 match &err {
2227 CamelError::ProcessorError(msg) => {
2228 assert!(
2230 msg.to_lowercase().contains("no such container")
2231 || msg.to_lowercase().contains("not found")
2232 || msg.contains("404"),
2233 "Error message should indicate container not found, got: {}",
2234 msg
2235 );
2236 }
2237 _ => panic!("Expected ProcessorError, got: {:?}", err),
2238 }
2239 }
2240
2241 #[tokio::test]
2243 async fn test_container_producer_run_missing_image() {
2244 let docker = match Docker::connect_with_local_defaults() {
2246 Ok(d) => d,
2247 Err(_) => {
2248 eprintln!("Skipping test: Could not connect to Docker daemon");
2249 return;
2250 }
2251 };
2252
2253 if docker.ping().await.is_err() {
2254 eprintln!("Skipping test: Docker daemon not responding to ping");
2255 return;
2256 }
2257
2258 let component = ContainerComponent::new();
2260 let ctx = NoOpComponentContext;
2261 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2262 let ctx = ProducerContext::new();
2263 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2264
2265 let mut exchange = Exchange::new(Message::new(""));
2266 exchange
2267 .input
2268 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2269 use tower::ServiceExt;
2272 let result = producer.ready().await.unwrap().call(exchange).await;
2273
2274 assert!(
2275 result.is_err(),
2276 "Expected error for run operation without image"
2277 );
2278 let err = result.unwrap_err();
2279 match &err {
2280 CamelError::ProcessorError(msg) => {
2281 assert!(
2282 msg.to_lowercase().contains("image"),
2283 "Error message should mention 'image', got: {}",
2284 msg
2285 );
2286 }
2287 _ => panic!("Expected ProcessorError, got: {:?}", err),
2288 }
2289 }
2290
2291 #[tokio::test]
2293 async fn test_container_producer_run_image_from_header() {
2294 let docker = match Docker::connect_with_local_defaults() {
2296 Ok(d) => d,
2297 Err(_) => {
2298 eprintln!("Skipping test: Could not connect to Docker daemon");
2299 return;
2300 }
2301 };
2302
2303 if docker.ping().await.is_err() {
2304 eprintln!("Skipping test: Docker daemon not responding to ping");
2305 return;
2306 }
2307
2308 let component = ContainerComponent::new();
2310 let ctx = NoOpComponentContext;
2311 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2312 let ctx = ProducerContext::new();
2313 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2314
2315 let mut exchange = Exchange::new(Message::new(""));
2316 exchange
2317 .input
2318 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2319 exchange.input.set_header(
2321 HEADER_IMAGE,
2322 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2323 );
2324
2325 use tower::ServiceExt;
2326 let result = producer.ready().await.unwrap().call(exchange).await;
2327
2328 assert!(
2330 result.is_err(),
2331 "Expected error when running container with nonexistent image"
2332 );
2333 let err = result.unwrap_err();
2334 match &err {
2335 CamelError::ProcessorError(msg) => {
2336 assert!(
2338 msg.to_lowercase().contains("no such image")
2339 || msg.to_lowercase().contains("not found")
2340 || msg.to_lowercase().contains("image")
2341 || msg.to_lowercase().contains("pull")
2342 || msg.contains("404"),
2343 "Error message should indicate image issue, got: {}",
2344 msg
2345 );
2346 }
2347 _ => panic!("Expected ProcessorError, got: {:?}", err),
2348 }
2349 }
2350
2351 #[tokio::test]
2354 async fn test_container_producer_run_alpine_container() {
2355 let docker = match Docker::connect_with_local_defaults() {
2356 Ok(d) => d,
2357 Err(_) => {
2358 eprintln!("Skipping test: Could not connect to Docker daemon");
2359 return;
2360 }
2361 };
2362
2363 if docker.ping().await.is_err() {
2364 eprintln!("Skipping test: Docker daemon not responding to ping");
2365 return;
2366 }
2367
2368 let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2370 let has_alpine = images
2371 .iter()
2372 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2373
2374 if !has_alpine {
2375 eprintln!("Pulling alpine:latest image...");
2376 let mut stream = docker.create_image(
2377 Some(CreateImageOptions {
2378 from_image: Some("alpine:latest".to_string()),
2379 ..Default::default()
2380 }),
2381 None,
2382 None,
2383 );
2384
2385 use futures::StreamExt;
2386 while let Some(_item) = stream.next().await {
2387 }
2389 eprintln!("Image pulled successfully");
2390 }
2391
2392 let component = ContainerComponent::new();
2394 let ctx = NoOpComponentContext;
2395 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2396 let ctx = ProducerContext::new();
2397 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2398
2399 let timestamp = std::time::SystemTime::now()
2401 .duration_since(std::time::UNIX_EPOCH)
2402 .unwrap()
2403 .as_millis();
2404 let container_name = format!("test-rust-camel-{}", timestamp);
2405 let mut exchange = Exchange::new(Message::new(""));
2406 exchange.input.set_header(
2407 HEADER_IMAGE,
2408 serde_json::Value::String("alpine:latest".into()),
2409 );
2410 exchange.input.set_header(
2411 HEADER_CONTAINER_NAME,
2412 serde_json::Value::String(container_name.clone()),
2413 );
2414
2415 use tower::ServiceExt;
2416 let result = producer
2417 .ready()
2418 .await
2419 .unwrap()
2420 .call(exchange)
2421 .await
2422 .expect("Container run should succeed");
2423
2424 let container_id = result
2426 .input
2427 .header(HEADER_CONTAINER_ID)
2428 .and_then(|v| v.as_str().map(|s| s.to_string()))
2429 .expect("Expected container ID header");
2430 assert!(!container_id.is_empty(), "Container ID should not be empty");
2431
2432 assert_eq!(
2434 result
2435 .input
2436 .header(HEADER_ACTION_RESULT)
2437 .and_then(|v| v.as_str()),
2438 Some("success")
2439 );
2440
2441 let inspect = docker
2443 .inspect_container(&container_id, None)
2444 .await
2445 .expect("Container should exist");
2446 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2447
2448 docker
2450 .remove_container(
2451 &container_id,
2452 Some(RemoveContainerOptions {
2453 force: true,
2454 ..Default::default()
2455 }),
2456 )
2457 .await
2458 .ok();
2459
2460 eprintln!("✅ Container {} created and cleaned up", container_id);
2461 }
2462
2463 #[tokio::test]
2465 async fn test_container_consumer_unsupported_operation() {
2466 use tokio::sync::mpsc;
2467
2468 let component = ContainerComponent::new();
2469 let ctx = NoOpComponentContext;
2470 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2471 let mut consumer = endpoint.create_consumer(rt()).unwrap();
2472
2473 let (tx, _rx) = mpsc::channel(16);
2475 let cancel_token = tokio_util::sync::CancellationToken::new();
2476 let context = ConsumerContext::new(tx, cancel_token);
2477
2478 let result = consumer.start(context).await;
2479
2480 assert!(
2482 result.is_err(),
2483 "Expected error for unsupported consumer operation"
2484 );
2485 let err = result.unwrap_err();
2486 match &err {
2487 CamelError::EndpointCreationFailed(msg) => {
2488 assert!(
2489 msg.contains("Consumer only supports 'events' or 'logs'"),
2490 "Error message should mention events or logs support, got: {}",
2491 msg
2492 );
2493 }
2494 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2495 }
2496 }
2497
2498 #[test]
2499 fn test_container_consumer_concurrency_model_is_concurrent() {
2500 let consumer = ContainerConsumer {
2501 config: ContainerConfig::from_uri("container:events").unwrap(),
2502 runtime: test_rt(),
2503 };
2504
2505 assert_eq!(
2506 consumer.concurrency_model(),
2507 camel_component_api::ConcurrencyModel::Concurrent { max: None }
2508 );
2509 }
2510
2511 #[tokio::test]
2515 async fn test_container_consumer_cancellation() {
2516 use std::sync::atomic::{AtomicBool, Ordering};
2517 use tokio::sync::mpsc;
2518
2519 let docker = match Docker::connect_with_local_defaults() {
2521 Ok(d) => d,
2522 Err(_) => {
2523 eprintln!("Skipping test: Could not connect to Docker daemon");
2524 return;
2525 }
2526 };
2527
2528 if docker.ping().await.is_err() {
2529 eprintln!("Skipping test: Docker daemon not responding to ping");
2530 return;
2531 }
2532
2533 let component = ContainerComponent::new();
2534 let ctx = NoOpComponentContext;
2535 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2536 let mut consumer = endpoint.create_consumer(rt()).unwrap();
2537
2538 let (tx, _rx) = mpsc::channel(16);
2540 let cancel_token = tokio_util::sync::CancellationToken::new();
2541 let context = ConsumerContext::new(tx, cancel_token.clone());
2542
2543 let completed = Arc::new(AtomicBool::new(false));
2545 let completed_clone = completed.clone();
2546
2547 let handle = tokio::spawn(async move {
2549 let result = consumer.start(context).await;
2550 completed_clone.store(true, Ordering::SeqCst);
2552 result
2553 });
2554
2555 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2557
2558 assert!(
2560 !completed.load(Ordering::SeqCst),
2561 "Consumer should still be running before cancellation"
2562 );
2563
2564 cancel_token.cancel();
2566
2567 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2569
2570 assert!(
2572 result.is_ok(),
2573 "Consumer should gracefully shut down after cancellation"
2574 );
2575
2576 assert!(
2578 completed.load(Ordering::SeqCst),
2579 "Consumer should have completed after cancellation"
2580 );
2581 }
2582
2583 #[tokio::test]
2587 async fn test_container_producer_list_containers() {
2588 let docker = match Docker::connect_with_local_defaults() {
2591 Ok(d) => d,
2592 Err(_) => {
2593 eprintln!("Skipping test: Could not connect to Docker daemon");
2594 return;
2595 }
2596 };
2597
2598 if docker.ping().await.is_err() {
2599 eprintln!("Skipping test: Docker daemon not responding to ping");
2600 return;
2601 }
2602
2603 let component = ContainerComponent::new();
2605 let ctx = NoOpComponentContext;
2606 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2607
2608 let ctx = ProducerContext::new();
2609 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2610
2611 let mut exchange = Exchange::new(Message::new(""));
2613 exchange
2614 .input
2615 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2616
2617 use tower::ServiceExt;
2619 let result = producer
2620 .ready()
2621 .await
2622 .unwrap()
2623 .call(exchange)
2624 .await
2625 .expect("Producer should succeed when Docker is available");
2626
2627 match &result.input.body {
2630 camel_component_api::Body::Json(json_value) => {
2631 assert!(
2632 json_value.is_array(),
2633 "Expected input body to be a JSON array, got: {:?}",
2634 json_value
2635 );
2636 }
2637 other => panic!("Expected Body::Json with array, got: {:?}", other),
2638 }
2639 }
2640
2641 #[test]
2642 fn test_container_config_parses_volumes() {
2643 let config = ContainerConfig::from_uri(
2644 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2645 )
2646 .unwrap();
2647 assert_eq!(
2648 config.volumes.as_deref(),
2649 Some("./html:/usr/share/nginx/html:ro")
2650 );
2651 }
2652
2653 #[test]
2654 fn test_container_config_parses_exec_params() {
2655 let config = ContainerConfig::from_uri(
2656 "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2657 )
2658 .unwrap();
2659 assert_eq!(config.operation, "exec");
2660 assert_eq!(config.container_id.as_deref(), Some("my-app"));
2661 assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2662 assert_eq!(config.user.as_deref(), Some("root"));
2663 assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2664 assert!(config.detach);
2665 }
2666
2667 #[test]
2668 fn test_container_config_parses_network_create_params() {
2669 let config =
2670 ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2671 .unwrap();
2672 assert_eq!(config.operation, "network-create");
2673 assert_eq!(config.name.as_deref(), Some("my-net"));
2674 assert_eq!(config.driver.as_deref(), Some("bridge"));
2675 }
2676
2677 #[test]
2678 fn test_container_config_defaults_new_fields() {
2679 let config = ContainerConfig::from_uri("container:list").unwrap();
2680 assert!(config.volumes.is_none());
2681 assert!(config.user.is_none());
2682 assert!(config.workdir.is_none());
2683 assert!(!config.detach);
2684 assert!(config.driver.is_none());
2685 assert!(!config.force);
2686 }
2687
2688 #[test]
2689 fn test_parse_volumes_bind_mount() {
2690 let config = ContainerConfig::from_uri(
2691 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2692 )
2693 .unwrap();
2694 let (binds, anon) = config.parse_volumes().unwrap();
2695 assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2696 assert!(anon.is_empty());
2697 }
2698
2699 #[test]
2700 fn test_parse_volumes_named_volume() {
2701 let config =
2702 ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2703 .unwrap();
2704 let (binds, anon) = config.parse_volumes().unwrap();
2705 assert_eq!(binds, vec!["data:/var/lib/data"]);
2706 assert!(anon.is_empty());
2707 }
2708
2709 #[test]
2710 fn test_parse_volumes_anonymous() {
2711 let config =
2712 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2713 let (binds, anon) = config.parse_volumes().unwrap();
2714 assert!(binds.is_empty());
2715 assert!(anon.contains(&"/tmp/app-data".to_string()));
2716 }
2717
2718 #[test]
2719 fn test_parse_volumes_anonymous_with_mode() {
2720 let config =
2721 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2722 .unwrap();
2723 let (binds, anon) = config.parse_volumes().unwrap();
2724 assert!(binds.is_empty());
2725 assert!(anon.contains(&"/tmp/app-data".to_string()));
2726 }
2727
2728 #[test]
2729 fn test_parse_volumes_multiple() {
2730 let config = ContainerConfig::from_uri(
2731 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2732 )
2733 .unwrap();
2734 let (binds, anon) = config.parse_volumes().unwrap();
2735 assert_eq!(binds.len(), 2);
2736 assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2737 assert!(binds.contains(&"data:/var/log/app".to_string()));
2738 assert!(anon.is_empty());
2739 }
2740
2741 #[test]
2742 fn test_parse_volumes_mixed() {
2743 let config = ContainerConfig::from_uri(
2744 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2745 )
2746 .unwrap();
2747 let (binds, anon) = config.parse_volumes().unwrap();
2748 assert_eq!(binds.len(), 1);
2749 assert!(anon.contains(&"/tmp/cache".to_string()));
2750 }
2751
2752 #[test]
2753 fn test_parse_volumes_none() {
2754 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2755 assert!(config.parse_volumes().is_none());
2756 }
2757
2758 #[test]
2759 fn test_parse_volumes_empty_entry_skipped() {
2760 let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2761 assert!(config.parse_volumes().is_none());
2762 }
2763
2764 #[test]
2765 fn test_parse_volumes_rw_mode() {
2766 let config =
2767 ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2768 .unwrap();
2769 let (binds, _) = config.parse_volumes().unwrap();
2770 assert_eq!(binds, vec!["./data:/app/data:rw"]);
2771 }
2772
2773 #[test]
2774 fn test_container_config_from_uri_parses_false_flags() {
2775 let config = ContainerConfig::from_uri(
2776 "container:logs?containerId=a&follow=false×tamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
2777 )
2778 .unwrap();
2779 assert!(!config.follow);
2780 assert!(!config.timestamps);
2781 assert!(!config.auto_pull);
2782 assert!(!config.auto_remove);
2783 assert!(config.detach);
2784 assert!(config.force);
2785 }
2786
2787 #[test]
2788 fn test_docker_socket_path_validation() {
2789 let unix_cfg =
2790 ContainerConfig::from_uri("container:list?host=unix:///tmp/docker.sock").unwrap();
2791 assert_eq!(
2792 unix_cfg.docker_socket_path().unwrap(),
2793 "unix:///tmp/docker.sock"
2794 );
2795
2796 let npipe_cfg =
2797 ContainerConfig::from_uri("container:list?host=npipe:////./pipe/docker_engine")
2798 .unwrap();
2799 assert_eq!(
2800 npipe_cfg.docker_socket_path().unwrap(),
2801 "npipe:////./pipe/docker_engine"
2802 );
2803
2804 let plain_cfg =
2805 ContainerConfig::from_uri("container:list?host=/var/run/docker.sock").unwrap();
2806 assert_eq!(
2807 plain_cfg.docker_socket_path().unwrap(),
2808 "/var/run/docker.sock"
2809 );
2810
2811 let bad_cfg =
2812 ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
2813 assert!(bad_cfg.docker_socket_path().is_err());
2814 }
2815
2816 #[test]
2817 fn test_parse_ports_invalid_and_whitespace_entries() {
2818 let cfg = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
2820 let err = cfg.parse_ports().unwrap_err();
2821 assert!(
2822 err.to_string().contains("malformed port mapping"),
2823 "expected malformed port error, got: {}",
2824 err
2825 );
2826
2827 let cfg =
2828 ContainerConfig::from_uri("container:run?ports= 8080:80 , 5353:53/udp ").unwrap();
2829 let (exposed, bindings) = cfg.parse_ports().unwrap();
2830 assert!(exposed.contains(&"80/tcp".to_string()));
2831 assert!(exposed.contains(&"53/udp".to_string()));
2832 assert_eq!(bindings.len(), 2);
2833 }
2834
2835 #[test]
2836 fn test_parse_ports_fail_fast_on_malformed() {
2837 let cfg = ContainerConfig::from_uri("container:run?ports=8080:80,badentry").unwrap();
2839 let err = cfg.parse_ports().unwrap_err();
2840 assert!(
2841 err.to_string()
2842 .contains("malformed port mapping 'badentry'"),
2843 "expected fail-fast on 'badentry', got: {}",
2844 err
2845 );
2846 }
2847
2848 #[test]
2849 fn test_parse_env_trims_and_filters_empty_items() {
2850 let cfg = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
2851 let env = cfg.parse_env().unwrap();
2852 assert_eq!(env, vec!["FOO=bar".to_string(), "BAZ=qux".to_string()]);
2853
2854 let cfg = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
2855 assert!(cfg.parse_env().is_none());
2856 }
2857
2858 #[test]
2859 fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
2860 assert!(parse_volume_str("/host:/ctr:badmode").is_none());
2861 let (binds, anon) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
2862 assert!(binds.contains(&"a:/b:rw".to_string()));
2863 assert!(anon.contains(&"/tmp/cache".to_string()));
2864 assert!(anon.contains(&"/tmp/logs".to_string()));
2865 }
2866
2867 #[test]
2868 fn test_format_docker_event_variants_and_timestamp_extraction() {
2869 let mut attrs = std::collections::HashMap::new();
2870 attrs.insert("name".to_string(), "demo".to_string());
2871 attrs.insert("image".to_string(), "alpine:latest".to_string());
2872 attrs.insert("exitCode".to_string(), "137".to_string());
2873 let actor = bollard::models::EventActor {
2874 id: None,
2875 attributes: Some(attrs),
2876 };
2877
2878 let create_event = bollard::models::EventMessage {
2879 action: Some("create".to_string()),
2880 actor: Some(actor.clone()),
2881 ..Default::default()
2882 };
2883 assert_eq!(
2884 format_docker_event(&create_event),
2885 "[CREATE] Container demo (alpine:latest)"
2886 );
2887
2888 let die_event = bollard::models::EventMessage {
2889 action: Some("die".to_string()),
2890 actor: Some(actor),
2891 ..Default::default()
2892 };
2893 assert_eq!(
2894 format_docker_event(&die_event),
2895 "[DIE] Container demo (exit: 137)"
2896 );
2897
2898 let other_event = bollard::models::EventMessage {
2899 action: Some("oom".to_string()),
2900 actor: None,
2901 ..Default::default()
2902 };
2903 assert_eq!(format_docker_event(&other_event), "[OOM] Container unknown");
2904
2905 assert_eq!(
2906 extract_timestamp("2024-01-01T00:00:00Z hello"),
2907 Some("2024-01-01T00:00:00Z".to_string())
2908 );
2909 assert_eq!(extract_timestamp("hello world"), None);
2910 }
2911
2912 #[tokio::test]
2913 async fn test_run_container_with_cleanup_error_paths() {
2914 let create_fail = run_container_with_cleanup(
2915 || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2916 |_id| async move { Ok(()) },
2917 |_id| async move { Ok(()) },
2918 )
2919 .await;
2920 assert!(
2921 matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2922 );
2923
2924 let cleanup_fail = run_container_with_cleanup(
2925 || async { Ok("cid-1".to_string()) },
2926 |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2927 |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2928 )
2929 .await;
2930 match cleanup_fail {
2931 Err(CamelError::ProcessorError(msg)) => {
2932 assert!(msg.contains("Failed to start container"));
2933 assert!(msg.contains("Cleanup failed"));
2934 }
2935 other => panic!("unexpected result: {:?}", other),
2936 }
2937 }
2938
2939 #[tokio::test]
2940 async fn test_container_producer_network_lifecycle() {
2941 let docker = match Docker::connect_with_local_defaults() {
2942 Ok(d) => d,
2943 Err(_) => {
2944 eprintln!("Skipping test: Could not connect to Docker daemon");
2945 return;
2946 }
2947 };
2948 if docker.ping().await.is_err() {
2949 eprintln!("Skipping test: Docker daemon not responding to ping");
2950 return;
2951 }
2952
2953 let network_name = format!("camel-test-{}", std::process::id());
2954
2955 let component = ContainerComponent::new();
2956 let component_ctx = NoOpComponentContext;
2957
2958 let endpoint = component
2960 .create_endpoint(
2961 &format!("container:network-create?name={}", network_name),
2962 &component_ctx,
2963 )
2964 .unwrap();
2965 let ctx = ProducerContext::new();
2966 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2967
2968 let mut exchange = Exchange::new(Message::new(""));
2969 exchange.input.set_header(
2970 HEADER_ACTION,
2971 serde_json::Value::String("network-create".into()),
2972 );
2973
2974 use tower::ServiceExt;
2975 let result = producer
2976 .ready()
2977 .await
2978 .unwrap()
2979 .call(exchange)
2980 .await
2981 .expect("Network create should succeed");
2982
2983 let network_id = result
2984 .input
2985 .header(HEADER_NETWORK)
2986 .and_then(|v| v.as_str().map(|s| s.to_string()))
2987 .expect("Should have network ID");
2988
2989 assert!(!network_id.is_empty());
2990
2991 let endpoint = component
2993 .create_endpoint("container:network-list", &component_ctx)
2994 .unwrap();
2995 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
2996
2997 let mut exchange = Exchange::new(Message::new(""));
2998 exchange.input.set_header(
2999 HEADER_ACTION,
3000 serde_json::Value::String("network-list".into()),
3001 );
3002
3003 let list_result = producer
3004 .ready()
3005 .await
3006 .unwrap()
3007 .call(exchange)
3008 .await
3009 .expect("Network list should succeed");
3010
3011 match &list_result.input.body {
3012 Body::Json(json_value) => {
3013 assert!(json_value.is_array(), "Expected JSON array");
3014 }
3015 other => panic!("Expected Body::Json, got: {:?}", other),
3016 }
3017
3018 let endpoint = component
3020 .create_endpoint(
3021 &format!("container:network-remove?network={}", network_name),
3022 &component_ctx,
3023 )
3024 .unwrap();
3025 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
3026
3027 let mut exchange = Exchange::new(Message::new(""));
3028 exchange.input.set_header(
3029 HEADER_ACTION,
3030 serde_json::Value::String("network-remove".into()),
3031 );
3032
3033 let remove_result = producer
3034 .ready()
3035 .await
3036 .unwrap()
3037 .call(exchange)
3038 .await
3039 .expect("Network remove should succeed");
3040
3041 let action_result = remove_result
3042 .input
3043 .header(HEADER_ACTION_RESULT)
3044 .and_then(|v| v.as_str());
3045 assert_eq!(action_result, Some("success"));
3046 }
3047
3048 #[tokio::test]
3049 async fn test_logs_consumer_requires_container_id() {
3050 use tokio::sync::mpsc;
3051
3052 let mut consumer = ContainerConsumer {
3053 config: ContainerConfig::from_uri("container:logs").unwrap(),
3054 runtime: test_rt(),
3055 };
3056 let (tx, _rx) = mpsc::channel(4);
3057 let context = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
3058
3059 let err = consumer.start(context).await.unwrap_err();
3060 match err {
3061 CamelError::EndpointCreationFailed(msg) => {
3062 assert!(msg.contains("containerId is required for logs consumer"));
3063 }
3064 other => panic!("unexpected error: {:?}", other),
3065 }
3066 }
3067
3068 #[tokio::test]
3069 async fn test_events_consumer_stops_immediately_when_cancelled() {
3070 use tokio::sync::mpsc;
3071
3072 let mut consumer = ContainerConsumer {
3073 config: ContainerConfig::from_uri("container:events").unwrap(),
3074 runtime: test_rt(),
3075 };
3076
3077 let (tx, _rx) = mpsc::channel(4);
3078 let cancel = tokio_util::sync::CancellationToken::new();
3079 cancel.cancel();
3080 let context = ConsumerContext::new(tx, cancel);
3081
3082 let result = consumer.start(context).await;
3083 assert!(result.is_ok());
3084 }
3085
3086 #[test]
3087 fn test_global_config_constructors_and_endpoint_docker_host() {
3088 let global = ContainerGlobalConfig::new().with_docker_host("unix:///tmp/docker.sock");
3089 let mut cfg = ContainerConfig::from_uri("container:list").unwrap();
3090 cfg.apply_global_defaults(&global);
3091
3092 let endpoint = ContainerEndpoint {
3093 uri: "container:list".to_string(),
3094 config: cfg,
3095 };
3096
3097 assert_eq!(endpoint.docker_host(), Some("unix:///tmp/docker.sock"));
3098
3099 let component = ContainerComponent::with_config(global);
3100 assert_eq!(component.scheme(), "container");
3101 }
3102
3103 #[test]
3104 fn container_global_config_has_reconnect_policy() {
3105 let cfg = ContainerGlobalConfig::default();
3106 assert_eq!(cfg.reconnect.max_attempts, 0); assert!(cfg.reconnect.enabled);
3108 }
3109
3110 #[tokio::test]
3115 async fn retry_loop_invokes_operation_exactly_max_attempts_times() {
3116 use std::sync::Arc;
3117 use std::sync::atomic::{AtomicU32, Ordering};
3118 use std::time::Duration;
3119
3120 let policy = NetworkRetryPolicy {
3121 max_attempts: 3,
3122 initial_delay: Duration::from_millis(1),
3123 max_delay: Duration::from_millis(1),
3124 multiplier: 1.0,
3125 ..NetworkRetryPolicy::default()
3126 };
3127
3128 let calls = Arc::new(AtomicU32::new(0));
3129 let calls_clone = Arc::clone(&calls);
3130 let mut attempt: u32 = 0;
3131
3132 loop {
3133 calls_clone.fetch_add(1, Ordering::SeqCst);
3134 let result: Result<(), ()> = Err(());
3135 match result {
3136 Ok(_) => {
3137 break;
3138 }
3139 Err(_) => {
3140 attempt += 1;
3141 if !policy.should_retry(attempt) {
3142 break;
3143 }
3144 let delay = policy.delay_for(attempt - 1);
3145 tokio::time::sleep(delay).await;
3146 continue;
3147 }
3148 }
3149 }
3150
3151 assert_eq!(
3152 calls.load(Ordering::SeqCst),
3153 3,
3154 "max_attempts=3 must yield exactly 3 invocations"
3155 );
3156 }
3157}