1pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::service::{HostConfig, PortBinding};
19use camel_component_api::parse_uri;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tower::Service;
23
24static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
27 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
28
29fn track_container(id: String) {
31 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
32 tracker.insert(id);
33 }
34}
35
36fn untrack_container(id: &str) {
38 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
39 tracker.remove(id);
40 }
41}
42
43pub async fn cleanup_tracked_containers() {
45 let ids: Vec<String> = {
46 match CONTAINER_TRACKER.lock() {
47 Ok(tracker) => tracker.iter().cloned().collect(),
48 Err(_) => return,
49 }
50 };
51
52 if ids.is_empty() {
53 return;
54 }
55
56 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
57
58 let docker = match Docker::connect_with_local_defaults() {
59 Ok(d) => d,
60 Err(e) => {
61 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
62 return;
63 }
64 };
65
66 for id in ids {
67 match docker
68 .remove_container(
69 &id,
70 Some(bollard::container::RemoveContainerOptions {
71 force: true,
72 ..Default::default()
73 }),
74 )
75 .await
76 {
77 Ok(_) => {
78 tracing::debug!("Cleaned up container {}", id);
79 untrack_container(&id);
80 }
81 Err(e) => {
82 tracing::warn!("Failed to cleanup container {}: {}", id, e);
83 }
84 }
85 }
86}
87
88const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
92
93pub const HEADER_ACTION: &str = "CamelContainerAction";
95
96pub const HEADER_IMAGE: &str = "CamelContainerImage";
98
99pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
101
102pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
104
105pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
107
108pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
110
111pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
113
114pub const HEADER_CMD: &str = "CamelContainerCmd";
116
117pub const HEADER_NETWORK: &str = "CamelContainerNetwork";
119
120pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";
122
123pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";
125
126pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
128
129#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
137#[serde(default)]
138pub struct ContainerGlobalConfig {
139 pub docker_host: String,
141}
142
143impl Default for ContainerGlobalConfig {
144 fn default() -> Self {
145 Self {
146 docker_host: "unix:///var/run/docker.sock".to_string(),
147 }
148 }
149}
150
151impl ContainerGlobalConfig {
152 pub fn new() -> Self {
153 Self::default()
154 }
155
156 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
157 self.docker_host = v.into();
158 self
159 }
160}
161
162#[derive(Debug, Clone)]
171pub struct ContainerConfig {
172 pub operation: String,
174 pub image: Option<String>,
176 pub name: Option<String>,
178 pub host: Option<String>,
180 pub cmd: Option<String>,
182 pub ports: Option<String>,
184 pub env: Option<String>,
186 pub network: Option<String>,
188 pub container_id: Option<String>,
190 pub follow: bool,
192 pub timestamps: bool,
194 pub tail: Option<String>,
196 pub auto_pull: bool,
198 pub auto_remove: bool,
200 pub volumes: Option<String>,
202 pub user: Option<String>,
204 pub workdir: Option<String>,
206 pub detach: bool,
208 pub driver: Option<String>,
210 pub force: bool,
212}
213
214impl ContainerConfig {
215 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
223 let parts = parse_uri(uri)?;
224 if parts.scheme != "container" {
225 return Err(CamelError::InvalidUri(format!(
226 "expected scheme 'container', got '{}'",
227 parts.scheme
228 )));
229 }
230
231 let image = parts.params.get("image").cloned();
232 let name = parts.params.get("name").cloned();
233 let cmd = parts.params.get("cmd").cloned();
234 let ports = parts.params.get("ports").cloned();
235 let env = parts.params.get("env").cloned();
236 let network = parts.params.get("network").cloned();
237 let container_id = parts.params.get("containerId").cloned();
238 let follow = parts
239 .params
240 .get("follow")
241 .map(|v| v.eq_ignore_ascii_case("true"))
242 .unwrap_or(true);
243 let timestamps = parts
244 .params
245 .get("timestamps")
246 .map(|v| v.eq_ignore_ascii_case("true"))
247 .unwrap_or(false);
248 let tail = parts.params.get("tail").cloned();
249 let auto_pull = parts
250 .params
251 .get("autoPull")
252 .map(|v| v.eq_ignore_ascii_case("true"))
253 .unwrap_or(true);
254 let auto_remove = parts
255 .params
256 .get("autoRemove")
257 .map(|v| v.eq_ignore_ascii_case("true"))
258 .unwrap_or(true);
259 let host = parts.params.get("host").cloned();
261 let volumes = parts.params.get("volumes").cloned();
262 let user = parts.params.get("user").cloned();
263 let workdir = parts.params.get("workdir").cloned();
264 let detach = parts
265 .params
266 .get("detach")
267 .map(|v| v.eq_ignore_ascii_case("true"))
268 .unwrap_or(false);
269 let driver = parts.params.get("driver").cloned();
270 let force = parts
271 .params
272 .get("force")
273 .map(|v| v.eq_ignore_ascii_case("true"))
274 .unwrap_or(false);
275
276 Ok(Self {
277 operation: parts.path,
278 image,
279 name,
280 host,
281 cmd,
282 ports,
283 env,
284 network,
285 container_id,
286 follow,
287 timestamps,
288 tail,
289 auto_pull,
290 auto_remove,
291 volumes,
292 user,
293 workdir,
294 detach,
295 driver,
296 force,
297 })
298 }
299
300 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
303 if self.host.is_none() {
304 self.host = Some(global.docker_host.clone());
305 }
306 }
307
308 fn docker_socket_path(&self) -> Result<&str, CamelError> {
309 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
310 "npipe:////./pipe/docker_engine"
311 } else {
312 "unix:///var/run/docker.sock"
313 });
314
315 if host.starts_with("unix://") || host.starts_with("npipe://") {
316 return Ok(host);
317 }
318
319 if host.contains("://") {
320 return Err(CamelError::ProcessorError(format!(
321 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
322 host
323 )));
324 }
325
326 Ok(host)
327 }
328
329 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
330 let socket_path = self.docker_socket_path()?;
331 Docker::connect_with_socket(
332 socket_path,
333 DOCKER_CONNECT_TIMEOUT_SECS,
334 bollard::API_DEFAULT_VERSION,
335 )
336 .map_err(|e| {
337 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
338 })
339 }
340
341 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
349 let docker = self.connect_docker_client()?;
350 docker
351 .ping()
352 .await
353 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
354 Ok(docker)
355 }
356
357 #[allow(clippy::type_complexity)]
358 fn parse_ports(
359 &self,
360 ) -> Option<(
361 HashMap<String, HashMap<(), ()>>,
362 HashMap<String, Option<Vec<PortBinding>>>,
363 )> {
364 let ports_str = self.ports.as_ref()?;
365
366 let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
367 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
368
369 for mapping in ports_str.split(',') {
370 let mapping = mapping.trim();
371 if mapping.is_empty() {
372 continue;
373 }
374
375 let (host_port, container_spec) = mapping.split_once(':')?;
376
377 let (container_port, protocol) = if container_spec.contains('/') {
378 let parts: Vec<&str> = container_spec.split('/').collect();
379 (parts[0], parts[1])
380 } else {
381 (container_spec, "tcp")
382 };
383
384 let container_key = format!("{}/{}", container_port, protocol);
385
386 exposed_ports.insert(container_key.clone(), HashMap::new());
387
388 port_bindings.insert(
389 container_key,
390 Some(vec![PortBinding {
391 host_ip: None,
392 host_port: Some(host_port.to_string()),
393 }]),
394 );
395 }
396
397 if exposed_ports.is_empty() {
398 None
399 } else {
400 Some((exposed_ports, port_bindings))
401 }
402 }
403
404 fn parse_env(&self) -> Option<Vec<String>> {
405 let env_str = self.env.as_ref()?;
406
407 let env_vars: Vec<String> = env_str
408 .split(',')
409 .map(|s| s.trim().to_string())
410 .filter(|s| !s.is_empty())
411 .collect();
412
413 if env_vars.is_empty() {
414 None
415 } else {
416 Some(env_vars)
417 }
418 }
419
420 #[cfg(test)]
421 #[allow(clippy::type_complexity)]
422 fn parse_volumes(&self) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
423 self.volumes.as_deref().and_then(parse_volume_str)
424 }
425}
426
427#[allow(clippy::type_complexity)]
432fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
433 let mut binds: Vec<String> = Vec::new();
434 let mut anonymous_volumes: HashMap<String, HashMap<(), ()>> = HashMap::new();
435
436 for entry in volumes_str.split(',') {
437 let entry = entry.trim();
438 if entry.is_empty() {
439 continue;
440 }
441
442 let segments: Vec<&str> = entry.split(':').collect();
443
444 match segments.len() {
445 3 => {
446 let source = segments[0];
447 let target = segments[1];
448 let mode = segments[2];
449 if mode != "ro" && mode != "rw" {
450 continue;
451 }
452 binds.push(format!("{}:{}:{}", source, target, mode));
453 }
454 2 => {
455 let a = segments[0];
456 let b = segments[1];
457 if b == "ro" || b == "rw" {
458 anonymous_volumes.insert(a.to_string(), HashMap::new());
459 } else {
460 binds.push(format!("{}:{}", a, b));
461 }
462 }
463 1 => {
464 anonymous_volumes.insert(segments[0].to_string(), HashMap::new());
465 }
466 _ => continue,
467 }
468 }
469
470 if binds.is_empty() && anonymous_volumes.is_empty() {
471 None
472 } else {
473 Some((binds, anonymous_volumes))
474 }
475}
476
477#[derive(Debug, Clone, Copy, PartialEq, Eq)]
478enum ProducerOperation {
479 List,
480 Run,
481 Start,
482 Stop,
483 Remove,
484 Exec,
485 NetworkCreate,
486 NetworkConnect,
487 NetworkDisconnect,
488 NetworkRemove,
489 NetworkList,
490}
491
492fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
493 match operation {
494 "list" => Ok(ProducerOperation::List),
495 "run" => Ok(ProducerOperation::Run),
496 "start" => Ok(ProducerOperation::Start),
497 "stop" => Ok(ProducerOperation::Stop),
498 "remove" => Ok(ProducerOperation::Remove),
499 "exec" => Ok(ProducerOperation::Exec),
500 "network-create" => Ok(ProducerOperation::NetworkCreate),
501 "network-connect" => Ok(ProducerOperation::NetworkConnect),
502 "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
503 "network-remove" => Ok(ProducerOperation::NetworkRemove),
504 "network-list" => Ok(ProducerOperation::NetworkList),
505 _ => Err(CamelError::ProcessorError(format!(
506 "Unknown container operation: {}",
507 operation
508 ))),
509 }
510}
511
512fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
513 exchange
514 .input
515 .header(HEADER_CONTAINER_NAME)
516 .and_then(|v| v.as_str().map(|s| s.to_string()))
517 .or_else(|| config.name.clone())
518}
519
520async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
521 let images = docker
522 .list_images::<&str>(None)
523 .await
524 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
525
526 Ok(images.iter().any(|img| {
527 img.repo_tags
528 .iter()
529 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
530 }))
531}
532
533async fn pull_image_with_progress(
534 docker: &Docker,
535 image: &str,
536 timeout_secs: u64,
537) -> Result<(), CamelError> {
538 use futures::StreamExt;
539
540 tracing::info!("Pulling image: {}", image);
541
542 let mut stream = docker.create_image(
543 Some(bollard::image::CreateImageOptions {
544 from_image: image,
545 ..Default::default()
546 }),
547 None,
548 None,
549 );
550
551 let start = std::time::Instant::now();
552 let mut last_progress = std::time::Instant::now();
553
554 while let Some(item) = stream.next().await {
555 if start.elapsed().as_secs() > timeout_secs {
556 return Err(CamelError::ProcessorError(format!(
557 "Image pull timeout after {}s. Try manually: docker pull {}",
558 timeout_secs, image
559 )));
560 }
561
562 match item {
563 Ok(update) => {
564 if last_progress.elapsed().as_secs() >= 2 {
566 if let Some(status) = update.status {
567 tracing::debug!("Pull progress: {}", status);
568 }
569 last_progress = std::time::Instant::now();
570 }
571 }
572 Err(e) => {
573 let err_str = e.to_string().to_lowercase();
574 if err_str.contains("unauthorized") || err_str.contains("401") {
575 return Err(CamelError::ProcessorError(format!(
576 "Authentication required for image '{}'. Configure Docker credentials: docker login",
577 image
578 )));
579 }
580 if err_str.contains("not found") || err_str.contains("404") {
581 return Err(CamelError::ProcessorError(format!(
582 "Image '{}' not found in registry. Check the image name and tag",
583 image
584 )));
585 }
586 return Err(CamelError::ProcessorError(format!(
587 "Failed to pull image '{}': {}",
588 image, e
589 )));
590 }
591 }
592 }
593
594 tracing::info!("Successfully pulled image: {}", image);
595 Ok(())
596}
597
598async fn ensure_image_available(
599 docker: &Docker,
600 image: &str,
601 auto_pull: bool,
602 timeout_secs: u64,
603) -> Result<(), CamelError> {
604 if image_exists_locally(docker, image).await? {
605 tracing::debug!("Image '{}' already available locally", image);
606 return Ok(());
607 }
608
609 if !auto_pull {
610 return Err(CamelError::ProcessorError(format!(
611 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
612 image, image
613 )));
614 }
615
616 pull_image_with_progress(docker, image, timeout_secs).await
617}
618
619fn format_docker_event(event: &bollard::models::EventMessage) -> String {
620 let action = event.action.as_deref().unwrap_or("unknown");
621 let actor = event.actor.as_ref();
622
623 let container_name = actor
624 .and_then(|a| a.attributes.as_ref())
625 .and_then(|attrs| attrs.get("name"))
626 .map(|s| s.as_str())
627 .unwrap_or("unknown");
628
629 let image = actor
630 .and_then(|a| a.attributes.as_ref())
631 .and_then(|attrs| attrs.get("image"))
632 .map(|s| s.as_str())
633 .unwrap_or("");
634
635 let exit_code = actor
636 .and_then(|a| a.attributes.as_ref())
637 .and_then(|attrs| attrs.get("exitCode"))
638 .map(|s| s.as_str());
639
640 match action {
641 "create" => {
642 if image.is_empty() {
643 format!("[CREATE] Container {}", container_name)
644 } else {
645 format!("[CREATE] Container {} ({})", container_name, image)
646 }
647 }
648 "start" => format!("[START] Container {}", container_name),
649 "die" => {
650 if let Some(code) = exit_code {
651 format!("[DIE] Container {} (exit: {})", container_name, code)
652 } else {
653 format!("[DIE] Container {}", container_name)
654 }
655 }
656 "destroy" => format!("[DESTROY] Container {}", container_name),
657 "stop" => format!("[STOP] Container {}", container_name),
658 "pause" => format!("[PAUSE] Container {}", container_name),
659 "unpause" => format!("[UNPAUSE] Container {}", container_name),
660 "restart" => format!("[RESTART] Container {}", container_name),
661 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
662 }
663}
664
665async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
666 create: CreateFn,
667 start: StartFn,
668 remove: RemoveFn,
669) -> Result<String, CamelError>
670where
671 CreateFn: FnOnce() -> CreateFut,
672 CreateFut: Future<Output = Result<String, CamelError>>,
673 StartFn: FnOnce(String) -> StartFut,
674 StartFut: Future<Output = Result<(), CamelError>>,
675 RemoveFn: FnOnce(String) -> RemoveFut,
676 RemoveFut: Future<Output = Result<(), CamelError>>,
677{
678 let container_id = create().await?;
679 if let Err(start_err) = start(container_id.clone()).await {
680 if let Err(remove_err) = remove(container_id.clone()).await {
681 return Err(CamelError::ProcessorError(format!(
682 "Failed to start container: {}. Cleanup failed: {}",
683 start_err, remove_err
684 )));
685 }
686 return Err(start_err);
687 }
688
689 Ok(container_id)
690}
691
692async fn handle_list(
693 docker: Docker,
694 _config: ContainerConfig,
695 exchange: &mut Exchange,
696) -> Result<(), CamelError> {
697 let containers = docker
698 .list_containers::<String>(None)
699 .await
700 .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
701
702 let json_value = serde_json::to_value(&containers).map_err(|e| {
703 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
704 })?;
705
706 exchange.input.body = Body::Json(json_value);
707 exchange.input.set_header(
708 HEADER_ACTION_RESULT,
709 serde_json::Value::String("success".to_string()),
710 );
711 Ok(())
712}
713
714async fn handle_run(
715 docker: Docker,
716 config: ContainerConfig,
717 exchange: &mut Exchange,
718) -> Result<(), CamelError> {
719 let image = exchange
720 .input
721 .header(HEADER_IMAGE)
722 .and_then(|v| v.as_str().map(|s| s.to_string()))
723 .or(config.image.clone())
724 .ok_or_else(|| {
725 CamelError::ProcessorError(
726 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
727 )
728 })?;
729
730 let pull_timeout = 300;
731 ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
732 .await
733 .map_err(|e| {
734 CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
735 })?;
736
737 let container_name = resolve_container_name(exchange, &config);
738 let container_name_ref = container_name.as_deref().unwrap_or("");
739 let cmd_parts: Option<Vec<String>> = config
740 .cmd
741 .as_ref()
742 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
743 let auto_remove = config.auto_remove;
744 let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
745 let env_vars = config.parse_env();
746 let network_mode = config.network.clone();
747
748 let volumes_str = exchange
749 .input
750 .header(HEADER_VOLUMES)
751 .and_then(|v| v.as_str().map(|s| s.to_string()))
752 .or(config.volumes.clone());
753 let (binds, anon_volumes) = volumes_str
754 .as_deref()
755 .and_then(parse_volume_str)
756 .unwrap_or_default();
757
758 let docker_create = docker.clone();
759 let docker_start = docker.clone();
760 let docker_remove = docker.clone();
761
762 let container_id = run_container_with_cleanup(
763 move || async move {
764 let create_options = bollard::container::CreateContainerOptions {
765 name: container_name_ref,
766 ..Default::default()
767 };
768 let container_config = bollard::container::Config::<String> {
769 image: Some(image.clone()),
770 cmd: cmd_parts,
771 env: env_vars,
772 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
773 volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
774 host_config: Some(HostConfig {
775 auto_remove: Some(auto_remove),
776 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
777 network_mode,
778 binds: if binds.is_empty() { None } else { Some(binds) },
779 ..Default::default()
780 }),
781 ..Default::default()
782 };
783
784 let create_response = docker_create
785 .create_container(Some(create_options), container_config)
786 .await
787 .map_err(|e| {
788 let err_str = e.to_string().to_lowercase();
789 if err_str.contains("409") || err_str.contains("conflict") {
790 CamelError::ProcessorError(format!(
791 "Container name '{}' already exists. Use a unique name or remove the existing container first",
792 container_name_ref
793 ))
794 } else {
795 CamelError::ProcessorError(format!(
796 "Failed to create container: {}",
797 e
798 ))
799 }
800 })?;
801
802 Ok(create_response.id)
803 },
804 move |container_id| async move {
805 docker_start
806 .start_container::<String>(&container_id, None)
807 .await
808 .map_err(|e| {
809 CamelError::ProcessorError(format!(
810 "Failed to start container: {}",
811 e
812 ))
813 })
814 },
815 move |container_id| async move {
816 docker_remove
817 .remove_container(&container_id, None)
818 .await
819 .map_err(|e| {
820 CamelError::ProcessorError(format!(
821 "Failed to remove container after start failure: {}",
822 e
823 ))
824 })
825 },
826 )
827 .await?;
828
829 track_container(container_id.clone());
830
831 exchange
832 .input
833 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
834 exchange.input.set_header(
835 HEADER_ACTION_RESULT,
836 serde_json::Value::String("success".to_string()),
837 );
838 Ok(())
839}
840
841async fn handle_lifecycle(
842 docker: Docker,
843 _config: ContainerConfig,
844 exchange: &mut Exchange,
845 operation: ProducerOperation,
846 operation_name: &str,
847) -> Result<(), CamelError> {
848 let container_id = exchange
849 .input
850 .header(HEADER_CONTAINER_ID)
851 .and_then(|v| v.as_str().map(|s| s.to_string()))
852 .ok_or_else(|| {
853 CamelError::ProcessorError(format!(
854 "{} header is required for {} operation",
855 HEADER_CONTAINER_ID, operation_name
856 ))
857 })?;
858
859 match operation {
860 ProducerOperation::Start => {
861 docker
862 .start_container::<String>(&container_id, None)
863 .await
864 .map_err(|e| {
865 CamelError::ProcessorError(format!("Failed to start container: {}", e))
866 })?;
867 }
868 ProducerOperation::Stop => {
869 docker
870 .stop_container(&container_id, None)
871 .await
872 .map_err(|e| {
873 CamelError::ProcessorError(format!("Failed to stop container: {}", e))
874 })?;
875 }
876 ProducerOperation::Remove => {
877 docker
878 .remove_container(&container_id, None)
879 .await
880 .map_err(|e| {
881 CamelError::ProcessorError(format!("Failed to remove container: {}", e))
882 })?;
883 untrack_container(&container_id);
884 }
885 _ => {}
886 }
887
888 exchange.input.set_header(
889 HEADER_ACTION_RESULT,
890 serde_json::Value::String("success".to_string()),
891 );
892 Ok(())
893}
894
895async fn handle_exec(
896 docker: Docker,
897 config: ContainerConfig,
898 exchange: &mut Exchange,
899) -> Result<(), CamelError> {
900 let container_id = exchange
901 .input
902 .header(HEADER_CONTAINER_ID)
903 .and_then(|v| v.as_str().map(|s| s.to_string()))
904 .or(config.container_id.clone())
905 .ok_or_else(|| {
906 CamelError::ProcessorError(format!(
907 "{} header or containerId param is required for exec operation",
908 HEADER_CONTAINER_ID
909 ))
910 })?;
911
912 let cmd = exchange
913 .input
914 .header(HEADER_CMD)
915 .and_then(|v| v.as_str().map(|s| s.to_string()))
916 .or(config.cmd.clone())
917 .ok_or_else(|| {
918 CamelError::ProcessorError(
919 "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
920 )
921 })?;
922
923 let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
924 let env_vars = config.parse_env();
925
926 let exec_config = bollard::exec::CreateExecOptions {
927 cmd: Some(cmd_parts),
928 env: env_vars,
929 user: config.user.clone(),
930 working_dir: config.workdir.clone(),
931 attach_stdout: Some(true),
932 attach_stderr: Some(true),
933 ..Default::default()
934 };
935
936 let create_result = docker
937 .create_exec(&container_id, exec_config)
938 .await
939 .map_err(|e| {
940 let err_str = e.to_string().to_lowercase();
941 if err_str.contains("404") || err_str.contains("no such") {
942 CamelError::ProcessorError(format!(
943 "Container '{}' not found for exec",
944 container_id
945 ))
946 } else {
947 CamelError::ProcessorError(format!("Failed to create exec: {}", e))
948 }
949 })?;
950
951 let exec_id = create_result.id;
952
953 if config.detach {
954 docker
955 .start_exec(
956 &exec_id,
957 Some(bollard::exec::StartExecOptions {
958 detach: true,
959 ..Default::default()
960 }),
961 )
962 .await
963 .map_err(|e| {
964 CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
965 })?;
966
967 exchange
968 .input
969 .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
970 exchange
971 .input
972 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
973 } else {
974 let start_result = docker
975 .start_exec(&exec_id, None)
976 .await
977 .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
978
979 let mut output = String::new();
980
981 match start_result {
982 bollard::exec::StartExecResults::Attached {
983 output: mut stream, ..
984 } => {
985 use futures::StreamExt;
986 while let Some(msg) = stream.next().await {
987 match msg {
988 Ok(bollard::container::LogOutput::StdOut { message }) => {
989 output.push_str(&String::from_utf8_lossy(&message));
990 }
991 Ok(bollard::container::LogOutput::StdErr { message }) => {
992 output.push_str(&String::from_utf8_lossy(&message));
993 }
994 Ok(_) => {}
995 Err(e) => {
996 output.push_str(&format!("[error reading stream: {}]", e));
997 }
998 }
999 }
1000 }
1001 bollard::exec::StartExecResults::Detached => {}
1002 }
1003
1004 let inspect = docker
1005 .inspect_exec(&exec_id)
1006 .await
1007 .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1008
1009 let exit_code: i64 = inspect.exit_code.unwrap_or(0);
1010
1011 let output = output.trim_end().to_string();
1012 exchange.input.body = Body::Text(output);
1013 exchange.input.set_header(
1014 HEADER_EXIT_CODE,
1015 serde_json::Value::Number(exit_code.into()),
1016 );
1017 exchange
1018 .input
1019 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1020 }
1021
1022 exchange.input.set_header(
1023 HEADER_ACTION_RESULT,
1024 serde_json::Value::String("success".to_string()),
1025 );
1026 Ok(())
1027}
1028
1029async fn handle_network_create(
1030 docker: Docker,
1031 config: ContainerConfig,
1032 exchange: &mut Exchange,
1033) -> Result<(), CamelError> {
1034 let network_name = exchange
1035 .input
1036 .header(HEADER_CONTAINER_NAME)
1037 .and_then(|v| v.as_str().map(|s| s.to_string()))
1038 .or(config.name.clone())
1039 .ok_or_else(|| {
1040 CamelError::ProcessorError(
1041 "CamelContainerName header or name param is required for network-create"
1042 .to_string(),
1043 )
1044 })?;
1045
1046 let driver = config.driver.as_deref().unwrap_or("bridge");
1047
1048 let options = bollard::network::CreateNetworkOptions {
1049 name: network_name.clone(),
1050 driver: driver.to_string(),
1051 ..Default::default()
1052 };
1053
1054 let result = docker.create_network(options).await.map_err(|e| {
1055 let err_str = e.to_string().to_lowercase();
1056 if err_str.contains("409") || err_str.contains("already exists") {
1057 CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1058 } else {
1059 CamelError::ProcessorError(format!("Failed to create network: {}", e))
1060 }
1061 })?;
1062
1063 let network_id = result.id.clone();
1064 let json_value = serde_json::to_value(&result).map_err(|e| {
1065 CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1066 })?;
1067
1068 exchange.input.body = Body::Json(json_value);
1069 exchange
1070 .input
1071 .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1072 exchange.input.set_header(
1073 HEADER_ACTION_RESULT,
1074 serde_json::Value::String("success".to_string()),
1075 );
1076 Ok(())
1077}
1078
1079async fn handle_network_connect(
1080 docker: Docker,
1081 config: ContainerConfig,
1082 exchange: &mut Exchange,
1083) -> Result<(), CamelError> {
1084 let network = exchange
1085 .input
1086 .header(HEADER_NETWORK)
1087 .and_then(|v| v.as_str().map(|s| s.to_string()))
1088 .or(config.network.clone())
1089 .ok_or_else(|| {
1090 CamelError::ProcessorError(
1091 "CamelContainerNetwork header or network param is required for network-connect"
1092 .to_string(),
1093 )
1094 })?;
1095
1096 let container = exchange
1097 .input
1098 .header(HEADER_CONTAINER_ID)
1099 .and_then(|v| v.as_str().map(|s| s.to_string()))
1100 .or(config.container_id.clone())
1101 .ok_or_else(|| {
1102 CamelError::ProcessorError(
1103 "CamelContainerId header or container param is required for network-connect"
1104 .to_string(),
1105 )
1106 })?;
1107
1108 docker
1109 .connect_network(
1110 &network,
1111 bollard::network::ConnectNetworkOptions {
1112 container,
1113 ..Default::default()
1114 },
1115 )
1116 .await
1117 .map_err(|e| {
1118 let err_str = e.to_string().to_lowercase();
1119 if err_str.contains("404") || err_str.contains("not found") {
1120 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1121 } else {
1122 CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1123 }
1124 })?;
1125
1126 exchange.input.set_header(
1127 HEADER_ACTION_RESULT,
1128 serde_json::Value::String("success".to_string()),
1129 );
1130 Ok(())
1131}
1132
1133async fn handle_network_disconnect(
1134 docker: Docker,
1135 config: ContainerConfig,
1136 exchange: &mut Exchange,
1137) -> Result<(), CamelError> {
1138 let network = exchange
1139 .input
1140 .header(HEADER_NETWORK)
1141 .and_then(|v| v.as_str().map(|s| s.to_string()))
1142 .or(config.network.clone())
1143 .ok_or_else(|| {
1144 CamelError::ProcessorError(
1145 "CamelContainerNetwork header or network param is required for network-disconnect"
1146 .to_string(),
1147 )
1148 })?;
1149
1150 let container = exchange
1151 .input
1152 .header(HEADER_CONTAINER_ID)
1153 .and_then(|v| v.as_str().map(|s| s.to_string()))
1154 .or(config.container_id.clone())
1155 .ok_or_else(|| {
1156 CamelError::ProcessorError(
1157 "CamelContainerId header or container param is required for network-disconnect"
1158 .to_string(),
1159 )
1160 })?;
1161
1162 docker
1163 .disconnect_network(
1164 &network,
1165 bollard::network::DisconnectNetworkOptions {
1166 container,
1167 force: config.force,
1168 },
1169 )
1170 .await
1171 .map_err(|e| {
1172 let err_str = e.to_string().to_lowercase();
1173 if err_str.contains("404") || err_str.contains("not found") {
1174 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1175 } else {
1176 CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1177 }
1178 })?;
1179
1180 exchange.input.set_header(
1181 HEADER_ACTION_RESULT,
1182 serde_json::Value::String("success".to_string()),
1183 );
1184 Ok(())
1185}
1186
1187async fn handle_network_remove(
1188 docker: Docker,
1189 config: ContainerConfig,
1190 exchange: &mut Exchange,
1191) -> Result<(), CamelError> {
1192 let network = exchange
1193 .input
1194 .header(HEADER_NETWORK)
1195 .and_then(|v| v.as_str().map(|s| s.to_string()))
1196 .or(config.network.clone())
1197 .ok_or_else(|| {
1198 CamelError::ProcessorError(
1199 "CamelContainerNetwork header or network param is required for network-remove"
1200 .to_string(),
1201 )
1202 })?;
1203
1204 docker.remove_network(&network).await.map_err(|e| {
1205 let err_str = e.to_string().to_lowercase();
1206 if err_str.contains("404") || err_str.contains("not found") {
1207 CamelError::ProcessorError(format!("Network '{}' not found", network))
1208 } else if err_str.contains("409") || err_str.contains("in use") {
1209 CamelError::ProcessorError(format!(
1210 "Network '{}' is in use and cannot be removed",
1211 network
1212 ))
1213 } else {
1214 CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1215 }
1216 })?;
1217
1218 exchange.input.set_header(
1219 HEADER_ACTION_RESULT,
1220 serde_json::Value::String("success".to_string()),
1221 );
1222 Ok(())
1223}
1224
1225async fn handle_network_list(
1226 docker: Docker,
1227 _config: ContainerConfig,
1228 exchange: &mut Exchange,
1229) -> Result<(), CamelError> {
1230 let networks = docker
1231 .list_networks::<String>(None)
1232 .await
1233 .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1234
1235 let json_value = serde_json::to_value(&networks)
1236 .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1237
1238 exchange.input.body = Body::Json(json_value);
1239 exchange.input.set_header(
1240 HEADER_ACTION_RESULT,
1241 serde_json::Value::String("success".to_string()),
1242 );
1243 Ok(())
1244}
1245
1246#[derive(Clone)]
1251pub struct ContainerProducer {
1252 config: ContainerConfig,
1253 docker: Docker,
1254}
1255
1256impl Service<Exchange> for ContainerProducer {
1257 type Response = Exchange;
1258 type Error = CamelError;
1259 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1260
1261 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1262 Poll::Ready(Ok(()))
1263 }
1264
1265 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1266 let config = self.config.clone();
1267 let docker = self.docker.clone();
1268 Box::pin(async move {
1269 let operation_name = exchange
1270 .input
1271 .header(HEADER_ACTION)
1272 .and_then(|v| v.as_str().map(|s| s.to_string()))
1273 .unwrap_or_else(|| config.operation.clone());
1274
1275 let operation = parse_producer_operation(&operation_name)?;
1276
1277 match operation {
1278 ProducerOperation::List => {
1279 handle_list(docker, config, &mut exchange).await?;
1280 }
1281 ProducerOperation::Run => {
1282 handle_run(docker, config, &mut exchange).await?;
1283 }
1284 ProducerOperation::Start => {
1285 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1286 .await?;
1287 }
1288 ProducerOperation::Stop => {
1289 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1290 .await?;
1291 }
1292 ProducerOperation::Remove => {
1293 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1294 .await?;
1295 }
1296 ProducerOperation::Exec => {
1297 handle_exec(docker, config, &mut exchange).await?;
1298 }
1299 ProducerOperation::NetworkCreate => {
1300 handle_network_create(docker, config, &mut exchange).await?;
1301 }
1302 ProducerOperation::NetworkConnect => {
1303 handle_network_connect(docker, config, &mut exchange).await?;
1304 }
1305 ProducerOperation::NetworkDisconnect => {
1306 handle_network_disconnect(docker, config, &mut exchange).await?;
1307 }
1308 ProducerOperation::NetworkRemove => {
1309 handle_network_remove(docker, config, &mut exchange).await?;
1310 }
1311 ProducerOperation::NetworkList => {
1312 handle_network_list(docker, config, &mut exchange).await?;
1313 }
1314 }
1315
1316 Ok(exchange)
1317 })
1318 }
1319}
1320
1321pub struct ContainerConsumer {
1326 config: ContainerConfig,
1327}
1328
1329#[async_trait]
1330impl Consumer for ContainerConsumer {
1331 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1332 match self.config.operation.as_str() {
1333 "events" => self.start_events_consumer(context).await,
1334 "logs" => self.start_logs_consumer(context).await,
1335 _ => Err(CamelError::EndpointCreationFailed(format!(
1336 "Consumer only supports 'events' or 'logs' operations, got '{}'",
1337 self.config.operation
1338 ))),
1339 }
1340 }
1341
1342 async fn stop(&mut self) -> Result<(), CamelError> {
1343 Ok(())
1344 }
1345
1346 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1347 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1348 }
1349}
1350
1351impl ContainerConsumer {
1352 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1353 use futures::StreamExt;
1354
1355 loop {
1356 if context.is_cancelled() {
1357 tracing::info!("Container events consumer shutting down");
1358 return Ok(());
1359 }
1360
1361 let docker = match self.config.connect_docker().await {
1362 Ok(d) => d,
1363 Err(e) => {
1364 tracing::error!(
1365 "Consumer failed to connect to docker: {}. Retrying in 5s...",
1366 e
1367 );
1368 tokio::select! {
1369 _ = context.cancelled() => {
1370 tracing::info!("Container events consumer shutting down");
1371 return Ok(());
1372 }
1373 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1374 }
1375 continue;
1376 }
1377 };
1378
1379 let mut event_stream = docker.events::<String>(None);
1380
1381 loop {
1382 tokio::select! {
1383 _ = context.cancelled() => {
1384 tracing::info!("Container events consumer shutting down");
1385 return Ok(());
1386 }
1387
1388 msg = event_stream.next() => {
1389 match msg {
1390 Some(Ok(event)) => {
1391 let formatted = format_docker_event(&event);
1392 let message = Message::new(Body::Text(formatted));
1393 let exchange = Exchange::new(message);
1394
1395 if let Err(e) = context.send(exchange).await {
1396 tracing::error!("Failed to send exchange: {:?}", e);
1397 break;
1398 }
1399 }
1400 Some(Err(e)) => {
1401 tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1402 break;
1403 }
1404 None => {
1405 tracing::info!("Docker event stream ended. Reconnecting...");
1406 break;
1407 }
1408 }
1409 }
1410 }
1411 }
1412
1413 tokio::select! {
1414 _ = context.cancelled() => {
1415 tracing::info!("Container events consumer shutting down");
1416 return Ok(());
1417 }
1418 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1419 }
1420 }
1421 }
1422
1423 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1424 use futures::StreamExt;
1425
1426 let container_id = self.config.container_id.clone().ok_or_else(|| {
1427 CamelError::EndpointCreationFailed(
1428 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1429 .to_string(),
1430 )
1431 })?;
1432
1433 loop {
1434 if context.is_cancelled() {
1435 tracing::info!("Container logs consumer shutting down");
1436 return Ok(());
1437 }
1438
1439 let docker = match self.config.connect_docker().await {
1440 Ok(d) => d,
1441 Err(e) => {
1442 tracing::error!(
1443 "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
1444 e
1445 );
1446 tokio::select! {
1447 _ = context.cancelled() => {
1448 tracing::info!("Container logs consumer shutting down");
1449 return Ok(());
1450 }
1451 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1452 }
1453 continue;
1454 }
1455 };
1456
1457 let tail = self
1458 .config
1459 .tail
1460 .clone()
1461 .unwrap_or_else(|| "all".to_string());
1462
1463 let options = bollard::container::LogsOptions::<String> {
1464 follow: self.config.follow,
1465 stdout: true,
1466 stderr: true,
1467 timestamps: self.config.timestamps,
1468 tail,
1469 ..Default::default()
1470 };
1471
1472 let mut log_stream = docker.logs(&container_id, Some(options));
1473 let container_id_header = container_id.clone();
1474
1475 loop {
1476 tokio::select! {
1477 _ = context.cancelled() => {
1478 tracing::info!("Container logs consumer shutting down");
1479 return Ok(());
1480 }
1481
1482 msg = log_stream.next() => {
1483 match msg {
1484 Some(Ok(log_output)) => {
1485 let (stream_type, content) = match log_output {
1486 bollard::container::LogOutput::StdOut { message } => {
1487 ("stdout", String::from_utf8_lossy(&message).into_owned())
1488 }
1489 bollard::container::LogOutput::StdErr { message } => {
1490 ("stderr", String::from_utf8_lossy(&message).into_owned())
1491 }
1492 bollard::container::LogOutput::Console { message } => {
1493 ("console", String::from_utf8_lossy(&message).into_owned())
1494 }
1495 bollard::container::LogOutput::StdIn { message } => {
1496 ("stdin", String::from_utf8_lossy(&message).into_owned())
1497 }
1498 };
1499
1500 let content = content.trim_end();
1501 if content.is_empty() {
1502 continue;
1503 }
1504
1505 let mut message = Message::new(Body::Text(content.to_string()));
1506 message.set_header(
1507 HEADER_CONTAINER_ID,
1508 serde_json::Value::String(container_id_header.clone()),
1509 );
1510 message.set_header(
1511 HEADER_LOG_STREAM,
1512 serde_json::Value::String(stream_type.to_string()),
1513 );
1514
1515 if self.config.timestamps
1516 && let Some(ts) = extract_timestamp(content) {
1517 message.set_header(
1518 HEADER_LOG_TIMESTAMP,
1519 serde_json::Value::String(ts),
1520 );
1521 }
1522
1523 let exchange = Exchange::new(message);
1524
1525 if let Err(e) = context.send(exchange).await {
1526 tracing::error!("Failed to send log exchange: {:?}", e);
1527 break;
1528 }
1529 }
1530 Some(Err(e)) => {
1531 tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1532 break;
1533 }
1534 None => {
1535 if self.config.follow {
1536 tracing::info!("Docker log stream ended. Reconnecting...");
1537 break;
1538 } else {
1539 tracing::info!("Container logs consumer finished (follow=false)");
1540 return Ok(());
1541 }
1542 }
1543 }
1544 }
1545 }
1546 }
1547
1548 tokio::select! {
1549 _ = context.cancelled() => {
1550 tracing::info!("Container logs consumer shutting down");
1551 return Ok(());
1552 }
1553 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1554 }
1555 }
1556 }
1557}
1558
1559fn extract_timestamp(log_line: &str) -> Option<String> {
1560 let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
1561 if parts.len() > 1 && parts[0].contains('T') {
1562 Some(parts[0].to_string())
1563 } else {
1564 None
1565 }
1566}
1567
1568pub struct ContainerComponent {
1576 config: Option<ContainerGlobalConfig>,
1577}
1578
1579impl ContainerComponent {
1580 pub fn new() -> Self {
1582 Self { config: None }
1583 }
1584
1585 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1587 Self {
1588 config: Some(config),
1589 }
1590 }
1591
1592 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1594 Self { config }
1595 }
1596}
1597
1598impl Default for ContainerComponent {
1599 fn default() -> Self {
1600 Self::new()
1601 }
1602}
1603
1604impl Component for ContainerComponent {
1605 fn scheme(&self) -> &str {
1606 "container"
1607 }
1608
1609 fn create_endpoint(
1610 &self,
1611 uri: &str,
1612 _ctx: &dyn camel_component_api::ComponentContext,
1613 ) -> Result<Box<dyn Endpoint>, CamelError> {
1614 let mut config = ContainerConfig::from_uri(uri)?;
1615 if let Some(ref global) = self.config {
1617 config.apply_global_defaults(global);
1618 }
1619 Ok(Box::new(ContainerEndpoint {
1620 uri: uri.to_string(),
1621 config,
1622 }))
1623 }
1624}
1625
1626pub struct ContainerEndpoint {
1631 uri: String,
1632 config: ContainerConfig,
1633}
1634
1635impl ContainerEndpoint {
1636 pub fn docker_host(&self) -> Option<&str> {
1639 self.config.host.as_deref()
1640 }
1641}
1642
1643impl Endpoint for ContainerEndpoint {
1644 fn uri(&self) -> &str {
1645 &self.uri
1646 }
1647
1648 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1649 Ok(Box::new(ContainerConsumer {
1650 config: self.config.clone(),
1651 }))
1652 }
1653
1654 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1655 let docker = self.config.connect_docker_client()?;
1656 Ok(BoxProcessor::new(ContainerProducer {
1657 config: self.config.clone(),
1658 docker,
1659 }))
1660 }
1661}
1662
1663#[cfg(test)]
1664mod tests {
1665 use super::*;
1666 use camel_component_api::NoOpComponentContext;
1667
1668 #[test]
1669 fn test_container_config() {
1670 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1671 assert_eq!(config.operation, "run");
1672 assert_eq!(config.image.as_deref(), Some("alpine"));
1673 assert!(config.host.is_none());
1675 }
1676
1677 #[test]
1678 fn test_global_config_applied_to_endpoint() {
1679 let global =
1682 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1683 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1684 assert!(
1685 config.host.is_none(),
1686 "URI without ?host= should leave host as None"
1687 );
1688 config.apply_global_defaults(&global);
1689 assert_eq!(
1690 config.host.as_deref(),
1691 Some("unix:///custom/docker.sock"),
1692 "global docker_host must be applied when URI did not set host"
1693 );
1694 }
1695
1696 #[test]
1697 fn test_uri_param_wins_over_global_config() {
1698 let global =
1700 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1701 let mut config =
1702 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1703 .unwrap();
1704 assert_eq!(
1705 config.host.as_deref(),
1706 Some("unix:///override.sock"),
1707 "URI-set host should be parsed correctly"
1708 );
1709 config.apply_global_defaults(&global);
1710 assert_eq!(
1711 config.host.as_deref(),
1712 Some("unix:///override.sock"),
1713 "global config must NOT override a host already set by URI"
1714 );
1715 }
1716
1717 #[test]
1718 fn test_container_config_parses_name() {
1719 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1720 assert_eq!(config.name.as_deref(), Some("my-container"));
1721 }
1722
1723 #[test]
1724 fn test_parse_producer_operation_known() {
1725 assert_eq!(
1726 parse_producer_operation("list").unwrap(),
1727 ProducerOperation::List
1728 );
1729 assert_eq!(
1730 parse_producer_operation("run").unwrap(),
1731 ProducerOperation::Run
1732 );
1733 assert_eq!(
1734 parse_producer_operation("start").unwrap(),
1735 ProducerOperation::Start
1736 );
1737 assert_eq!(
1738 parse_producer_operation("stop").unwrap(),
1739 ProducerOperation::Stop
1740 );
1741 assert_eq!(
1742 parse_producer_operation("remove").unwrap(),
1743 ProducerOperation::Remove
1744 );
1745 }
1746
1747 #[test]
1748 fn test_parse_producer_operation_unknown() {
1749 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1750 match err {
1751 CamelError::ProcessorError(msg) => {
1752 assert!(
1753 msg.contains("Unknown container operation"),
1754 "Unexpected error message: {}",
1755 msg
1756 );
1757 }
1758 _ => panic!("Expected ProcessorError for unknown operation"),
1759 }
1760 }
1761
1762 #[test]
1763 fn test_parse_producer_operation_new_variants() {
1764 assert_eq!(
1765 parse_producer_operation("exec").unwrap(),
1766 ProducerOperation::Exec
1767 );
1768 assert_eq!(
1769 parse_producer_operation("network-create").unwrap(),
1770 ProducerOperation::NetworkCreate
1771 );
1772 assert_eq!(
1773 parse_producer_operation("network-connect").unwrap(),
1774 ProducerOperation::NetworkConnect
1775 );
1776 assert_eq!(
1777 parse_producer_operation("network-disconnect").unwrap(),
1778 ProducerOperation::NetworkDisconnect
1779 );
1780 assert_eq!(
1781 parse_producer_operation("network-remove").unwrap(),
1782 ProducerOperation::NetworkRemove
1783 );
1784 assert_eq!(
1785 parse_producer_operation("network-list").unwrap(),
1786 ProducerOperation::NetworkList
1787 );
1788 }
1789
1790 #[test]
1791 fn test_resolve_container_name_header_overrides_config() {
1792 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1793 let mut exchange = Exchange::new(Message::new(""));
1794 exchange.input.set_header(
1795 HEADER_CONTAINER_NAME,
1796 serde_json::Value::String("header-name".to_string()),
1797 );
1798
1799 let resolved = resolve_container_name(&exchange, &config);
1800 assert_eq!(resolved.as_deref(), Some("header-name"));
1801 }
1802
1803 #[test]
1804 fn test_container_config_rejects_tcp_host() {
1805 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1806 let err = config.connect_docker_client().unwrap_err();
1807 match err {
1808 CamelError::ProcessorError(msg) => {
1809 assert!(
1810 msg.to_lowercase().contains("tcp"),
1811 "Expected TCP scheme error, got: {}",
1812 msg
1813 );
1814 }
1815 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1816 }
1817 }
1818
1819 #[tokio::test]
1820 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1821 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1822 let remove_called_clone = remove_called.clone();
1823
1824 let result = run_container_with_cleanup(
1825 || async { Ok("container-123".to_string()) },
1826 |_id| async move {
1827 Err(CamelError::ProcessorError(
1828 "Failed to start container".to_string(),
1829 ))
1830 },
1831 move |_id| {
1832 let remove_called_inner = remove_called_clone.clone();
1833 async move {
1834 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1835 Ok(())
1836 }
1837 },
1838 )
1839 .await;
1840
1841 assert!(result.is_err(), "Expected start failure to bubble up");
1842 assert!(
1843 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1844 "Expected cleanup to remove container"
1845 );
1846 }
1847
1848 #[test]
1849 fn test_container_component_creates_endpoint() {
1850 let component = ContainerComponent::new();
1851 assert_eq!(component.scheme(), "container");
1852 let ctx = NoOpComponentContext;
1853 let endpoint = component
1854 .create_endpoint("container:run?image=alpine", &ctx)
1855 .unwrap();
1856 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1857 }
1858
1859 #[test]
1860 fn test_container_config_parses_ports() {
1861 let config =
1862 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1863 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1864 }
1865
1866 #[test]
1867 fn test_container_config_parses_env() {
1868 let config =
1869 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1870 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1871 }
1872
1873 #[test]
1874 fn test_container_config_parses_logs_options() {
1875 let config = ContainerConfig::from_uri(
1876 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1877 )
1878 .unwrap();
1879 assert_eq!(config.operation, "logs");
1880 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1881 assert!(config.follow);
1882 assert!(config.timestamps);
1883 assert_eq!(config.tail.as_deref(), Some("100"));
1884 }
1885
1886 #[test]
1887 fn test_container_config_logs_defaults() {
1888 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1889 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1893
1894 #[test]
1895 fn test_parse_ports_single() {
1896 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1897 let (exposed, bindings) = config.parse_ports().unwrap();
1898
1899 assert!(exposed.contains_key("80/tcp"));
1900 assert!(bindings.contains_key("80/tcp"));
1901
1902 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1903 assert_eq!(binding.len(), 1);
1904 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1905 }
1906
1907 #[test]
1908 fn test_parse_ports_multiple() {
1909 let config =
1910 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1911 let (exposed, bindings) = config.parse_ports().unwrap();
1912
1913 assert!(exposed.contains_key("80/tcp"));
1914 assert!(exposed.contains_key("443/tcp"));
1915 assert_eq!(bindings.len(), 2);
1916 }
1917
1918 #[test]
1919 fn test_parse_ports_with_protocol() {
1920 let config =
1921 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1922 .unwrap();
1923 let (exposed, _bindings) = config.parse_ports().unwrap();
1924
1925 assert!(exposed.contains_key("80/tcp"));
1926 assert!(exposed.contains_key("53/udp"));
1927 }
1928
1929 #[test]
1930 fn test_parse_ports_none() {
1931 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1932 assert!(config.parse_ports().is_none());
1933 }
1934
1935 #[test]
1936 fn test_parse_env_single() {
1937 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1938 let env = config.parse_env().unwrap();
1939
1940 assert_eq!(env.len(), 1);
1941 assert_eq!(env[0], "FOO=bar");
1942 }
1943
1944 #[test]
1945 fn test_parse_env_multiple() {
1946 let config =
1947 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1948 .unwrap();
1949 let env = config.parse_env().unwrap();
1950
1951 assert_eq!(env.len(), 3);
1952 assert!(env.contains(&"FOO=bar".to_string()));
1953 assert!(env.contains(&"BAZ=qux".to_string()));
1954 assert!(env.contains(&"NUM=123".to_string()));
1955 }
1956
1957 #[test]
1958 fn test_parse_env_none() {
1959 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1960 assert!(config.parse_env().is_none());
1961 }
1962
1963 use camel_component_api::Message;
1964 use std::sync::Arc;
1965
1966 #[tokio::test]
1967 async fn test_container_producer_resolves_operation_from_header() {
1968 let docker = match Docker::connect_with_local_defaults() {
1970 Ok(d) => d,
1971 Err(_) => {
1972 eprintln!("Skipping test: Could not connect to Docker daemon");
1973 return;
1974 }
1975 };
1976
1977 if docker.ping().await.is_err() {
1978 eprintln!("Skipping test: Docker daemon not responding to ping");
1979 return;
1980 }
1981
1982 let component = ContainerComponent::new();
1983 let ctx = NoOpComponentContext;
1984 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1985
1986 let ctx = ProducerContext::new();
1987 let mut producer = endpoint.create_producer(&ctx).unwrap();
1988
1989 let mut exchange = Exchange::new(Message::new(""));
1990 exchange
1991 .input
1992 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1993
1994 use tower::ServiceExt;
1995 let result = producer
1996 .ready()
1997 .await
1998 .unwrap()
1999 .call(exchange)
2000 .await
2001 .unwrap();
2002
2003 assert_eq!(
2005 result
2006 .input
2007 .header(HEADER_ACTION_RESULT)
2008 .map(|v| v.as_str().unwrap()),
2009 Some("success")
2010 );
2011 }
2012
2013 #[tokio::test]
2014 async fn test_container_producer_connection_error_on_invalid_host() {
2015 let component = ContainerComponent::new();
2017 let ctx = NoOpComponentContext;
2018 let endpoint = component
2019 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2020 .unwrap();
2021
2022 let ctx = ProducerContext::new();
2023 let result = endpoint.create_producer(&ctx);
2024
2025 assert!(
2027 result.is_err(),
2028 "Expected error when connecting to invalid host"
2029 );
2030 let err = result.unwrap_err();
2031 match &err {
2032 CamelError::ProcessorError(msg) => {
2033 assert!(
2034 msg.to_lowercase().contains("connection")
2035 || msg.to_lowercase().contains("connect")
2036 || msg.to_lowercase().contains("socket")
2037 || msg.contains("docker"),
2038 "Error message should indicate connection failure, got: {}",
2039 msg
2040 );
2041 }
2042 _ => panic!("Expected ProcessorError, got: {:?}", err),
2043 }
2044 }
2045
2046 #[tokio::test]
2048 async fn test_container_producer_lifecycle_operations_missing_id() {
2049 let docker = match Docker::connect_with_local_defaults() {
2051 Ok(d) => d,
2052 Err(_) => {
2053 eprintln!("Skipping test: Could not connect to Docker daemon");
2054 return;
2055 }
2056 };
2057
2058 if docker.ping().await.is_err() {
2059 eprintln!("Skipping test: Docker daemon not responding to ping");
2060 return;
2061 }
2062
2063 let component = ContainerComponent::new();
2064 let ctx = NoOpComponentContext;
2065 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2066 let ctx = ProducerContext::new();
2067 let mut producer = endpoint.create_producer(&ctx).unwrap();
2068
2069 for operation in ["start", "stop", "remove"] {
2071 let mut exchange = Exchange::new(Message::new(""));
2072 exchange.input.set_header(
2073 HEADER_ACTION,
2074 serde_json::Value::String(operation.to_string()),
2075 );
2076 use tower::ServiceExt;
2079 let result = producer.ready().await.unwrap().call(exchange).await;
2080
2081 assert!(
2082 result.is_err(),
2083 "Expected error for {} operation without CamelContainerId",
2084 operation
2085 );
2086 let err = result.unwrap_err();
2087 match &err {
2088 CamelError::ProcessorError(msg) => {
2089 assert!(
2090 msg.contains(HEADER_CONTAINER_ID),
2091 "Error message should mention {}, got: {}",
2092 HEADER_CONTAINER_ID,
2093 msg
2094 );
2095 }
2096 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2097 }
2098 }
2099 }
2100
2101 #[tokio::test]
2103 async fn test_container_producer_stop_nonexistent() {
2104 let docker = match Docker::connect_with_local_defaults() {
2106 Ok(d) => d,
2107 Err(_) => {
2108 eprintln!("Skipping test: Could not connect to Docker daemon");
2109 return;
2110 }
2111 };
2112
2113 if docker.ping().await.is_err() {
2114 eprintln!("Skipping test: Docker daemon not responding to ping");
2115 return;
2116 }
2117
2118 let component = ContainerComponent::new();
2119 let ctx = NoOpComponentContext;
2120 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2121 let ctx = ProducerContext::new();
2122 let mut producer = endpoint.create_producer(&ctx).unwrap();
2123
2124 let mut exchange = Exchange::new(Message::new(""));
2125 exchange
2126 .input
2127 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2128 exchange.input.set_header(
2129 HEADER_CONTAINER_ID,
2130 serde_json::Value::String("nonexistent-container-123".into()),
2131 );
2132
2133 use tower::ServiceExt;
2134 let result = producer.ready().await.unwrap().call(exchange).await;
2135
2136 assert!(
2137 result.is_err(),
2138 "Expected error when stopping nonexistent container"
2139 );
2140 let err = result.unwrap_err();
2141 match &err {
2142 CamelError::ProcessorError(msg) => {
2143 assert!(
2145 msg.to_lowercase().contains("no such container")
2146 || msg.to_lowercase().contains("not found")
2147 || msg.contains("404"),
2148 "Error message should indicate container not found, got: {}",
2149 msg
2150 );
2151 }
2152 _ => panic!("Expected ProcessorError, got: {:?}", err),
2153 }
2154 }
2155
2156 #[tokio::test]
2158 async fn test_container_producer_run_missing_image() {
2159 let docker = match Docker::connect_with_local_defaults() {
2161 Ok(d) => d,
2162 Err(_) => {
2163 eprintln!("Skipping test: Could not connect to Docker daemon");
2164 return;
2165 }
2166 };
2167
2168 if docker.ping().await.is_err() {
2169 eprintln!("Skipping test: Docker daemon not responding to ping");
2170 return;
2171 }
2172
2173 let component = ContainerComponent::new();
2175 let ctx = NoOpComponentContext;
2176 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2177 let ctx = ProducerContext::new();
2178 let mut producer = endpoint.create_producer(&ctx).unwrap();
2179
2180 let mut exchange = Exchange::new(Message::new(""));
2181 exchange
2182 .input
2183 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2184 use tower::ServiceExt;
2187 let result = producer.ready().await.unwrap().call(exchange).await;
2188
2189 assert!(
2190 result.is_err(),
2191 "Expected error for run operation without image"
2192 );
2193 let err = result.unwrap_err();
2194 match &err {
2195 CamelError::ProcessorError(msg) => {
2196 assert!(
2197 msg.to_lowercase().contains("image"),
2198 "Error message should mention 'image', got: {}",
2199 msg
2200 );
2201 }
2202 _ => panic!("Expected ProcessorError, got: {:?}", err),
2203 }
2204 }
2205
2206 #[tokio::test]
2208 async fn test_container_producer_run_image_from_header() {
2209 let docker = match Docker::connect_with_local_defaults() {
2211 Ok(d) => d,
2212 Err(_) => {
2213 eprintln!("Skipping test: Could not connect to Docker daemon");
2214 return;
2215 }
2216 };
2217
2218 if docker.ping().await.is_err() {
2219 eprintln!("Skipping test: Docker daemon not responding to ping");
2220 return;
2221 }
2222
2223 let component = ContainerComponent::new();
2225 let ctx = NoOpComponentContext;
2226 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2227 let ctx = ProducerContext::new();
2228 let mut producer = endpoint.create_producer(&ctx).unwrap();
2229
2230 let mut exchange = Exchange::new(Message::new(""));
2231 exchange
2232 .input
2233 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2234 exchange.input.set_header(
2236 HEADER_IMAGE,
2237 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2238 );
2239
2240 use tower::ServiceExt;
2241 let result = producer.ready().await.unwrap().call(exchange).await;
2242
2243 assert!(
2245 result.is_err(),
2246 "Expected error when running container with nonexistent image"
2247 );
2248 let err = result.unwrap_err();
2249 match &err {
2250 CamelError::ProcessorError(msg) => {
2251 assert!(
2253 msg.to_lowercase().contains("no such image")
2254 || msg.to_lowercase().contains("not found")
2255 || msg.to_lowercase().contains("image")
2256 || msg.to_lowercase().contains("pull")
2257 || msg.contains("404"),
2258 "Error message should indicate image issue, got: {}",
2259 msg
2260 );
2261 }
2262 _ => panic!("Expected ProcessorError, got: {:?}", err),
2263 }
2264 }
2265
2266 #[tokio::test]
2269 async fn test_container_producer_run_alpine_container() {
2270 let docker = match Docker::connect_with_local_defaults() {
2271 Ok(d) => d,
2272 Err(_) => {
2273 eprintln!("Skipping test: Could not connect to Docker daemon");
2274 return;
2275 }
2276 };
2277
2278 if docker.ping().await.is_err() {
2279 eprintln!("Skipping test: Docker daemon not responding to ping");
2280 return;
2281 }
2282
2283 let images = docker.list_images::<&str>(None).await.unwrap();
2285 let has_alpine = images
2286 .iter()
2287 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2288
2289 if !has_alpine {
2290 eprintln!("Pulling alpine:latest image...");
2291 let mut stream = docker.create_image(
2292 Some(bollard::image::CreateImageOptions {
2293 from_image: "alpine:latest",
2294 ..Default::default()
2295 }),
2296 None,
2297 None,
2298 );
2299
2300 use futures::StreamExt;
2301 while let Some(_item) = stream.next().await {
2302 }
2304 eprintln!("Image pulled successfully");
2305 }
2306
2307 let component = ContainerComponent::new();
2309 let ctx = NoOpComponentContext;
2310 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2311 let ctx = ProducerContext::new();
2312 let mut producer = endpoint.create_producer(&ctx).unwrap();
2313
2314 let timestamp = std::time::SystemTime::now()
2316 .duration_since(std::time::UNIX_EPOCH)
2317 .unwrap()
2318 .as_millis();
2319 let container_name = format!("test-rust-camel-{}", timestamp);
2320 let mut exchange = Exchange::new(Message::new(""));
2321 exchange.input.set_header(
2322 HEADER_IMAGE,
2323 serde_json::Value::String("alpine:latest".into()),
2324 );
2325 exchange.input.set_header(
2326 HEADER_CONTAINER_NAME,
2327 serde_json::Value::String(container_name.clone()),
2328 );
2329
2330 use tower::ServiceExt;
2331 let result = producer
2332 .ready()
2333 .await
2334 .unwrap()
2335 .call(exchange)
2336 .await
2337 .expect("Container run should succeed");
2338
2339 let container_id = result
2341 .input
2342 .header(HEADER_CONTAINER_ID)
2343 .and_then(|v| v.as_str().map(|s| s.to_string()))
2344 .expect("Expected container ID header");
2345 assert!(!container_id.is_empty(), "Container ID should not be empty");
2346
2347 assert_eq!(
2349 result
2350 .input
2351 .header(HEADER_ACTION_RESULT)
2352 .and_then(|v| v.as_str()),
2353 Some("success")
2354 );
2355
2356 let inspect = docker
2358 .inspect_container(&container_id, None)
2359 .await
2360 .expect("Container should exist");
2361 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2362
2363 docker
2365 .remove_container(
2366 &container_id,
2367 Some(bollard::container::RemoveContainerOptions {
2368 force: true,
2369 ..Default::default()
2370 }),
2371 )
2372 .await
2373 .ok();
2374
2375 eprintln!("✅ Container {} created and cleaned up", container_id);
2376 }
2377
2378 #[tokio::test]
2380 async fn test_container_consumer_unsupported_operation() {
2381 use tokio::sync::mpsc;
2382
2383 let component = ContainerComponent::new();
2384 let ctx = NoOpComponentContext;
2385 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2386 let mut consumer = endpoint.create_consumer().unwrap();
2387
2388 let (tx, _rx) = mpsc::channel(16);
2390 let cancel_token = tokio_util::sync::CancellationToken::new();
2391 let context = ConsumerContext::new(tx, cancel_token);
2392
2393 let result = consumer.start(context).await;
2394
2395 assert!(
2397 result.is_err(),
2398 "Expected error for unsupported consumer operation"
2399 );
2400 let err = result.unwrap_err();
2401 match &err {
2402 CamelError::EndpointCreationFailed(msg) => {
2403 assert!(
2404 msg.contains("Consumer only supports 'events' or 'logs'"),
2405 "Error message should mention events or logs support, got: {}",
2406 msg
2407 );
2408 }
2409 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2410 }
2411 }
2412
2413 #[test]
2414 fn test_container_consumer_concurrency_model_is_concurrent() {
2415 let consumer = ContainerConsumer {
2416 config: ContainerConfig::from_uri("container:events").unwrap(),
2417 };
2418
2419 assert_eq!(
2420 consumer.concurrency_model(),
2421 camel_component_api::ConcurrencyModel::Concurrent { max: None }
2422 );
2423 }
2424
2425 #[tokio::test]
2429 async fn test_container_consumer_cancellation() {
2430 use std::sync::atomic::{AtomicBool, Ordering};
2431 use tokio::sync::mpsc;
2432
2433 let docker = match Docker::connect_with_local_defaults() {
2435 Ok(d) => d,
2436 Err(_) => {
2437 eprintln!("Skipping test: Could not connect to Docker daemon");
2438 return;
2439 }
2440 };
2441
2442 if docker.ping().await.is_err() {
2443 eprintln!("Skipping test: Docker daemon not responding to ping");
2444 return;
2445 }
2446
2447 let component = ContainerComponent::new();
2448 let ctx = NoOpComponentContext;
2449 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2450 let mut consumer = endpoint.create_consumer().unwrap();
2451
2452 let (tx, _rx) = mpsc::channel(16);
2454 let cancel_token = tokio_util::sync::CancellationToken::new();
2455 let context = ConsumerContext::new(tx, cancel_token.clone());
2456
2457 let completed = Arc::new(AtomicBool::new(false));
2459 let completed_clone = completed.clone();
2460
2461 let handle = tokio::spawn(async move {
2463 let result = consumer.start(context).await;
2464 completed_clone.store(true, Ordering::SeqCst);
2466 result
2467 });
2468
2469 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2471
2472 assert!(
2474 !completed.load(Ordering::SeqCst),
2475 "Consumer should still be running before cancellation"
2476 );
2477
2478 cancel_token.cancel();
2480
2481 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2483
2484 assert!(
2486 result.is_ok(),
2487 "Consumer should gracefully shut down after cancellation"
2488 );
2489
2490 assert!(
2492 completed.load(Ordering::SeqCst),
2493 "Consumer should have completed after cancellation"
2494 );
2495 }
2496
2497 #[tokio::test]
2501 async fn test_container_producer_list_containers() {
2502 let docker = match Docker::connect_with_local_defaults() {
2505 Ok(d) => d,
2506 Err(_) => {
2507 eprintln!("Skipping test: Could not connect to Docker daemon");
2508 return;
2509 }
2510 };
2511
2512 if docker.ping().await.is_err() {
2513 eprintln!("Skipping test: Docker daemon not responding to ping");
2514 return;
2515 }
2516
2517 let component = ContainerComponent::new();
2519 let ctx = NoOpComponentContext;
2520 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2521
2522 let ctx = ProducerContext::new();
2523 let mut producer = endpoint.create_producer(&ctx).unwrap();
2524
2525 let mut exchange = Exchange::new(Message::new(""));
2527 exchange
2528 .input
2529 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2530
2531 use tower::ServiceExt;
2533 let result = producer
2534 .ready()
2535 .await
2536 .unwrap()
2537 .call(exchange)
2538 .await
2539 .expect("Producer should succeed when Docker is available");
2540
2541 match &result.input.body {
2544 camel_component_api::Body::Json(json_value) => {
2545 assert!(
2546 json_value.is_array(),
2547 "Expected input body to be a JSON array, got: {:?}",
2548 json_value
2549 );
2550 }
2551 other => panic!("Expected Body::Json with array, got: {:?}", other),
2552 }
2553 }
2554
2555 #[test]
2556 fn test_container_config_parses_volumes() {
2557 let config = ContainerConfig::from_uri(
2558 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2559 )
2560 .unwrap();
2561 assert_eq!(
2562 config.volumes.as_deref(),
2563 Some("./html:/usr/share/nginx/html:ro")
2564 );
2565 }
2566
2567 #[test]
2568 fn test_container_config_parses_exec_params() {
2569 let config = ContainerConfig::from_uri(
2570 "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2571 )
2572 .unwrap();
2573 assert_eq!(config.operation, "exec");
2574 assert_eq!(config.container_id.as_deref(), Some("my-app"));
2575 assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2576 assert_eq!(config.user.as_deref(), Some("root"));
2577 assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2578 assert!(config.detach);
2579 }
2580
2581 #[test]
2582 fn test_container_config_parses_network_create_params() {
2583 let config =
2584 ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2585 .unwrap();
2586 assert_eq!(config.operation, "network-create");
2587 assert_eq!(config.name.as_deref(), Some("my-net"));
2588 assert_eq!(config.driver.as_deref(), Some("bridge"));
2589 }
2590
2591 #[test]
2592 fn test_container_config_defaults_new_fields() {
2593 let config = ContainerConfig::from_uri("container:list").unwrap();
2594 assert!(config.volumes.is_none());
2595 assert!(config.user.is_none());
2596 assert!(config.workdir.is_none());
2597 assert!(!config.detach);
2598 assert!(config.driver.is_none());
2599 assert!(!config.force);
2600 }
2601
2602 #[test]
2603 fn test_parse_volumes_bind_mount() {
2604 let config = ContainerConfig::from_uri(
2605 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2606 )
2607 .unwrap();
2608 let (binds, anon) = config.parse_volumes().unwrap();
2609 assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2610 assert!(anon.is_empty());
2611 }
2612
2613 #[test]
2614 fn test_parse_volumes_named_volume() {
2615 let config =
2616 ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2617 .unwrap();
2618 let (binds, anon) = config.parse_volumes().unwrap();
2619 assert_eq!(binds, vec!["data:/var/lib/data"]);
2620 assert!(anon.is_empty());
2621 }
2622
2623 #[test]
2624 fn test_parse_volumes_anonymous() {
2625 let config =
2626 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2627 let (binds, anon) = config.parse_volumes().unwrap();
2628 assert!(binds.is_empty());
2629 assert!(anon.contains_key("/tmp/app-data"));
2630 }
2631
2632 #[test]
2633 fn test_parse_volumes_anonymous_with_mode() {
2634 let config =
2635 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2636 .unwrap();
2637 let (binds, anon) = config.parse_volumes().unwrap();
2638 assert!(binds.is_empty());
2639 assert!(anon.contains_key("/tmp/app-data"));
2640 }
2641
2642 #[test]
2643 fn test_parse_volumes_multiple() {
2644 let config = ContainerConfig::from_uri(
2645 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2646 )
2647 .unwrap();
2648 let (binds, anon) = config.parse_volumes().unwrap();
2649 assert_eq!(binds.len(), 2);
2650 assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2651 assert!(binds.contains(&"data:/var/log/app".to_string()));
2652 assert!(anon.is_empty());
2653 }
2654
2655 #[test]
2656 fn test_parse_volumes_mixed() {
2657 let config = ContainerConfig::from_uri(
2658 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2659 )
2660 .unwrap();
2661 let (binds, anon) = config.parse_volumes().unwrap();
2662 assert_eq!(binds.len(), 1);
2663 assert!(anon.contains_key("/tmp/cache"));
2664 }
2665
2666 #[test]
2667 fn test_parse_volumes_none() {
2668 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2669 assert!(config.parse_volumes().is_none());
2670 }
2671
2672 #[test]
2673 fn test_parse_volumes_empty_entry_skipped() {
2674 let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2675 assert!(config.parse_volumes().is_none());
2676 }
2677
2678 #[test]
2679 fn test_parse_volumes_rw_mode() {
2680 let config =
2681 ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2682 .unwrap();
2683 let (binds, _) = config.parse_volumes().unwrap();
2684 assert_eq!(binds, vec!["./data:/app/data:rw"]);
2685 }
2686
2687 #[tokio::test]
2688 async fn test_container_producer_network_lifecycle() {
2689 let docker = match Docker::connect_with_local_defaults() {
2690 Ok(d) => d,
2691 Err(_) => {
2692 eprintln!("Skipping test: Could not connect to Docker daemon");
2693 return;
2694 }
2695 };
2696 if docker.ping().await.is_err() {
2697 eprintln!("Skipping test: Docker daemon not responding to ping");
2698 return;
2699 }
2700
2701 let network_name = format!("camel-test-{}", std::process::id());
2702
2703 let component = ContainerComponent::new();
2704 let component_ctx = NoOpComponentContext;
2705
2706 let endpoint = component
2708 .create_endpoint(
2709 &format!("container:network-create?name={}", network_name),
2710 &component_ctx,
2711 )
2712 .unwrap();
2713 let ctx = ProducerContext::new();
2714 let mut producer = endpoint.create_producer(&ctx).unwrap();
2715
2716 let mut exchange = Exchange::new(Message::new(""));
2717 exchange.input.set_header(
2718 HEADER_ACTION,
2719 serde_json::Value::String("network-create".into()),
2720 );
2721
2722 use tower::ServiceExt;
2723 let result = producer
2724 .ready()
2725 .await
2726 .unwrap()
2727 .call(exchange)
2728 .await
2729 .expect("Network create should succeed");
2730
2731 let network_id = result
2732 .input
2733 .header(HEADER_NETWORK)
2734 .and_then(|v| v.as_str().map(|s| s.to_string()))
2735 .expect("Should have network ID");
2736
2737 assert!(!network_id.is_empty());
2738
2739 let endpoint = component
2741 .create_endpoint("container:network-list", &component_ctx)
2742 .unwrap();
2743 let mut producer = endpoint.create_producer(&ctx).unwrap();
2744
2745 let mut exchange = Exchange::new(Message::new(""));
2746 exchange.input.set_header(
2747 HEADER_ACTION,
2748 serde_json::Value::String("network-list".into()),
2749 );
2750
2751 let list_result = producer
2752 .ready()
2753 .await
2754 .unwrap()
2755 .call(exchange)
2756 .await
2757 .expect("Network list should succeed");
2758
2759 match &list_result.input.body {
2760 Body::Json(json_value) => {
2761 assert!(json_value.is_array(), "Expected JSON array");
2762 }
2763 other => panic!("Expected Body::Json, got: {:?}", other),
2764 }
2765
2766 let endpoint = component
2768 .create_endpoint(
2769 &format!("container:network-remove?network={}", network_name),
2770 &component_ctx,
2771 )
2772 .unwrap();
2773 let mut producer = endpoint.create_producer(&ctx).unwrap();
2774
2775 let mut exchange = Exchange::new(Message::new(""));
2776 exchange.input.set_header(
2777 HEADER_ACTION,
2778 serde_json::Value::String("network-remove".into()),
2779 );
2780
2781 let remove_result = producer
2782 .ready()
2783 .await
2784 .unwrap()
2785 .call(exchange)
2786 .await
2787 .expect("Network remove should succeed");
2788
2789 let action_result = remove_result
2790 .input
2791 .header(HEADER_ACTION_RESULT)
2792 .and_then(|v| v.as_str());
2793 assert_eq!(action_result, Some("success"));
2794 }
2795}