1pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::models::{
19 ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
20};
21use bollard::query_parameters::{
22 CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
23 ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
24 StartContainerOptions,
25};
26use bollard::service::{HostConfig, PortBinding};
27use camel_component_api::parse_uri;
28use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
29use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
30use tower::Service;
31
32static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
35 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
36
37fn track_container(id: String) {
39 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
40 tracker.insert(id);
41 }
42}
43
44fn untrack_container(id: &str) {
46 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
47 tracker.remove(id);
48 }
49}
50
51pub async fn cleanup_tracked_containers() {
53 let ids: Vec<String> = {
54 match CONTAINER_TRACKER.lock() {
55 Ok(tracker) => tracker.iter().cloned().collect(),
56 Err(_) => return,
57 }
58 };
59
60 if ids.is_empty() {
61 return;
62 }
63
64 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
65
66 let docker = match Docker::connect_with_local_defaults() {
67 Ok(d) => d,
68 Err(e) => {
69 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
70 return;
71 }
72 };
73
74 for id in ids {
75 match docker
76 .remove_container(
77 &id,
78 Some(RemoveContainerOptions {
79 force: true,
80 ..Default::default()
81 }),
82 )
83 .await
84 {
85 Ok(_) => {
86 tracing::debug!("Cleaned up container {}", id);
87 untrack_container(&id);
88 }
89 Err(e) => {
90 tracing::warn!("Failed to cleanup container {}: {}", id, e);
91 }
92 }
93 }
94}
95
96const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
100
101pub const HEADER_ACTION: &str = "CamelContainerAction";
103
104pub const HEADER_IMAGE: &str = "CamelContainerImage";
106
107pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
109
110pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
112
113pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
115
116pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
118
119pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
121
122pub const HEADER_CMD: &str = "CamelContainerCmd";
124
125pub const HEADER_NETWORK: &str = "CamelContainerNetwork";
127
128pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";
130
131pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";
133
134pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
136
137#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
145#[serde(default)]
146pub struct ContainerGlobalConfig {
147 pub docker_host: String,
149}
150
151impl Default for ContainerGlobalConfig {
152 fn default() -> Self {
153 Self {
154 docker_host: "unix:///var/run/docker.sock".to_string(),
155 }
156 }
157}
158
159impl ContainerGlobalConfig {
160 pub fn new() -> Self {
161 Self::default()
162 }
163
164 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
165 self.docker_host = v.into();
166 self
167 }
168}
169
170#[derive(Debug, Clone)]
179pub struct ContainerConfig {
180 pub operation: String,
182 pub image: Option<String>,
184 pub name: Option<String>,
186 pub host: Option<String>,
188 pub cmd: Option<String>,
190 pub ports: Option<String>,
192 pub env: Option<String>,
194 pub network: Option<String>,
196 pub container_id: Option<String>,
198 pub follow: bool,
200 pub timestamps: bool,
202 pub tail: Option<String>,
204 pub auto_pull: bool,
206 pub auto_remove: bool,
208 pub volumes: Option<String>,
210 pub user: Option<String>,
212 pub workdir: Option<String>,
214 pub detach: bool,
216 pub driver: Option<String>,
218 pub force: bool,
220}
221
222impl ContainerConfig {
223 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
231 let parts = parse_uri(uri)?;
232 if parts.scheme != "container" {
233 return Err(CamelError::InvalidUri(format!(
234 "expected scheme 'container', got '{}'",
235 parts.scheme
236 )));
237 }
238
239 let image = parts.params.get("image").cloned();
240 let name = parts.params.get("name").cloned();
241 let cmd = parts.params.get("cmd").cloned();
242 let ports = parts.params.get("ports").cloned();
243 let env = parts.params.get("env").cloned();
244 let network = parts.params.get("network").cloned();
245 let container_id = parts.params.get("containerId").cloned();
246 let follow = parts
247 .params
248 .get("follow")
249 .map(|v| v.eq_ignore_ascii_case("true"))
250 .unwrap_or(true);
251 let timestamps = parts
252 .params
253 .get("timestamps")
254 .map(|v| v.eq_ignore_ascii_case("true"))
255 .unwrap_or(false);
256 let tail = parts.params.get("tail").cloned();
257 let auto_pull = parts
258 .params
259 .get("autoPull")
260 .map(|v| v.eq_ignore_ascii_case("true"))
261 .unwrap_or(true);
262 let auto_remove = parts
263 .params
264 .get("autoRemove")
265 .map(|v| v.eq_ignore_ascii_case("true"))
266 .unwrap_or(true);
267 let host = parts.params.get("host").cloned();
269 let volumes = parts.params.get("volumes").cloned();
270 let user = parts.params.get("user").cloned();
271 let workdir = parts.params.get("workdir").cloned();
272 let detach = parts
273 .params
274 .get("detach")
275 .map(|v| v.eq_ignore_ascii_case("true"))
276 .unwrap_or(false);
277 let driver = parts.params.get("driver").cloned();
278 let force = parts
279 .params
280 .get("force")
281 .map(|v| v.eq_ignore_ascii_case("true"))
282 .unwrap_or(false);
283
284 Ok(Self {
285 operation: parts.path,
286 image,
287 name,
288 host,
289 cmd,
290 ports,
291 env,
292 network,
293 container_id,
294 follow,
295 timestamps,
296 tail,
297 auto_pull,
298 auto_remove,
299 volumes,
300 user,
301 workdir,
302 detach,
303 driver,
304 force,
305 })
306 }
307
308 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
311 if self.host.is_none() {
312 self.host = Some(global.docker_host.clone());
313 }
314 }
315
316 fn docker_socket_path(&self) -> Result<&str, CamelError> {
317 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
318 "npipe:////./pipe/docker_engine"
319 } else {
320 "unix:///var/run/docker.sock"
321 });
322
323 if host.starts_with("unix://") || host.starts_with("npipe://") {
324 return Ok(host);
325 }
326
327 if host.contains("://") {
328 return Err(CamelError::ProcessorError(format!(
329 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
330 host
331 )));
332 }
333
334 Ok(host)
335 }
336
337 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
338 let socket_path = self.docker_socket_path()?;
339 Docker::connect_with_socket(
340 socket_path,
341 DOCKER_CONNECT_TIMEOUT_SECS,
342 bollard::API_DEFAULT_VERSION,
343 )
344 .map_err(|e| {
345 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
346 })
347 }
348
349 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
357 let docker = self.connect_docker_client()?;
358 docker
359 .ping()
360 .await
361 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
362 Ok(docker)
363 }
364
365 #[allow(clippy::type_complexity)]
366 fn parse_ports(&self) -> Option<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>)> {
367 let ports_str = self.ports.as_ref()?;
368
369 let mut exposed_ports: Vec<String> = Vec::new();
370 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
371
372 for mapping in ports_str.split(',') {
373 let mapping = mapping.trim();
374 if mapping.is_empty() {
375 continue;
376 }
377
378 let (host_port, container_spec) = mapping.split_once(':')?;
379
380 let (container_port, protocol) = if container_spec.contains('/') {
381 let parts: Vec<&str> = container_spec.split('/').collect();
382 (parts[0], parts[1])
383 } else {
384 (container_spec, "tcp")
385 };
386
387 let container_key = format!("{}/{}", container_port, protocol);
388
389 exposed_ports.push(container_key.clone());
390
391 port_bindings.insert(
392 container_key,
393 Some(vec![PortBinding {
394 host_ip: None,
395 host_port: Some(host_port.to_string()),
396 }]),
397 );
398 }
399
400 if exposed_ports.is_empty() {
401 None
402 } else {
403 Some((exposed_ports, port_bindings))
404 }
405 }
406
407 fn parse_env(&self) -> Option<Vec<String>> {
408 let env_str = self.env.as_ref()?;
409
410 let env_vars: Vec<String> = env_str
411 .split(',')
412 .map(|s| s.trim().to_string())
413 .filter(|s| !s.is_empty())
414 .collect();
415
416 if env_vars.is_empty() {
417 None
418 } else {
419 Some(env_vars)
420 }
421 }
422
423 #[cfg(test)]
424 #[allow(clippy::type_complexity)]
425 fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
426 self.volumes.as_deref().and_then(parse_volume_str)
427 }
428}
429
430#[allow(clippy::type_complexity)]
435fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
436 let mut binds: Vec<String> = Vec::new();
437 let mut anonymous_volumes: Vec<String> = Vec::new();
438
439 for entry in volumes_str.split(',') {
440 let entry = entry.trim();
441 if entry.is_empty() {
442 continue;
443 }
444
445 let segments: Vec<&str> = entry.split(':').collect();
446
447 match segments.len() {
448 3 => {
449 let source = segments[0];
450 let target = segments[1];
451 let mode = segments[2];
452 if mode != "ro" && mode != "rw" {
453 continue;
454 }
455 binds.push(format!("{}:{}:{}", source, target, mode));
456 }
457 2 => {
458 let a = segments[0];
459 let b = segments[1];
460 if b == "ro" || b == "rw" {
461 anonymous_volumes.push(a.to_string());
462 } else {
463 binds.push(format!("{}:{}", a, b));
464 }
465 }
466 1 => {
467 anonymous_volumes.push(segments[0].to_string());
468 }
469 _ => continue,
470 }
471 }
472
473 if binds.is_empty() && anonymous_volumes.is_empty() {
474 None
475 } else {
476 Some((binds, anonymous_volumes))
477 }
478}
479
480#[derive(Debug, Clone, Copy, PartialEq, Eq)]
481enum ProducerOperation {
482 List,
483 Run,
484 Start,
485 Stop,
486 Remove,
487 Exec,
488 NetworkCreate,
489 NetworkConnect,
490 NetworkDisconnect,
491 NetworkRemove,
492 NetworkList,
493}
494
495fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
496 match operation {
497 "list" => Ok(ProducerOperation::List),
498 "run" => Ok(ProducerOperation::Run),
499 "start" => Ok(ProducerOperation::Start),
500 "stop" => Ok(ProducerOperation::Stop),
501 "remove" => Ok(ProducerOperation::Remove),
502 "exec" => Ok(ProducerOperation::Exec),
503 "network-create" => Ok(ProducerOperation::NetworkCreate),
504 "network-connect" => Ok(ProducerOperation::NetworkConnect),
505 "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
506 "network-remove" => Ok(ProducerOperation::NetworkRemove),
507 "network-list" => Ok(ProducerOperation::NetworkList),
508 _ => Err(CamelError::ProcessorError(format!(
509 "Unknown container operation: {}",
510 operation
511 ))),
512 }
513}
514
515fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
516 exchange
517 .input
518 .header(HEADER_CONTAINER_NAME)
519 .and_then(|v| v.as_str().map(|s| s.to_string()))
520 .or_else(|| config.name.clone())
521}
522
523async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
524 let images = docker
525 .list_images(None::<ListImagesOptions>)
526 .await
527 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
528
529 Ok(images.iter().any(|img| {
530 img.repo_tags
531 .iter()
532 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
533 }))
534}
535
536async fn pull_image_with_progress(
537 docker: &Docker,
538 image: &str,
539 timeout_secs: u64,
540) -> Result<(), CamelError> {
541 use futures::StreamExt;
542
543 tracing::info!("Pulling image: {}", image);
544
545 let mut stream = docker.create_image(
546 Some(CreateImageOptions {
547 from_image: Some(image.to_string()),
548 ..Default::default()
549 }),
550 None,
551 None,
552 );
553
554 let start = std::time::Instant::now();
555 let mut last_progress = std::time::Instant::now();
556
557 while let Some(item) = stream.next().await {
558 if start.elapsed().as_secs() > timeout_secs {
559 return Err(CamelError::ProcessorError(format!(
560 "Image pull timeout after {}s. Try manually: docker pull {}",
561 timeout_secs, image
562 )));
563 }
564
565 match item {
566 Ok(update) => {
567 if last_progress.elapsed().as_secs() >= 2 {
569 if let Some(status) = update.status {
570 tracing::debug!("Pull progress: {}", status);
571 }
572 last_progress = std::time::Instant::now();
573 }
574 }
575 Err(e) => {
576 let err_str = e.to_string().to_lowercase();
577 if err_str.contains("unauthorized") || err_str.contains("401") {
578 return Err(CamelError::ProcessorError(format!(
579 "Authentication required for image '{}'. Configure Docker credentials: docker login",
580 image
581 )));
582 }
583 if err_str.contains("not found") || err_str.contains("404") {
584 return Err(CamelError::ProcessorError(format!(
585 "Image '{}' not found in registry. Check the image name and tag",
586 image
587 )));
588 }
589 return Err(CamelError::ProcessorError(format!(
590 "Failed to pull image '{}': {}",
591 image, e
592 )));
593 }
594 }
595 }
596
597 tracing::info!("Successfully pulled image: {}", image);
598 Ok(())
599}
600
601async fn ensure_image_available(
602 docker: &Docker,
603 image: &str,
604 auto_pull: bool,
605 timeout_secs: u64,
606) -> Result<(), CamelError> {
607 if image_exists_locally(docker, image).await? {
608 tracing::debug!("Image '{}' already available locally", image);
609 return Ok(());
610 }
611
612 if !auto_pull {
613 return Err(CamelError::ProcessorError(format!(
614 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
615 image, image
616 )));
617 }
618
619 pull_image_with_progress(docker, image, timeout_secs).await
620}
621
622fn format_docker_event(event: &bollard::models::EventMessage) -> String {
623 let action = event.action.as_deref().unwrap_or("unknown");
624 let actor = event.actor.as_ref();
625
626 let container_name = actor
627 .and_then(|a| a.attributes.as_ref())
628 .and_then(|attrs| attrs.get("name"))
629 .map(|s| s.as_str())
630 .unwrap_or("unknown");
631
632 let image = actor
633 .and_then(|a| a.attributes.as_ref())
634 .and_then(|attrs| attrs.get("image"))
635 .map(|s| s.as_str())
636 .unwrap_or("");
637
638 let exit_code = actor
639 .and_then(|a| a.attributes.as_ref())
640 .and_then(|attrs| attrs.get("exitCode"))
641 .map(|s| s.as_str());
642
643 match action {
644 "create" => {
645 if image.is_empty() {
646 format!("[CREATE] Container {}", container_name)
647 } else {
648 format!("[CREATE] Container {} ({})", container_name, image)
649 }
650 }
651 "start" => format!("[START] Container {}", container_name),
652 "die" => {
653 if let Some(code) = exit_code {
654 format!("[DIE] Container {} (exit: {})", container_name, code)
655 } else {
656 format!("[DIE] Container {}", container_name)
657 }
658 }
659 "destroy" => format!("[DESTROY] Container {}", container_name),
660 "stop" => format!("[STOP] Container {}", container_name),
661 "pause" => format!("[PAUSE] Container {}", container_name),
662 "unpause" => format!("[UNPAUSE] Container {}", container_name),
663 "restart" => format!("[RESTART] Container {}", container_name),
664 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
665 }
666}
667
668async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
669 create: CreateFn,
670 start: StartFn,
671 remove: RemoveFn,
672) -> Result<String, CamelError>
673where
674 CreateFn: FnOnce() -> CreateFut,
675 CreateFut: Future<Output = Result<String, CamelError>>,
676 StartFn: FnOnce(String) -> StartFut,
677 StartFut: Future<Output = Result<(), CamelError>>,
678 RemoveFn: FnOnce(String) -> RemoveFut,
679 RemoveFut: Future<Output = Result<(), CamelError>>,
680{
681 let container_id = create().await?;
682 if let Err(start_err) = start(container_id.clone()).await {
683 if let Err(remove_err) = remove(container_id.clone()).await {
684 return Err(CamelError::ProcessorError(format!(
685 "Failed to start container: {}. Cleanup failed: {}",
686 start_err, remove_err
687 )));
688 }
689 return Err(start_err);
690 }
691
692 Ok(container_id)
693}
694
695async fn handle_list(
696 docker: Docker,
697 _config: ContainerConfig,
698 exchange: &mut Exchange,
699) -> Result<(), CamelError> {
700 let containers = docker
701 .list_containers(None::<ListContainersOptions>)
702 .await
703 .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
704
705 let json_value = serde_json::to_value(&containers).map_err(|e| {
706 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
707 })?;
708
709 exchange.input.body = Body::Json(json_value);
710 exchange.input.set_header(
711 HEADER_ACTION_RESULT,
712 serde_json::Value::String("success".to_string()),
713 );
714 Ok(())
715}
716
717async fn handle_run(
718 docker: Docker,
719 config: ContainerConfig,
720 exchange: &mut Exchange,
721) -> Result<(), CamelError> {
722 let image = exchange
723 .input
724 .header(HEADER_IMAGE)
725 .and_then(|v| v.as_str().map(|s| s.to_string()))
726 .or(config.image.clone())
727 .ok_or_else(|| {
728 CamelError::ProcessorError(
729 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
730 )
731 })?;
732
733 let image = if !image.contains(':') && !image.contains('@') {
734 format!("{}:latest", image)
735 } else {
736 image
737 };
738
739 let pull_timeout = 300;
740 ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
741 .await
742 .map_err(|e| {
743 CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
744 })?;
745
746 let container_name = resolve_container_name(exchange, &config);
747 let container_name_ref = container_name.as_deref().unwrap_or("");
748 let cmd_parts: Option<Vec<String>> = config
749 .cmd
750 .as_ref()
751 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
752 let auto_remove = config.auto_remove;
753 let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
754 let env_vars = config.parse_env();
755 let network_mode = config.network.clone();
756
757 let volumes_str = exchange
758 .input
759 .header(HEADER_VOLUMES)
760 .and_then(|v| v.as_str().map(|s| s.to_string()))
761 .or(config.volumes.clone());
762 let (binds, anon_volumes) = volumes_str
763 .as_deref()
764 .and_then(parse_volume_str)
765 .unwrap_or_default();
766
767 let docker_create = docker.clone();
768 let docker_start = docker.clone();
769 let docker_remove = docker.clone();
770
771 let container_id = run_container_with_cleanup(
772 move || async move {
773 let create_options = CreateContainerOptions {
774 name: Some(container_name_ref.to_string()),
775 ..Default::default()
776 };
777 let container_config = ContainerCreateBody {
778 image: Some(image.clone()),
779 cmd: cmd_parts,
780 env: env_vars,
781 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
782 volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
783 host_config: Some(HostConfig {
784 auto_remove: Some(auto_remove),
785 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
786 network_mode,
787 binds: if binds.is_empty() { None } else { Some(binds) },
788 ..Default::default()
789 }),
790 ..Default::default()
791 };
792
793 let create_response = docker_create
794 .create_container(Some(create_options), container_config)
795 .await
796 .map_err(|e| {
797 let err_str = e.to_string().to_lowercase();
798 if err_str.contains("409") || err_str.contains("conflict") {
799 CamelError::ProcessorError(format!(
800 "Container name '{}' already exists. Use a unique name or remove the existing container first",
801 container_name_ref
802 ))
803 } else {
804 CamelError::ProcessorError(format!(
805 "Failed to create container: {}",
806 e
807 ))
808 }
809 })?;
810
811 Ok(create_response.id)
812 },
813 move |container_id| async move {
814 docker_start
815 .start_container(&container_id, None::<StartContainerOptions>)
816 .await
817 .map_err(|e| {
818 CamelError::ProcessorError(format!(
819 "Failed to start container: {}",
820 e
821 ))
822 })
823 },
824 move |container_id| async move {
825 docker_remove
826 .remove_container(&container_id, None)
827 .await
828 .map_err(|e| {
829 CamelError::ProcessorError(format!(
830 "Failed to remove container after start failure: {}",
831 e
832 ))
833 })
834 },
835 )
836 .await?;
837
838 track_container(container_id.clone());
839
840 exchange
841 .input
842 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
843 exchange.input.set_header(
844 HEADER_ACTION_RESULT,
845 serde_json::Value::String("success".to_string()),
846 );
847 Ok(())
848}
849
850async fn handle_lifecycle(
851 docker: Docker,
852 _config: ContainerConfig,
853 exchange: &mut Exchange,
854 operation: ProducerOperation,
855 operation_name: &str,
856) -> Result<(), CamelError> {
857 let container_id = exchange
858 .input
859 .header(HEADER_CONTAINER_ID)
860 .and_then(|v| v.as_str().map(|s| s.to_string()))
861 .ok_or_else(|| {
862 CamelError::ProcessorError(format!(
863 "{} header is required for {} operation",
864 HEADER_CONTAINER_ID, operation_name
865 ))
866 })?;
867
868 match operation {
869 ProducerOperation::Start => {
870 docker
871 .start_container(&container_id, None::<StartContainerOptions>)
872 .await
873 .map_err(|e| {
874 CamelError::ProcessorError(format!("Failed to start container: {}", e))
875 })?;
876 }
877 ProducerOperation::Stop => {
878 docker
879 .stop_container(&container_id, None)
880 .await
881 .map_err(|e| {
882 CamelError::ProcessorError(format!("Failed to stop container: {}", e))
883 })?;
884 }
885 ProducerOperation::Remove => {
886 docker
887 .remove_container(&container_id, None)
888 .await
889 .map_err(|e| {
890 CamelError::ProcessorError(format!("Failed to remove container: {}", e))
891 })?;
892 untrack_container(&container_id);
893 }
894 _ => {}
895 }
896
897 exchange.input.set_header(
898 HEADER_ACTION_RESULT,
899 serde_json::Value::String("success".to_string()),
900 );
901 Ok(())
902}
903
904async fn handle_exec(
905 docker: Docker,
906 config: ContainerConfig,
907 exchange: &mut Exchange,
908) -> Result<(), CamelError> {
909 let container_id = exchange
910 .input
911 .header(HEADER_CONTAINER_ID)
912 .and_then(|v| v.as_str().map(|s| s.to_string()))
913 .or(config.container_id.clone())
914 .ok_or_else(|| {
915 CamelError::ProcessorError(format!(
916 "{} header or containerId param is required for exec operation",
917 HEADER_CONTAINER_ID
918 ))
919 })?;
920
921 let cmd = exchange
922 .input
923 .header(HEADER_CMD)
924 .and_then(|v| v.as_str().map(|s| s.to_string()))
925 .or(config.cmd.clone())
926 .ok_or_else(|| {
927 CamelError::ProcessorError(
928 "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
929 )
930 })?;
931
932 let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
933 let env_vars = config.parse_env();
934
935 let exec_config = bollard::exec::CreateExecOptions {
936 cmd: Some(cmd_parts),
937 env: env_vars,
938 user: config.user.clone(),
939 working_dir: config.workdir.clone(),
940 attach_stdout: Some(true),
941 attach_stderr: Some(true),
942 ..Default::default()
943 };
944
945 let create_result = docker
946 .create_exec(&container_id, exec_config)
947 .await
948 .map_err(|e| {
949 let err_str = e.to_string().to_lowercase();
950 if err_str.contains("404") || err_str.contains("no such") {
951 CamelError::ProcessorError(format!(
952 "Container '{}' not found for exec",
953 container_id
954 ))
955 } else {
956 CamelError::ProcessorError(format!("Failed to create exec: {}", e))
957 }
958 })?;
959
960 let exec_id = create_result.id;
961
962 if config.detach {
963 docker
964 .start_exec(
965 &exec_id,
966 Some(bollard::exec::StartExecOptions {
967 detach: true,
968 ..Default::default()
969 }),
970 )
971 .await
972 .map_err(|e| {
973 CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
974 })?;
975
976 exchange
977 .input
978 .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
979 exchange
980 .input
981 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
982 } else {
983 let start_result = docker
984 .start_exec(&exec_id, None)
985 .await
986 .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
987
988 let mut output = String::new();
989
990 match start_result {
991 bollard::exec::StartExecResults::Attached {
992 output: mut stream, ..
993 } => {
994 use futures::StreamExt;
995 while let Some(msg) = stream.next().await {
996 match msg {
997 Ok(bollard::container::LogOutput::StdOut { message }) => {
998 output.push_str(&String::from_utf8_lossy(&message));
999 }
1000 Ok(bollard::container::LogOutput::StdErr { message }) => {
1001 output.push_str(&String::from_utf8_lossy(&message));
1002 }
1003 Ok(_) => {}
1004 Err(e) => {
1005 output.push_str(&format!("[error reading stream: {}]", e));
1006 }
1007 }
1008 }
1009 }
1010 bollard::exec::StartExecResults::Detached => {}
1011 }
1012
1013 let inspect = docker
1014 .inspect_exec(&exec_id)
1015 .await
1016 .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1017
1018 let exit_code: i64 = inspect.exit_code.unwrap_or(0);
1019
1020 let output = output.trim_end().to_string();
1021 exchange.input.body = Body::Text(output);
1022 exchange.input.set_header(
1023 HEADER_EXIT_CODE,
1024 serde_json::Value::Number(exit_code.into()),
1025 );
1026 exchange
1027 .input
1028 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1029 }
1030
1031 exchange.input.set_header(
1032 HEADER_ACTION_RESULT,
1033 serde_json::Value::String("success".to_string()),
1034 );
1035 Ok(())
1036}
1037
1038async fn handle_network_create(
1039 docker: Docker,
1040 config: ContainerConfig,
1041 exchange: &mut Exchange,
1042) -> Result<(), CamelError> {
1043 let network_name = exchange
1044 .input
1045 .header(HEADER_CONTAINER_NAME)
1046 .and_then(|v| v.as_str().map(|s| s.to_string()))
1047 .or(config.name.clone())
1048 .ok_or_else(|| {
1049 CamelError::ProcessorError(
1050 "CamelContainerName header or name param is required for network-create"
1051 .to_string(),
1052 )
1053 })?;
1054
1055 let driver = config.driver.as_deref().unwrap_or("bridge");
1056
1057 let options = NetworkCreateRequest {
1058 name: network_name.clone(),
1059 driver: Some(driver.to_string()),
1060 ..Default::default()
1061 };
1062
1063 let result = docker.create_network(options).await.map_err(|e| {
1064 let err_str = e.to_string().to_lowercase();
1065 if err_str.contains("409") || err_str.contains("already exists") {
1066 CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1067 } else {
1068 CamelError::ProcessorError(format!("Failed to create network: {}", e))
1069 }
1070 })?;
1071
1072 let network_id = result.id.clone();
1073 let json_value = serde_json::to_value(&result).map_err(|e| {
1074 CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1075 })?;
1076
1077 exchange.input.body = Body::Json(json_value);
1078 exchange
1079 .input
1080 .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1081 exchange.input.set_header(
1082 HEADER_ACTION_RESULT,
1083 serde_json::Value::String("success".to_string()),
1084 );
1085 Ok(())
1086}
1087
1088async fn handle_network_connect(
1089 docker: Docker,
1090 config: ContainerConfig,
1091 exchange: &mut Exchange,
1092) -> Result<(), CamelError> {
1093 let network = exchange
1094 .input
1095 .header(HEADER_NETWORK)
1096 .and_then(|v| v.as_str().map(|s| s.to_string()))
1097 .or(config.network.clone())
1098 .ok_or_else(|| {
1099 CamelError::ProcessorError(
1100 "CamelContainerNetwork header or network param is required for network-connect"
1101 .to_string(),
1102 )
1103 })?;
1104
1105 let container = exchange
1106 .input
1107 .header(HEADER_CONTAINER_ID)
1108 .and_then(|v| v.as_str().map(|s| s.to_string()))
1109 .or(config.container_id.clone())
1110 .ok_or_else(|| {
1111 CamelError::ProcessorError(
1112 "CamelContainerId header or container param is required for network-connect"
1113 .to_string(),
1114 )
1115 })?;
1116
1117 docker
1118 .connect_network(
1119 &network,
1120 NetworkConnectRequest {
1121 container,
1122 ..Default::default()
1123 },
1124 )
1125 .await
1126 .map_err(|e| {
1127 let err_str = e.to_string().to_lowercase();
1128 if err_str.contains("404") || err_str.contains("not found") {
1129 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1130 } else {
1131 CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1132 }
1133 })?;
1134
1135 exchange.input.set_header(
1136 HEADER_ACTION_RESULT,
1137 serde_json::Value::String("success".to_string()),
1138 );
1139 Ok(())
1140}
1141
1142async fn handle_network_disconnect(
1143 docker: Docker,
1144 config: ContainerConfig,
1145 exchange: &mut Exchange,
1146) -> Result<(), CamelError> {
1147 let network = exchange
1148 .input
1149 .header(HEADER_NETWORK)
1150 .and_then(|v| v.as_str().map(|s| s.to_string()))
1151 .or(config.network.clone())
1152 .ok_or_else(|| {
1153 CamelError::ProcessorError(
1154 "CamelContainerNetwork header or network param is required for network-disconnect"
1155 .to_string(),
1156 )
1157 })?;
1158
1159 let container = exchange
1160 .input
1161 .header(HEADER_CONTAINER_ID)
1162 .and_then(|v| v.as_str().map(|s| s.to_string()))
1163 .or(config.container_id.clone())
1164 .ok_or_else(|| {
1165 CamelError::ProcessorError(
1166 "CamelContainerId header or container param is required for network-disconnect"
1167 .to_string(),
1168 )
1169 })?;
1170
1171 docker
1172 .disconnect_network(
1173 &network,
1174 NetworkDisconnectRequest {
1175 container,
1176 force: Some(config.force),
1177 },
1178 )
1179 .await
1180 .map_err(|e| {
1181 let err_str = e.to_string().to_lowercase();
1182 if err_str.contains("404") || err_str.contains("not found") {
1183 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1184 } else {
1185 CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1186 }
1187 })?;
1188
1189 exchange.input.set_header(
1190 HEADER_ACTION_RESULT,
1191 serde_json::Value::String("success".to_string()),
1192 );
1193 Ok(())
1194}
1195
1196async fn handle_network_remove(
1197 docker: Docker,
1198 config: ContainerConfig,
1199 exchange: &mut Exchange,
1200) -> Result<(), CamelError> {
1201 let network = exchange
1202 .input
1203 .header(HEADER_NETWORK)
1204 .and_then(|v| v.as_str().map(|s| s.to_string()))
1205 .or(config.network.clone())
1206 .ok_or_else(|| {
1207 CamelError::ProcessorError(
1208 "CamelContainerNetwork header or network param is required for network-remove"
1209 .to_string(),
1210 )
1211 })?;
1212
1213 docker.remove_network(&network).await.map_err(|e| {
1214 let err_str = e.to_string().to_lowercase();
1215 if err_str.contains("404") || err_str.contains("not found") {
1216 CamelError::ProcessorError(format!("Network '{}' not found", network))
1217 } else if err_str.contains("409") || err_str.contains("in use") {
1218 CamelError::ProcessorError(format!(
1219 "Network '{}' is in use and cannot be removed",
1220 network
1221 ))
1222 } else {
1223 CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1224 }
1225 })?;
1226
1227 exchange.input.set_header(
1228 HEADER_ACTION_RESULT,
1229 serde_json::Value::String("success".to_string()),
1230 );
1231 Ok(())
1232}
1233
1234async fn handle_network_list(
1235 docker: Docker,
1236 _config: ContainerConfig,
1237 exchange: &mut Exchange,
1238) -> Result<(), CamelError> {
1239 let networks = docker
1240 .list_networks(None::<ListNetworksOptions>)
1241 .await
1242 .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1243
1244 let json_value = serde_json::to_value(&networks)
1245 .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1246
1247 exchange.input.body = Body::Json(json_value);
1248 exchange.input.set_header(
1249 HEADER_ACTION_RESULT,
1250 serde_json::Value::String("success".to_string()),
1251 );
1252 Ok(())
1253}
1254
1255#[derive(Clone)]
1260pub struct ContainerProducer {
1261 config: ContainerConfig,
1262 docker: Docker,
1263}
1264
1265impl Service<Exchange> for ContainerProducer {
1266 type Response = Exchange;
1267 type Error = CamelError;
1268 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1269
1270 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1271 Poll::Ready(Ok(()))
1272 }
1273
1274 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1275 let config = self.config.clone();
1276 let docker = self.docker.clone();
1277 Box::pin(async move {
1278 let operation_name = exchange
1279 .input
1280 .header(HEADER_ACTION)
1281 .and_then(|v| v.as_str().map(|s| s.to_string()))
1282 .unwrap_or_else(|| config.operation.clone());
1283
1284 let operation = parse_producer_operation(&operation_name)?;
1285
1286 match operation {
1287 ProducerOperation::List => {
1288 handle_list(docker, config, &mut exchange).await?;
1289 }
1290 ProducerOperation::Run => {
1291 handle_run(docker, config, &mut exchange).await?;
1292 }
1293 ProducerOperation::Start => {
1294 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1295 .await?;
1296 }
1297 ProducerOperation::Stop => {
1298 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1299 .await?;
1300 }
1301 ProducerOperation::Remove => {
1302 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1303 .await?;
1304 }
1305 ProducerOperation::Exec => {
1306 handle_exec(docker, config, &mut exchange).await?;
1307 }
1308 ProducerOperation::NetworkCreate => {
1309 handle_network_create(docker, config, &mut exchange).await?;
1310 }
1311 ProducerOperation::NetworkConnect => {
1312 handle_network_connect(docker, config, &mut exchange).await?;
1313 }
1314 ProducerOperation::NetworkDisconnect => {
1315 handle_network_disconnect(docker, config, &mut exchange).await?;
1316 }
1317 ProducerOperation::NetworkRemove => {
1318 handle_network_remove(docker, config, &mut exchange).await?;
1319 }
1320 ProducerOperation::NetworkList => {
1321 handle_network_list(docker, config, &mut exchange).await?;
1322 }
1323 }
1324
1325 Ok(exchange)
1326 })
1327 }
1328}
1329
1330pub struct ContainerConsumer {
1335 config: ContainerConfig,
1336}
1337
1338#[async_trait]
1339impl Consumer for ContainerConsumer {
1340 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1341 match self.config.operation.as_str() {
1342 "events" => self.start_events_consumer(context).await,
1343 "logs" => self.start_logs_consumer(context).await,
1344 _ => Err(CamelError::EndpointCreationFailed(format!(
1345 "Consumer only supports 'events' or 'logs' operations, got '{}'",
1346 self.config.operation
1347 ))),
1348 }
1349 }
1350
1351 async fn stop(&mut self) -> Result<(), CamelError> {
1352 Ok(())
1353 }
1354
1355 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1356 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1357 }
1358}
1359
1360impl ContainerConsumer {
1361 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1362 use futures::StreamExt;
1363
1364 loop {
1365 if context.is_cancelled() {
1366 tracing::info!("Container events consumer shutting down");
1367 return Ok(());
1368 }
1369
1370 let docker = match self.config.connect_docker().await {
1371 Ok(d) => d,
1372 Err(e) => {
1373 tracing::error!(
1374 "Consumer failed to connect to docker: {}. Retrying in 5s...",
1375 e
1376 );
1377 tokio::select! {
1378 _ = context.cancelled() => {
1379 tracing::info!("Container events consumer shutting down");
1380 return Ok(());
1381 }
1382 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1383 }
1384 continue;
1385 }
1386 };
1387
1388 let mut event_stream = docker.events(None::<EventsOptions>);
1389
1390 loop {
1391 tokio::select! {
1392 _ = context.cancelled() => {
1393 tracing::info!("Container events consumer shutting down");
1394 return Ok(());
1395 }
1396
1397 msg = event_stream.next() => {
1398 match msg {
1399 Some(Ok(event)) => {
1400 let formatted = format_docker_event(&event);
1401 let message = Message::new(Body::Text(formatted));
1402 let exchange = Exchange::new(message);
1403
1404 if let Err(e) = context.send(exchange).await {
1405 tracing::error!("Failed to send exchange: {:?}", e);
1406 break;
1407 }
1408 }
1409 Some(Err(e)) => {
1410 tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1411 break;
1412 }
1413 None => {
1414 tracing::info!("Docker event stream ended. Reconnecting...");
1415 break;
1416 }
1417 }
1418 }
1419 }
1420 }
1421
1422 tokio::select! {
1423 _ = context.cancelled() => {
1424 tracing::info!("Container events consumer shutting down");
1425 return Ok(());
1426 }
1427 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1428 }
1429 }
1430 }
1431
1432 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1433 use futures::StreamExt;
1434
1435 let container_id = self.config.container_id.clone().ok_or_else(|| {
1436 CamelError::EndpointCreationFailed(
1437 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1438 .to_string(),
1439 )
1440 })?;
1441
1442 loop {
1443 if context.is_cancelled() {
1444 tracing::info!("Container logs consumer shutting down");
1445 return Ok(());
1446 }
1447
1448 let docker = match self.config.connect_docker().await {
1449 Ok(d) => d,
1450 Err(e) => {
1451 tracing::error!(
1452 "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
1453 e
1454 );
1455 tokio::select! {
1456 _ = context.cancelled() => {
1457 tracing::info!("Container logs consumer shutting down");
1458 return Ok(());
1459 }
1460 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1461 }
1462 continue;
1463 }
1464 };
1465
1466 let tail = self
1467 .config
1468 .tail
1469 .clone()
1470 .unwrap_or_else(|| "all".to_string());
1471
1472 let options = LogsOptions {
1473 follow: self.config.follow,
1474 stdout: true,
1475 stderr: true,
1476 timestamps: self.config.timestamps,
1477 tail,
1478 ..Default::default()
1479 };
1480
1481 let mut log_stream = docker.logs(&container_id, Some(options));
1482 let container_id_header = container_id.clone();
1483
1484 loop {
1485 tokio::select! {
1486 _ = context.cancelled() => {
1487 tracing::info!("Container logs consumer shutting down");
1488 return Ok(());
1489 }
1490
1491 msg = log_stream.next() => {
1492 match msg {
1493 Some(Ok(log_output)) => {
1494 let (stream_type, content) = match log_output {
1495 bollard::container::LogOutput::StdOut { message } => {
1496 ("stdout", String::from_utf8_lossy(&message).into_owned())
1497 }
1498 bollard::container::LogOutput::StdErr { message } => {
1499 ("stderr", String::from_utf8_lossy(&message).into_owned())
1500 }
1501 bollard::container::LogOutput::Console { message } => {
1502 ("console", String::from_utf8_lossy(&message).into_owned())
1503 }
1504 bollard::container::LogOutput::StdIn { message } => {
1505 ("stdin", String::from_utf8_lossy(&message).into_owned())
1506 }
1507 };
1508
1509 let content = content.trim_end();
1510 if content.is_empty() {
1511 continue;
1512 }
1513
1514 let mut message = Message::new(Body::Text(content.to_string()));
1515 message.set_header(
1516 HEADER_CONTAINER_ID,
1517 serde_json::Value::String(container_id_header.clone()),
1518 );
1519 message.set_header(
1520 HEADER_LOG_STREAM,
1521 serde_json::Value::String(stream_type.to_string()),
1522 );
1523
1524 if self.config.timestamps
1525 && let Some(ts) = extract_timestamp(content) {
1526 message.set_header(
1527 HEADER_LOG_TIMESTAMP,
1528 serde_json::Value::String(ts),
1529 );
1530 }
1531
1532 let exchange = Exchange::new(message);
1533
1534 if let Err(e) = context.send(exchange).await {
1535 tracing::error!("Failed to send log exchange: {:?}", e);
1536 break;
1537 }
1538 }
1539 Some(Err(e)) => {
1540 tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1541 break;
1542 }
1543 None => {
1544 if self.config.follow {
1545 tracing::info!("Docker log stream ended. Reconnecting...");
1546 break;
1547 } else {
1548 tracing::info!("Container logs consumer finished (follow=false)");
1549 return Ok(());
1550 }
1551 }
1552 }
1553 }
1554 }
1555 }
1556
1557 tokio::select! {
1558 _ = context.cancelled() => {
1559 tracing::info!("Container logs consumer shutting down");
1560 return Ok(());
1561 }
1562 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1563 }
1564 }
1565 }
1566}
1567
1568fn extract_timestamp(log_line: &str) -> Option<String> {
1569 let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
1570 if parts.len() > 1 && parts[0].contains('T') {
1571 Some(parts[0].to_string())
1572 } else {
1573 None
1574 }
1575}
1576
1577pub struct ContainerComponent {
1585 config: Option<ContainerGlobalConfig>,
1586}
1587
1588impl ContainerComponent {
1589 pub fn new() -> Self {
1591 Self { config: None }
1592 }
1593
1594 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1596 Self {
1597 config: Some(config),
1598 }
1599 }
1600
1601 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1603 Self { config }
1604 }
1605}
1606
1607impl Default for ContainerComponent {
1608 fn default() -> Self {
1609 Self::new()
1610 }
1611}
1612
1613impl Component for ContainerComponent {
1614 fn scheme(&self) -> &str {
1615 "container"
1616 }
1617
1618 fn create_endpoint(
1619 &self,
1620 uri: &str,
1621 _ctx: &dyn camel_component_api::ComponentContext,
1622 ) -> Result<Box<dyn Endpoint>, CamelError> {
1623 let mut config = ContainerConfig::from_uri(uri)?;
1624 if let Some(ref global) = self.config {
1626 config.apply_global_defaults(global);
1627 }
1628 Ok(Box::new(ContainerEndpoint {
1629 uri: uri.to_string(),
1630 config,
1631 }))
1632 }
1633}
1634
1635pub struct ContainerEndpoint {
1640 uri: String,
1641 config: ContainerConfig,
1642}
1643
1644impl ContainerEndpoint {
1645 pub fn docker_host(&self) -> Option<&str> {
1648 self.config.host.as_deref()
1649 }
1650}
1651
1652impl Endpoint for ContainerEndpoint {
1653 fn uri(&self) -> &str {
1654 &self.uri
1655 }
1656
1657 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1658 Ok(Box::new(ContainerConsumer {
1659 config: self.config.clone(),
1660 }))
1661 }
1662
1663 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1664 let docker = self.config.connect_docker_client()?;
1665 Ok(BoxProcessor::new(ContainerProducer {
1666 config: self.config.clone(),
1667 docker,
1668 }))
1669 }
1670}
1671
1672#[cfg(test)]
1673mod tests {
1674 use super::*;
1675 use camel_component_api::NoOpComponentContext;
1676
1677 #[test]
1678 fn test_container_config() {
1679 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1680 assert_eq!(config.operation, "run");
1681 assert_eq!(config.image.as_deref(), Some("alpine"));
1682 assert!(config.host.is_none());
1684 }
1685
1686 #[test]
1687 fn test_global_config_applied_to_endpoint() {
1688 let global =
1691 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1692 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1693 assert!(
1694 config.host.is_none(),
1695 "URI without ?host= should leave host as None"
1696 );
1697 config.apply_global_defaults(&global);
1698 assert_eq!(
1699 config.host.as_deref(),
1700 Some("unix:///custom/docker.sock"),
1701 "global docker_host must be applied when URI did not set host"
1702 );
1703 }
1704
1705 #[test]
1706 fn test_uri_param_wins_over_global_config() {
1707 let global =
1709 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1710 let mut config =
1711 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1712 .unwrap();
1713 assert_eq!(
1714 config.host.as_deref(),
1715 Some("unix:///override.sock"),
1716 "URI-set host should be parsed correctly"
1717 );
1718 config.apply_global_defaults(&global);
1719 assert_eq!(
1720 config.host.as_deref(),
1721 Some("unix:///override.sock"),
1722 "global config must NOT override a host already set by URI"
1723 );
1724 }
1725
1726 #[test]
1727 fn test_container_config_parses_name() {
1728 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1729 assert_eq!(config.name.as_deref(), Some("my-container"));
1730 }
1731
1732 #[test]
1733 fn test_parse_producer_operation_known() {
1734 assert_eq!(
1735 parse_producer_operation("list").unwrap(),
1736 ProducerOperation::List
1737 );
1738 assert_eq!(
1739 parse_producer_operation("run").unwrap(),
1740 ProducerOperation::Run
1741 );
1742 assert_eq!(
1743 parse_producer_operation("start").unwrap(),
1744 ProducerOperation::Start
1745 );
1746 assert_eq!(
1747 parse_producer_operation("stop").unwrap(),
1748 ProducerOperation::Stop
1749 );
1750 assert_eq!(
1751 parse_producer_operation("remove").unwrap(),
1752 ProducerOperation::Remove
1753 );
1754 }
1755
1756 #[test]
1757 fn test_parse_producer_operation_unknown() {
1758 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1759 match err {
1760 CamelError::ProcessorError(msg) => {
1761 assert!(
1762 msg.contains("Unknown container operation"),
1763 "Unexpected error message: {}",
1764 msg
1765 );
1766 }
1767 _ => panic!("Expected ProcessorError for unknown operation"),
1768 }
1769 }
1770
1771 #[test]
1772 fn test_parse_producer_operation_new_variants() {
1773 assert_eq!(
1774 parse_producer_operation("exec").unwrap(),
1775 ProducerOperation::Exec
1776 );
1777 assert_eq!(
1778 parse_producer_operation("network-create").unwrap(),
1779 ProducerOperation::NetworkCreate
1780 );
1781 assert_eq!(
1782 parse_producer_operation("network-connect").unwrap(),
1783 ProducerOperation::NetworkConnect
1784 );
1785 assert_eq!(
1786 parse_producer_operation("network-disconnect").unwrap(),
1787 ProducerOperation::NetworkDisconnect
1788 );
1789 assert_eq!(
1790 parse_producer_operation("network-remove").unwrap(),
1791 ProducerOperation::NetworkRemove
1792 );
1793 assert_eq!(
1794 parse_producer_operation("network-list").unwrap(),
1795 ProducerOperation::NetworkList
1796 );
1797 }
1798
1799 #[test]
1800 fn test_resolve_container_name_header_overrides_config() {
1801 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1802 let mut exchange = Exchange::new(Message::new(""));
1803 exchange.input.set_header(
1804 HEADER_CONTAINER_NAME,
1805 serde_json::Value::String("header-name".to_string()),
1806 );
1807
1808 let resolved = resolve_container_name(&exchange, &config);
1809 assert_eq!(resolved.as_deref(), Some("header-name"));
1810 }
1811
1812 #[test]
1813 fn test_container_config_rejects_tcp_host() {
1814 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1815 let err = config.connect_docker_client().unwrap_err();
1816 match err {
1817 CamelError::ProcessorError(msg) => {
1818 assert!(
1819 msg.to_lowercase().contains("tcp"),
1820 "Expected TCP scheme error, got: {}",
1821 msg
1822 );
1823 }
1824 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1825 }
1826 }
1827
1828 #[tokio::test]
1829 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1830 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1831 let remove_called_clone = remove_called.clone();
1832
1833 let result = run_container_with_cleanup(
1834 || async { Ok("container-123".to_string()) },
1835 |_id| async move {
1836 Err(CamelError::ProcessorError(
1837 "Failed to start container".to_string(),
1838 ))
1839 },
1840 move |_id| {
1841 let remove_called_inner = remove_called_clone.clone();
1842 async move {
1843 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1844 Ok(())
1845 }
1846 },
1847 )
1848 .await;
1849
1850 assert!(result.is_err(), "Expected start failure to bubble up");
1851 assert!(
1852 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1853 "Expected cleanup to remove container"
1854 );
1855 }
1856
1857 #[test]
1858 fn test_container_component_creates_endpoint() {
1859 let component = ContainerComponent::new();
1860 assert_eq!(component.scheme(), "container");
1861 let ctx = NoOpComponentContext;
1862 let endpoint = component
1863 .create_endpoint("container:run?image=alpine", &ctx)
1864 .unwrap();
1865 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1866 }
1867
1868 #[test]
1869 fn test_container_config_parses_ports() {
1870 let config =
1871 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1872 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1873 }
1874
1875 #[test]
1876 fn test_container_config_parses_env() {
1877 let config =
1878 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1879 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1880 }
1881
1882 #[test]
1883 fn test_container_config_parses_logs_options() {
1884 let config = ContainerConfig::from_uri(
1885 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1886 )
1887 .unwrap();
1888 assert_eq!(config.operation, "logs");
1889 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1890 assert!(config.follow);
1891 assert!(config.timestamps);
1892 assert_eq!(config.tail.as_deref(), Some("100"));
1893 }
1894
1895 #[test]
1896 fn test_container_config_logs_defaults() {
1897 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1898 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1902
1903 #[test]
1904 fn test_parse_ports_single() {
1905 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1906 let (exposed, bindings) = config.parse_ports().unwrap();
1907
1908 assert!(exposed.contains(&"80/tcp".to_string()));
1909 assert!(bindings.contains_key("80/tcp"));
1910
1911 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1912 assert_eq!(binding.len(), 1);
1913 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1914 }
1915
1916 #[test]
1917 fn test_parse_ports_multiple() {
1918 let config =
1919 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1920 let (exposed, bindings) = config.parse_ports().unwrap();
1921
1922 assert!(exposed.contains(&"80/tcp".to_string()));
1923 assert!(exposed.contains(&"443/tcp".to_string()));
1924 assert_eq!(bindings.len(), 2);
1925 }
1926
1927 #[test]
1928 fn test_parse_ports_with_protocol() {
1929 let config =
1930 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1931 .unwrap();
1932 let (exposed, _bindings) = config.parse_ports().unwrap();
1933
1934 assert!(exposed.contains(&"80/tcp".to_string()));
1935 assert!(exposed.contains(&"53/udp".to_string()));
1936 }
1937
1938 #[test]
1939 fn test_parse_ports_none() {
1940 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1941 assert!(config.parse_ports().is_none());
1942 }
1943
1944 #[test]
1945 fn test_parse_env_single() {
1946 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1947 let env = config.parse_env().unwrap();
1948
1949 assert_eq!(env.len(), 1);
1950 assert_eq!(env[0], "FOO=bar");
1951 }
1952
1953 #[test]
1954 fn test_parse_env_multiple() {
1955 let config =
1956 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1957 .unwrap();
1958 let env = config.parse_env().unwrap();
1959
1960 assert_eq!(env.len(), 3);
1961 assert!(env.contains(&"FOO=bar".to_string()));
1962 assert!(env.contains(&"BAZ=qux".to_string()));
1963 assert!(env.contains(&"NUM=123".to_string()));
1964 }
1965
1966 #[test]
1967 fn test_parse_env_none() {
1968 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1969 assert!(config.parse_env().is_none());
1970 }
1971
1972 use camel_component_api::Message;
1973 use std::sync::Arc;
1974
1975 #[tokio::test]
1976 async fn test_container_producer_resolves_operation_from_header() {
1977 let docker = match Docker::connect_with_local_defaults() {
1979 Ok(d) => d,
1980 Err(_) => {
1981 eprintln!("Skipping test: Could not connect to Docker daemon");
1982 return;
1983 }
1984 };
1985
1986 if docker.ping().await.is_err() {
1987 eprintln!("Skipping test: Docker daemon not responding to ping");
1988 return;
1989 }
1990
1991 let component = ContainerComponent::new();
1992 let ctx = NoOpComponentContext;
1993 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1994
1995 let ctx = ProducerContext::new();
1996 let mut producer = endpoint.create_producer(&ctx).unwrap();
1997
1998 let mut exchange = Exchange::new(Message::new(""));
1999 exchange
2000 .input
2001 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2002
2003 use tower::ServiceExt;
2004 let result = producer
2005 .ready()
2006 .await
2007 .unwrap()
2008 .call(exchange)
2009 .await
2010 .unwrap();
2011
2012 assert_eq!(
2014 result
2015 .input
2016 .header(HEADER_ACTION_RESULT)
2017 .map(|v| v.as_str().unwrap()),
2018 Some("success")
2019 );
2020 }
2021
2022 #[tokio::test]
2023 async fn test_container_producer_connection_error_on_invalid_host() {
2024 let component = ContainerComponent::new();
2026 let ctx = NoOpComponentContext;
2027 let endpoint = component
2028 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2029 .unwrap();
2030
2031 let ctx = ProducerContext::new();
2032 let result = endpoint.create_producer(&ctx);
2033
2034 assert!(
2036 result.is_err(),
2037 "Expected error when connecting to invalid host"
2038 );
2039 let err = result.unwrap_err();
2040 match &err {
2041 CamelError::ProcessorError(msg) => {
2042 assert!(
2043 msg.to_lowercase().contains("connection")
2044 || msg.to_lowercase().contains("connect")
2045 || msg.to_lowercase().contains("socket")
2046 || msg.contains("docker"),
2047 "Error message should indicate connection failure, got: {}",
2048 msg
2049 );
2050 }
2051 _ => panic!("Expected ProcessorError, got: {:?}", err),
2052 }
2053 }
2054
2055 #[tokio::test]
2057 async fn test_container_producer_lifecycle_operations_missing_id() {
2058 let docker = match Docker::connect_with_local_defaults() {
2060 Ok(d) => d,
2061 Err(_) => {
2062 eprintln!("Skipping test: Could not connect to Docker daemon");
2063 return;
2064 }
2065 };
2066
2067 if docker.ping().await.is_err() {
2068 eprintln!("Skipping test: Docker daemon not responding to ping");
2069 return;
2070 }
2071
2072 let component = ContainerComponent::new();
2073 let ctx = NoOpComponentContext;
2074 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2075 let ctx = ProducerContext::new();
2076 let mut producer = endpoint.create_producer(&ctx).unwrap();
2077
2078 for operation in ["start", "stop", "remove"] {
2080 let mut exchange = Exchange::new(Message::new(""));
2081 exchange.input.set_header(
2082 HEADER_ACTION,
2083 serde_json::Value::String(operation.to_string()),
2084 );
2085 use tower::ServiceExt;
2088 let result = producer.ready().await.unwrap().call(exchange).await;
2089
2090 assert!(
2091 result.is_err(),
2092 "Expected error for {} operation without CamelContainerId",
2093 operation
2094 );
2095 let err = result.unwrap_err();
2096 match &err {
2097 CamelError::ProcessorError(msg) => {
2098 assert!(
2099 msg.contains(HEADER_CONTAINER_ID),
2100 "Error message should mention {}, got: {}",
2101 HEADER_CONTAINER_ID,
2102 msg
2103 );
2104 }
2105 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2106 }
2107 }
2108 }
2109
2110 #[tokio::test]
2112 async fn test_container_producer_stop_nonexistent() {
2113 let docker = match Docker::connect_with_local_defaults() {
2115 Ok(d) => d,
2116 Err(_) => {
2117 eprintln!("Skipping test: Could not connect to Docker daemon");
2118 return;
2119 }
2120 };
2121
2122 if docker.ping().await.is_err() {
2123 eprintln!("Skipping test: Docker daemon not responding to ping");
2124 return;
2125 }
2126
2127 let component = ContainerComponent::new();
2128 let ctx = NoOpComponentContext;
2129 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2130 let ctx = ProducerContext::new();
2131 let mut producer = endpoint.create_producer(&ctx).unwrap();
2132
2133 let mut exchange = Exchange::new(Message::new(""));
2134 exchange
2135 .input
2136 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2137 exchange.input.set_header(
2138 HEADER_CONTAINER_ID,
2139 serde_json::Value::String("nonexistent-container-123".into()),
2140 );
2141
2142 use tower::ServiceExt;
2143 let result = producer.ready().await.unwrap().call(exchange).await;
2144
2145 assert!(
2146 result.is_err(),
2147 "Expected error when stopping nonexistent container"
2148 );
2149 let err = result.unwrap_err();
2150 match &err {
2151 CamelError::ProcessorError(msg) => {
2152 assert!(
2154 msg.to_lowercase().contains("no such container")
2155 || msg.to_lowercase().contains("not found")
2156 || msg.contains("404"),
2157 "Error message should indicate container not found, got: {}",
2158 msg
2159 );
2160 }
2161 _ => panic!("Expected ProcessorError, got: {:?}", err),
2162 }
2163 }
2164
2165 #[tokio::test]
2167 async fn test_container_producer_run_missing_image() {
2168 let docker = match Docker::connect_with_local_defaults() {
2170 Ok(d) => d,
2171 Err(_) => {
2172 eprintln!("Skipping test: Could not connect to Docker daemon");
2173 return;
2174 }
2175 };
2176
2177 if docker.ping().await.is_err() {
2178 eprintln!("Skipping test: Docker daemon not responding to ping");
2179 return;
2180 }
2181
2182 let component = ContainerComponent::new();
2184 let ctx = NoOpComponentContext;
2185 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2186 let ctx = ProducerContext::new();
2187 let mut producer = endpoint.create_producer(&ctx).unwrap();
2188
2189 let mut exchange = Exchange::new(Message::new(""));
2190 exchange
2191 .input
2192 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2193 use tower::ServiceExt;
2196 let result = producer.ready().await.unwrap().call(exchange).await;
2197
2198 assert!(
2199 result.is_err(),
2200 "Expected error for run operation without image"
2201 );
2202 let err = result.unwrap_err();
2203 match &err {
2204 CamelError::ProcessorError(msg) => {
2205 assert!(
2206 msg.to_lowercase().contains("image"),
2207 "Error message should mention 'image', got: {}",
2208 msg
2209 );
2210 }
2211 _ => panic!("Expected ProcessorError, got: {:?}", err),
2212 }
2213 }
2214
2215 #[tokio::test]
2217 async fn test_container_producer_run_image_from_header() {
2218 let docker = match Docker::connect_with_local_defaults() {
2220 Ok(d) => d,
2221 Err(_) => {
2222 eprintln!("Skipping test: Could not connect to Docker daemon");
2223 return;
2224 }
2225 };
2226
2227 if docker.ping().await.is_err() {
2228 eprintln!("Skipping test: Docker daemon not responding to ping");
2229 return;
2230 }
2231
2232 let component = ContainerComponent::new();
2234 let ctx = NoOpComponentContext;
2235 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2236 let ctx = ProducerContext::new();
2237 let mut producer = endpoint.create_producer(&ctx).unwrap();
2238
2239 let mut exchange = Exchange::new(Message::new(""));
2240 exchange
2241 .input
2242 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2243 exchange.input.set_header(
2245 HEADER_IMAGE,
2246 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2247 );
2248
2249 use tower::ServiceExt;
2250 let result = producer.ready().await.unwrap().call(exchange).await;
2251
2252 assert!(
2254 result.is_err(),
2255 "Expected error when running container with nonexistent image"
2256 );
2257 let err = result.unwrap_err();
2258 match &err {
2259 CamelError::ProcessorError(msg) => {
2260 assert!(
2262 msg.to_lowercase().contains("no such image")
2263 || msg.to_lowercase().contains("not found")
2264 || msg.to_lowercase().contains("image")
2265 || msg.to_lowercase().contains("pull")
2266 || msg.contains("404"),
2267 "Error message should indicate image issue, got: {}",
2268 msg
2269 );
2270 }
2271 _ => panic!("Expected ProcessorError, got: {:?}", err),
2272 }
2273 }
2274
2275 #[tokio::test]
2278 async fn test_container_producer_run_alpine_container() {
2279 let docker = match Docker::connect_with_local_defaults() {
2280 Ok(d) => d,
2281 Err(_) => {
2282 eprintln!("Skipping test: Could not connect to Docker daemon");
2283 return;
2284 }
2285 };
2286
2287 if docker.ping().await.is_err() {
2288 eprintln!("Skipping test: Docker daemon not responding to ping");
2289 return;
2290 }
2291
2292 let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2294 let has_alpine = images
2295 .iter()
2296 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2297
2298 if !has_alpine {
2299 eprintln!("Pulling alpine:latest image...");
2300 let mut stream = docker.create_image(
2301 Some(CreateImageOptions {
2302 from_image: Some("alpine:latest".to_string()),
2303 ..Default::default()
2304 }),
2305 None,
2306 None,
2307 );
2308
2309 use futures::StreamExt;
2310 while let Some(_item) = stream.next().await {
2311 }
2313 eprintln!("Image pulled successfully");
2314 }
2315
2316 let component = ContainerComponent::new();
2318 let ctx = NoOpComponentContext;
2319 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2320 let ctx = ProducerContext::new();
2321 let mut producer = endpoint.create_producer(&ctx).unwrap();
2322
2323 let timestamp = std::time::SystemTime::now()
2325 .duration_since(std::time::UNIX_EPOCH)
2326 .unwrap()
2327 .as_millis();
2328 let container_name = format!("test-rust-camel-{}", timestamp);
2329 let mut exchange = Exchange::new(Message::new(""));
2330 exchange.input.set_header(
2331 HEADER_IMAGE,
2332 serde_json::Value::String("alpine:latest".into()),
2333 );
2334 exchange.input.set_header(
2335 HEADER_CONTAINER_NAME,
2336 serde_json::Value::String(container_name.clone()),
2337 );
2338
2339 use tower::ServiceExt;
2340 let result = producer
2341 .ready()
2342 .await
2343 .unwrap()
2344 .call(exchange)
2345 .await
2346 .expect("Container run should succeed");
2347
2348 let container_id = result
2350 .input
2351 .header(HEADER_CONTAINER_ID)
2352 .and_then(|v| v.as_str().map(|s| s.to_string()))
2353 .expect("Expected container ID header");
2354 assert!(!container_id.is_empty(), "Container ID should not be empty");
2355
2356 assert_eq!(
2358 result
2359 .input
2360 .header(HEADER_ACTION_RESULT)
2361 .and_then(|v| v.as_str()),
2362 Some("success")
2363 );
2364
2365 let inspect = docker
2367 .inspect_container(&container_id, None)
2368 .await
2369 .expect("Container should exist");
2370 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2371
2372 docker
2374 .remove_container(
2375 &container_id,
2376 Some(RemoveContainerOptions {
2377 force: true,
2378 ..Default::default()
2379 }),
2380 )
2381 .await
2382 .ok();
2383
2384 eprintln!("✅ Container {} created and cleaned up", container_id);
2385 }
2386
2387 #[tokio::test]
2389 async fn test_container_consumer_unsupported_operation() {
2390 use tokio::sync::mpsc;
2391
2392 let component = ContainerComponent::new();
2393 let ctx = NoOpComponentContext;
2394 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2395 let mut consumer = endpoint.create_consumer().unwrap();
2396
2397 let (tx, _rx) = mpsc::channel(16);
2399 let cancel_token = tokio_util::sync::CancellationToken::new();
2400 let context = ConsumerContext::new(tx, cancel_token);
2401
2402 let result = consumer.start(context).await;
2403
2404 assert!(
2406 result.is_err(),
2407 "Expected error for unsupported consumer operation"
2408 );
2409 let err = result.unwrap_err();
2410 match &err {
2411 CamelError::EndpointCreationFailed(msg) => {
2412 assert!(
2413 msg.contains("Consumer only supports 'events' or 'logs'"),
2414 "Error message should mention events or logs support, got: {}",
2415 msg
2416 );
2417 }
2418 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2419 }
2420 }
2421
2422 #[test]
2423 fn test_container_consumer_concurrency_model_is_concurrent() {
2424 let consumer = ContainerConsumer {
2425 config: ContainerConfig::from_uri("container:events").unwrap(),
2426 };
2427
2428 assert_eq!(
2429 consumer.concurrency_model(),
2430 camel_component_api::ConcurrencyModel::Concurrent { max: None }
2431 );
2432 }
2433
2434 #[tokio::test]
2438 async fn test_container_consumer_cancellation() {
2439 use std::sync::atomic::{AtomicBool, Ordering};
2440 use tokio::sync::mpsc;
2441
2442 let docker = match Docker::connect_with_local_defaults() {
2444 Ok(d) => d,
2445 Err(_) => {
2446 eprintln!("Skipping test: Could not connect to Docker daemon");
2447 return;
2448 }
2449 };
2450
2451 if docker.ping().await.is_err() {
2452 eprintln!("Skipping test: Docker daemon not responding to ping");
2453 return;
2454 }
2455
2456 let component = ContainerComponent::new();
2457 let ctx = NoOpComponentContext;
2458 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2459 let mut consumer = endpoint.create_consumer().unwrap();
2460
2461 let (tx, _rx) = mpsc::channel(16);
2463 let cancel_token = tokio_util::sync::CancellationToken::new();
2464 let context = ConsumerContext::new(tx, cancel_token.clone());
2465
2466 let completed = Arc::new(AtomicBool::new(false));
2468 let completed_clone = completed.clone();
2469
2470 let handle = tokio::spawn(async move {
2472 let result = consumer.start(context).await;
2473 completed_clone.store(true, Ordering::SeqCst);
2475 result
2476 });
2477
2478 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2480
2481 assert!(
2483 !completed.load(Ordering::SeqCst),
2484 "Consumer should still be running before cancellation"
2485 );
2486
2487 cancel_token.cancel();
2489
2490 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2492
2493 assert!(
2495 result.is_ok(),
2496 "Consumer should gracefully shut down after cancellation"
2497 );
2498
2499 assert!(
2501 completed.load(Ordering::SeqCst),
2502 "Consumer should have completed after cancellation"
2503 );
2504 }
2505
2506 #[tokio::test]
2510 async fn test_container_producer_list_containers() {
2511 let docker = match Docker::connect_with_local_defaults() {
2514 Ok(d) => d,
2515 Err(_) => {
2516 eprintln!("Skipping test: Could not connect to Docker daemon");
2517 return;
2518 }
2519 };
2520
2521 if docker.ping().await.is_err() {
2522 eprintln!("Skipping test: Docker daemon not responding to ping");
2523 return;
2524 }
2525
2526 let component = ContainerComponent::new();
2528 let ctx = NoOpComponentContext;
2529 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2530
2531 let ctx = ProducerContext::new();
2532 let mut producer = endpoint.create_producer(&ctx).unwrap();
2533
2534 let mut exchange = Exchange::new(Message::new(""));
2536 exchange
2537 .input
2538 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2539
2540 use tower::ServiceExt;
2542 let result = producer
2543 .ready()
2544 .await
2545 .unwrap()
2546 .call(exchange)
2547 .await
2548 .expect("Producer should succeed when Docker is available");
2549
2550 match &result.input.body {
2553 camel_component_api::Body::Json(json_value) => {
2554 assert!(
2555 json_value.is_array(),
2556 "Expected input body to be a JSON array, got: {:?}",
2557 json_value
2558 );
2559 }
2560 other => panic!("Expected Body::Json with array, got: {:?}", other),
2561 }
2562 }
2563
2564 #[test]
2565 fn test_container_config_parses_volumes() {
2566 let config = ContainerConfig::from_uri(
2567 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2568 )
2569 .unwrap();
2570 assert_eq!(
2571 config.volumes.as_deref(),
2572 Some("./html:/usr/share/nginx/html:ro")
2573 );
2574 }
2575
2576 #[test]
2577 fn test_container_config_parses_exec_params() {
2578 let config = ContainerConfig::from_uri(
2579 "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2580 )
2581 .unwrap();
2582 assert_eq!(config.operation, "exec");
2583 assert_eq!(config.container_id.as_deref(), Some("my-app"));
2584 assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2585 assert_eq!(config.user.as_deref(), Some("root"));
2586 assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2587 assert!(config.detach);
2588 }
2589
2590 #[test]
2591 fn test_container_config_parses_network_create_params() {
2592 let config =
2593 ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2594 .unwrap();
2595 assert_eq!(config.operation, "network-create");
2596 assert_eq!(config.name.as_deref(), Some("my-net"));
2597 assert_eq!(config.driver.as_deref(), Some("bridge"));
2598 }
2599
2600 #[test]
2601 fn test_container_config_defaults_new_fields() {
2602 let config = ContainerConfig::from_uri("container:list").unwrap();
2603 assert!(config.volumes.is_none());
2604 assert!(config.user.is_none());
2605 assert!(config.workdir.is_none());
2606 assert!(!config.detach);
2607 assert!(config.driver.is_none());
2608 assert!(!config.force);
2609 }
2610
2611 #[test]
2612 fn test_parse_volumes_bind_mount() {
2613 let config = ContainerConfig::from_uri(
2614 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2615 )
2616 .unwrap();
2617 let (binds, anon) = config.parse_volumes().unwrap();
2618 assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2619 assert!(anon.is_empty());
2620 }
2621
2622 #[test]
2623 fn test_parse_volumes_named_volume() {
2624 let config =
2625 ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2626 .unwrap();
2627 let (binds, anon) = config.parse_volumes().unwrap();
2628 assert_eq!(binds, vec!["data:/var/lib/data"]);
2629 assert!(anon.is_empty());
2630 }
2631
2632 #[test]
2633 fn test_parse_volumes_anonymous() {
2634 let config =
2635 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2636 let (binds, anon) = config.parse_volumes().unwrap();
2637 assert!(binds.is_empty());
2638 assert!(anon.contains(&"/tmp/app-data".to_string()));
2639 }
2640
2641 #[test]
2642 fn test_parse_volumes_anonymous_with_mode() {
2643 let config =
2644 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2645 .unwrap();
2646 let (binds, anon) = config.parse_volumes().unwrap();
2647 assert!(binds.is_empty());
2648 assert!(anon.contains(&"/tmp/app-data".to_string()));
2649 }
2650
2651 #[test]
2652 fn test_parse_volumes_multiple() {
2653 let config = ContainerConfig::from_uri(
2654 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2655 )
2656 .unwrap();
2657 let (binds, anon) = config.parse_volumes().unwrap();
2658 assert_eq!(binds.len(), 2);
2659 assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2660 assert!(binds.contains(&"data:/var/log/app".to_string()));
2661 assert!(anon.is_empty());
2662 }
2663
2664 #[test]
2665 fn test_parse_volumes_mixed() {
2666 let config = ContainerConfig::from_uri(
2667 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2668 )
2669 .unwrap();
2670 let (binds, anon) = config.parse_volumes().unwrap();
2671 assert_eq!(binds.len(), 1);
2672 assert!(anon.contains(&"/tmp/cache".to_string()));
2673 }
2674
2675 #[test]
2676 fn test_parse_volumes_none() {
2677 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2678 assert!(config.parse_volumes().is_none());
2679 }
2680
2681 #[test]
2682 fn test_parse_volumes_empty_entry_skipped() {
2683 let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2684 assert!(config.parse_volumes().is_none());
2685 }
2686
2687 #[test]
2688 fn test_parse_volumes_rw_mode() {
2689 let config =
2690 ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2691 .unwrap();
2692 let (binds, _) = config.parse_volumes().unwrap();
2693 assert_eq!(binds, vec!["./data:/app/data:rw"]);
2694 }
2695
2696 #[tokio::test]
2697 async fn test_container_producer_network_lifecycle() {
2698 let docker = match Docker::connect_with_local_defaults() {
2699 Ok(d) => d,
2700 Err(_) => {
2701 eprintln!("Skipping test: Could not connect to Docker daemon");
2702 return;
2703 }
2704 };
2705 if docker.ping().await.is_err() {
2706 eprintln!("Skipping test: Docker daemon not responding to ping");
2707 return;
2708 }
2709
2710 let network_name = format!("camel-test-{}", std::process::id());
2711
2712 let component = ContainerComponent::new();
2713 let component_ctx = NoOpComponentContext;
2714
2715 let endpoint = component
2717 .create_endpoint(
2718 &format!("container:network-create?name={}", network_name),
2719 &component_ctx,
2720 )
2721 .unwrap();
2722 let ctx = ProducerContext::new();
2723 let mut producer = endpoint.create_producer(&ctx).unwrap();
2724
2725 let mut exchange = Exchange::new(Message::new(""));
2726 exchange.input.set_header(
2727 HEADER_ACTION,
2728 serde_json::Value::String("network-create".into()),
2729 );
2730
2731 use tower::ServiceExt;
2732 let result = producer
2733 .ready()
2734 .await
2735 .unwrap()
2736 .call(exchange)
2737 .await
2738 .expect("Network create should succeed");
2739
2740 let network_id = result
2741 .input
2742 .header(HEADER_NETWORK)
2743 .and_then(|v| v.as_str().map(|s| s.to_string()))
2744 .expect("Should have network ID");
2745
2746 assert!(!network_id.is_empty());
2747
2748 let endpoint = component
2750 .create_endpoint("container:network-list", &component_ctx)
2751 .unwrap();
2752 let mut producer = endpoint.create_producer(&ctx).unwrap();
2753
2754 let mut exchange = Exchange::new(Message::new(""));
2755 exchange.input.set_header(
2756 HEADER_ACTION,
2757 serde_json::Value::String("network-list".into()),
2758 );
2759
2760 let list_result = producer
2761 .ready()
2762 .await
2763 .unwrap()
2764 .call(exchange)
2765 .await
2766 .expect("Network list should succeed");
2767
2768 match &list_result.input.body {
2769 Body::Json(json_value) => {
2770 assert!(json_value.is_array(), "Expected JSON array");
2771 }
2772 other => panic!("Expected Body::Json, got: {:?}", other),
2773 }
2774
2775 let endpoint = component
2777 .create_endpoint(
2778 &format!("container:network-remove?network={}", network_name),
2779 &component_ctx,
2780 )
2781 .unwrap();
2782 let mut producer = endpoint.create_producer(&ctx).unwrap();
2783
2784 let mut exchange = Exchange::new(Message::new(""));
2785 exchange.input.set_header(
2786 HEADER_ACTION,
2787 serde_json::Value::String("network-remove".into()),
2788 );
2789
2790 let remove_result = producer
2791 .ready()
2792 .await
2793 .unwrap()
2794 .call(exchange)
2795 .await
2796 .expect("Network remove should succeed");
2797
2798 let action_result = remove_result
2799 .input
2800 .header(HEADER_ACTION_RESULT)
2801 .and_then(|v| v.as_str());
2802 assert_eq!(action_result, Some("success"));
2803 }
2804}