1pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::service::{HostConfig, PortBinding};
19use camel_component_api::parse_uri;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tower::Service;
23
24static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
27 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
28
29fn track_container(id: String) {
31 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
32 tracker.insert(id);
33 }
34}
35
36fn untrack_container(id: &str) {
38 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
39 tracker.remove(id);
40 }
41}
42
43pub async fn cleanup_tracked_containers() {
45 let ids: Vec<String> = {
46 match CONTAINER_TRACKER.lock() {
47 Ok(tracker) => tracker.iter().cloned().collect(),
48 Err(_) => return,
49 }
50 };
51
52 if ids.is_empty() {
53 return;
54 }
55
56 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
57
58 let docker = match Docker::connect_with_local_defaults() {
59 Ok(d) => d,
60 Err(e) => {
61 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
62 return;
63 }
64 };
65
66 for id in ids {
67 match docker
68 .remove_container(
69 &id,
70 Some(bollard::container::RemoveContainerOptions {
71 force: true,
72 ..Default::default()
73 }),
74 )
75 .await
76 {
77 Ok(_) => {
78 tracing::debug!("Cleaned up container {}", id);
79 untrack_container(&id);
80 }
81 Err(e) => {
82 tracing::warn!("Failed to cleanup container {}: {}", id, e);
83 }
84 }
85 }
86}
87
/// Timeout, in seconds, passed to `Docker::connect_with_socket` when a client is built.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

/// Header that, when present, overrides the operation taken from the endpoint URI.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header naming the image for `run`; takes precedence over the `image` URI parameter.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header carrying a container ID: read by lifecycle, exec and network
/// operations, written by `run` and `exec`.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Log-stream header name.
/// NOTE(review): not referenced in the visible part of this file; presumably
/// set by the log consumer — confirm.
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Log-timestamp header name.
/// NOTE(review): not referenced in the visible part of this file; presumably
/// set by the log consumer — confirm.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header naming the container (`run`) or the network (`network-create`);
/// takes precedence over the `name` URI parameter.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header set to `"success"` once an operation completes without error.
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Header carrying the command line for `exec`; takes precedence over the `cmd` URI parameter.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Header naming the network for network operations; `network-create` writes
/// the created network's ID into it.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Header receiving the exit code of an attached `exec`.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Header carrying the volume specification for `run`; takes precedence over
/// the `volumes` URI parameter.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Header receiving the exec instance ID of a detached `exec`.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
128
129#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
137#[serde(default)]
138pub struct ContainerGlobalConfig {
139 pub docker_host: String,
141}
142
143impl Default for ContainerGlobalConfig {
144 fn default() -> Self {
145 Self {
146 docker_host: "unix:///var/run/docker.sock".to_string(),
147 }
148 }
149}
150
151impl ContainerGlobalConfig {
152 pub fn new() -> Self {
153 Self::default()
154 }
155
156 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
157 self.docker_host = v.into();
158 self
159 }
160}
161
/// Per-endpoint configuration parsed from a `container:<operation>?...` URI.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// URI path; the operation used when no `CamelContainerAction` header is present.
    pub operation: String,
    /// `image` parameter: image for `run` when no image header is set.
    pub image: Option<String>,
    /// `name` parameter: container name for `run`, network name for `network-create`.
    pub name: Option<String>,
    /// `host` parameter: Docker daemon address; filled from the global config when unset.
    pub host: Option<String>,
    /// `cmd` parameter: command line, split on whitespace for `run` and `exec`.
    pub cmd: Option<String>,
    /// `ports` parameter: comma-separated port mappings for `run`.
    pub ports: Option<String>,
    /// `env` parameter: comma-separated environment entries for `run` and `exec`.
    pub env: Option<String>,
    /// `network` parameter: network mode for `run`, network name for network operations.
    pub network: Option<String>,
    /// `containerId` parameter: fallback container for `exec` and network connect/disconnect.
    pub container_id: Option<String>,
    /// `follow` parameter (default `true`).
    /// NOTE(review): not consumed in the visible part of this file; presumably a log-consumer option.
    pub follow: bool,
    /// `timestamps` parameter (default `false`).
    /// NOTE(review): not consumed in the visible part of this file; presumably a log-consumer option.
    pub timestamps: bool,
    /// `tail` parameter.
    /// NOTE(review): not consumed in the visible part of this file; presumably a log-consumer option.
    pub tail: Option<String>,
    /// `autoPull` parameter (default `true`): pull the image for `run` when it is missing locally.
    pub auto_pull: bool,
    /// `autoRemove` parameter (default `true`): forwarded to the host config's `auto_remove`.
    pub auto_remove: bool,
    /// `volumes` parameter: comma-separated volume specification for `run`.
    pub volumes: Option<String>,
    /// `user` parameter: user for `exec`.
    pub user: Option<String>,
    /// `workdir` parameter: working directory for `exec`.
    pub workdir: Option<String>,
    /// `detach` parameter (default `false`): start `exec` without attaching to its output.
    pub detach: bool,
    /// `driver` parameter: network driver for `network-create` (`bridge` when unset).
    pub driver: Option<String>,
    /// `force` parameter (default `false`): force flag for `network-disconnect`.
    pub force: bool,
}
213
214impl ContainerConfig {
215 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
223 let parts = parse_uri(uri)?;
224 if parts.scheme != "container" {
225 return Err(CamelError::InvalidUri(format!(
226 "expected scheme 'container', got '{}'",
227 parts.scheme
228 )));
229 }
230
231 let image = parts.params.get("image").cloned();
232 let name = parts.params.get("name").cloned();
233 let cmd = parts.params.get("cmd").cloned();
234 let ports = parts.params.get("ports").cloned();
235 let env = parts.params.get("env").cloned();
236 let network = parts.params.get("network").cloned();
237 let container_id = parts.params.get("containerId").cloned();
238 let follow = parts
239 .params
240 .get("follow")
241 .map(|v| v.eq_ignore_ascii_case("true"))
242 .unwrap_or(true);
243 let timestamps = parts
244 .params
245 .get("timestamps")
246 .map(|v| v.eq_ignore_ascii_case("true"))
247 .unwrap_or(false);
248 let tail = parts.params.get("tail").cloned();
249 let auto_pull = parts
250 .params
251 .get("autoPull")
252 .map(|v| v.eq_ignore_ascii_case("true"))
253 .unwrap_or(true);
254 let auto_remove = parts
255 .params
256 .get("autoRemove")
257 .map(|v| v.eq_ignore_ascii_case("true"))
258 .unwrap_or(true);
259 let host = parts.params.get("host").cloned();
261 let volumes = parts.params.get("volumes").cloned();
262 let user = parts.params.get("user").cloned();
263 let workdir = parts.params.get("workdir").cloned();
264 let detach = parts
265 .params
266 .get("detach")
267 .map(|v| v.eq_ignore_ascii_case("true"))
268 .unwrap_or(false);
269 let driver = parts.params.get("driver").cloned();
270 let force = parts
271 .params
272 .get("force")
273 .map(|v| v.eq_ignore_ascii_case("true"))
274 .unwrap_or(false);
275
276 Ok(Self {
277 operation: parts.path,
278 image,
279 name,
280 host,
281 cmd,
282 ports,
283 env,
284 network,
285 container_id,
286 follow,
287 timestamps,
288 tail,
289 auto_pull,
290 auto_remove,
291 volumes,
292 user,
293 workdir,
294 detach,
295 driver,
296 force,
297 })
298 }
299
300 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
303 if self.host.is_none() {
304 self.host = Some(global.docker_host.clone());
305 }
306 }
307
308 fn docker_socket_path(&self) -> Result<&str, CamelError> {
309 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
310 "npipe:////./pipe/docker_engine"
311 } else {
312 "unix:///var/run/docker.sock"
313 });
314
315 if host.starts_with("unix://") || host.starts_with("npipe://") {
316 return Ok(host);
317 }
318
319 if host.contains("://") {
320 return Err(CamelError::ProcessorError(format!(
321 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
322 host
323 )));
324 }
325
326 Ok(host)
327 }
328
329 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
330 let socket_path = self.docker_socket_path()?;
331 Docker::connect_with_socket(
332 socket_path,
333 DOCKER_CONNECT_TIMEOUT_SECS,
334 bollard::API_DEFAULT_VERSION,
335 )
336 .map_err(|e| {
337 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
338 })
339 }
340
341 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
349 let docker = self.connect_docker_client()?;
350 docker
351 .ping()
352 .await
353 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
354 Ok(docker)
355 }
356
357 #[allow(clippy::type_complexity)]
358 fn parse_ports(
359 &self,
360 ) -> Option<(
361 HashMap<String, HashMap<(), ()>>,
362 HashMap<String, Option<Vec<PortBinding>>>,
363 )> {
364 let ports_str = self.ports.as_ref()?;
365
366 let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
367 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
368
369 for mapping in ports_str.split(',') {
370 let mapping = mapping.trim();
371 if mapping.is_empty() {
372 continue;
373 }
374
375 let (host_port, container_spec) = mapping.split_once(':')?;
376
377 let (container_port, protocol) = if container_spec.contains('/') {
378 let parts: Vec<&str> = container_spec.split('/').collect();
379 (parts[0], parts[1])
380 } else {
381 (container_spec, "tcp")
382 };
383
384 let container_key = format!("{}/{}", container_port, protocol);
385
386 exposed_ports.insert(container_key.clone(), HashMap::new());
387
388 port_bindings.insert(
389 container_key,
390 Some(vec![PortBinding {
391 host_ip: None,
392 host_port: Some(host_port.to_string()),
393 }]),
394 );
395 }
396
397 if exposed_ports.is_empty() {
398 None
399 } else {
400 Some((exposed_ports, port_bindings))
401 }
402 }
403
404 fn parse_env(&self) -> Option<Vec<String>> {
405 let env_str = self.env.as_ref()?;
406
407 let env_vars: Vec<String> = env_str
408 .split(',')
409 .map(|s| s.trim().to_string())
410 .filter(|s| !s.is_empty())
411 .collect();
412
413 if env_vars.is_empty() {
414 None
415 } else {
416 Some(env_vars)
417 }
418 }
419
/// Test-only helper: runs `parse_volume_str` over the `volumes` parameter.
#[cfg(test)]
#[allow(clippy::type_complexity)]
fn parse_volumes(&self) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
    let raw = self.volumes.as_deref()?;
    parse_volume_str(raw)
}
425}
426
/// Parses a comma-separated volume specification into bind mounts and
/// anonymous volumes.
///
/// Each trimmed, non-empty entry is split on `:`:
/// - `path` becomes an anonymous volume at `path`;
/// - `path:ro` / `path:rw` becomes an anonymous volume at `path` (the mode is not kept);
/// - `source:target` becomes a bind `source:target`;
/// - `source:target:ro|rw` becomes a bind including the mode;
/// - a three-part entry with any other mode, or an entry with more than
///   three parts, is skipped.
///
/// Returns `None` when nothing was collected.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, HashMap<String, HashMap<(), ()>>)> {
    let is_mode = |candidate: &str| matches!(candidate, "ro" | "rw");

    let mut bind_mounts: Vec<String> = Vec::new();
    let mut anonymous: HashMap<String, HashMap<(), ()>> = HashMap::new();

    for spec in volumes_str
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
    {
        let fields: Vec<&str> = spec.split(':').collect();

        match fields.as_slice() {
            &[path] => {
                anonymous.insert(path.to_string(), HashMap::new());
            }
            &[path, mode] if is_mode(mode) => {
                anonymous.insert(path.to_string(), HashMap::new());
            }
            &[source, target] => {
                bind_mounts.push(format!("{}:{}", source, target));
            }
            &[source, target, mode] if is_mode(mode) => {
                bind_mounts.push(format!("{}:{}:{}", source, target, mode));
            }
            // Unknown mode or too many fields: the entry is ignored.
            _ => {}
        }
    }

    if bind_mounts.is_empty() && anonymous.is_empty() {
        return None;
    }
    Some((bind_mounts, anonymous))
}
476
/// Operations the producer can perform; `parse_producer_operation` maps the
/// operation strings (URI path or action header) onto these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// `list`
    List,
    /// `run`
    Run,
    /// `start`
    Start,
    /// `stop`
    Stop,
    /// `remove`
    Remove,
    /// `exec`
    Exec,
    /// `network-create`
    NetworkCreate,
    /// `network-connect`
    NetworkConnect,
    /// `network-disconnect`
    NetworkDisconnect,
    /// `network-remove`
    NetworkRemove,
    /// `network-list`
    NetworkList,
}
491
492fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
493 match operation {
494 "list" => Ok(ProducerOperation::List),
495 "run" => Ok(ProducerOperation::Run),
496 "start" => Ok(ProducerOperation::Start),
497 "stop" => Ok(ProducerOperation::Stop),
498 "remove" => Ok(ProducerOperation::Remove),
499 "exec" => Ok(ProducerOperation::Exec),
500 "network-create" => Ok(ProducerOperation::NetworkCreate),
501 "network-connect" => Ok(ProducerOperation::NetworkConnect),
502 "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
503 "network-remove" => Ok(ProducerOperation::NetworkRemove),
504 "network-list" => Ok(ProducerOperation::NetworkList),
505 _ => Err(CamelError::ProcessorError(format!(
506 "Unknown container operation: {}",
507 operation
508 ))),
509 }
510}
511
512fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
513 exchange
514 .input
515 .header(HEADER_CONTAINER_NAME)
516 .and_then(|v| v.as_str().map(|s| s.to_string()))
517 .or_else(|| config.name.clone())
518}
519
520async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
521 let images = docker
522 .list_images::<&str>(None)
523 .await
524 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
525
526 Ok(images.iter().any(|img| {
527 img.repo_tags
528 .iter()
529 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
530 }))
531}
532
533async fn pull_image_with_progress(
534 docker: &Docker,
535 image: &str,
536 timeout_secs: u64,
537) -> Result<(), CamelError> {
538 use futures::StreamExt;
539
540 tracing::info!("Pulling image: {}", image);
541
542 let mut stream = docker.create_image(
543 Some(bollard::image::CreateImageOptions {
544 from_image: image,
545 ..Default::default()
546 }),
547 None,
548 None,
549 );
550
551 let start = std::time::Instant::now();
552 let mut last_progress = std::time::Instant::now();
553
554 while let Some(item) = stream.next().await {
555 if start.elapsed().as_secs() > timeout_secs {
556 return Err(CamelError::ProcessorError(format!(
557 "Image pull timeout after {}s. Try manually: docker pull {}",
558 timeout_secs, image
559 )));
560 }
561
562 match item {
563 Ok(update) => {
564 if last_progress.elapsed().as_secs() >= 2 {
566 if let Some(status) = update.status {
567 tracing::debug!("Pull progress: {}", status);
568 }
569 last_progress = std::time::Instant::now();
570 }
571 }
572 Err(e) => {
573 let err_str = e.to_string().to_lowercase();
574 if err_str.contains("unauthorized") || err_str.contains("401") {
575 return Err(CamelError::ProcessorError(format!(
576 "Authentication required for image '{}'. Configure Docker credentials: docker login",
577 image
578 )));
579 }
580 if err_str.contains("not found") || err_str.contains("404") {
581 return Err(CamelError::ProcessorError(format!(
582 "Image '{}' not found in registry. Check the image name and tag",
583 image
584 )));
585 }
586 return Err(CamelError::ProcessorError(format!(
587 "Failed to pull image '{}': {}",
588 image, e
589 )));
590 }
591 }
592 }
593
594 tracing::info!("Successfully pulled image: {}", image);
595 Ok(())
596}
597
598async fn ensure_image_available(
599 docker: &Docker,
600 image: &str,
601 auto_pull: bool,
602 timeout_secs: u64,
603) -> Result<(), CamelError> {
604 if image_exists_locally(docker, image).await? {
605 tracing::debug!("Image '{}' already available locally", image);
606 return Ok(());
607 }
608
609 if !auto_pull {
610 return Err(CamelError::ProcessorError(format!(
611 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
612 image, image
613 )));
614 }
615
616 pull_image_with_progress(docker, image, timeout_secs).await
617}
618
619fn format_docker_event(event: &bollard::models::EventMessage) -> String {
620 let action = event.action.as_deref().unwrap_or("unknown");
621 let actor = event.actor.as_ref();
622
623 let container_name = actor
624 .and_then(|a| a.attributes.as_ref())
625 .and_then(|attrs| attrs.get("name"))
626 .map(|s| s.as_str())
627 .unwrap_or("unknown");
628
629 let image = actor
630 .and_then(|a| a.attributes.as_ref())
631 .and_then(|attrs| attrs.get("image"))
632 .map(|s| s.as_str())
633 .unwrap_or("");
634
635 let exit_code = actor
636 .and_then(|a| a.attributes.as_ref())
637 .and_then(|attrs| attrs.get("exitCode"))
638 .map(|s| s.as_str());
639
640 match action {
641 "create" => {
642 if image.is_empty() {
643 format!("[CREATE] Container {}", container_name)
644 } else {
645 format!("[CREATE] Container {} ({})", container_name, image)
646 }
647 }
648 "start" => format!("[START] Container {}", container_name),
649 "die" => {
650 if let Some(code) = exit_code {
651 format!("[DIE] Container {} (exit: {})", container_name, code)
652 } else {
653 format!("[DIE] Container {}", container_name)
654 }
655 }
656 "destroy" => format!("[DESTROY] Container {}", container_name),
657 "stop" => format!("[STOP] Container {}", container_name),
658 "pause" => format!("[PAUSE] Container {}", container_name),
659 "unpause" => format!("[UNPAUSE] Container {}", container_name),
660 "restart" => format!("[RESTART] Container {}", container_name),
661 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
662 }
663}
664
665async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
666 create: CreateFn,
667 start: StartFn,
668 remove: RemoveFn,
669) -> Result<String, CamelError>
670where
671 CreateFn: FnOnce() -> CreateFut,
672 CreateFut: Future<Output = Result<String, CamelError>>,
673 StartFn: FnOnce(String) -> StartFut,
674 StartFut: Future<Output = Result<(), CamelError>>,
675 RemoveFn: FnOnce(String) -> RemoveFut,
676 RemoveFut: Future<Output = Result<(), CamelError>>,
677{
678 let container_id = create().await?;
679 if let Err(start_err) = start(container_id.clone()).await {
680 if let Err(remove_err) = remove(container_id.clone()).await {
681 return Err(CamelError::ProcessorError(format!(
682 "Failed to start container: {}. Cleanup failed: {}",
683 start_err, remove_err
684 )));
685 }
686 return Err(start_err);
687 }
688
689 Ok(container_id)
690}
691
692async fn handle_list(
693 docker: Docker,
694 _config: ContainerConfig,
695 exchange: &mut Exchange,
696) -> Result<(), CamelError> {
697 let containers = docker.list_containers::<String>(None).await.map_err(|e| {
698 CamelError::ProcessorError(format!("Failed to list containers: {}", e))
699 })?;
700
701 let json_value = serde_json::to_value(&containers).map_err(|e| {
702 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
703 })?;
704
705 exchange.input.body = Body::Json(json_value);
706 exchange.input.set_header(
707 HEADER_ACTION_RESULT,
708 serde_json::Value::String("success".to_string()),
709 );
710 Ok(())
711}
712
713async fn handle_run(
714 docker: Docker,
715 config: ContainerConfig,
716 exchange: &mut Exchange,
717) -> Result<(), CamelError> {
718 let image = exchange
719 .input
720 .header(HEADER_IMAGE)
721 .and_then(|v| v.as_str().map(|s| s.to_string()))
722 .or(config.image.clone())
723 .ok_or_else(|| {
724 CamelError::ProcessorError(
725 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
726 )
727 })?;
728
729 let pull_timeout = 300;
730 ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
731 .await
732 .map_err(|e| {
733 CamelError::ProcessorError(format!(
734 "Image '{}' not available: {}",
735 image, e
736 ))
737 })?;
738
739 let container_name = resolve_container_name(exchange, &config);
740 let container_name_ref = container_name.as_deref().unwrap_or("");
741 let cmd_parts: Option<Vec<String>> = config
742 .cmd
743 .as_ref()
744 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
745 let auto_remove = config.auto_remove;
746 let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
747 let env_vars = config.parse_env();
748 let network_mode = config.network.clone();
749
750 let volumes_str = exchange
751 .input
752 .header(HEADER_VOLUMES)
753 .and_then(|v| v.as_str().map(|s| s.to_string()))
754 .or(config.volumes.clone());
755 let (binds, anon_volumes) = volumes_str
756 .as_deref()
757 .and_then(parse_volume_str)
758 .unwrap_or_default();
759
760 let docker_create = docker.clone();
761 let docker_start = docker.clone();
762 let docker_remove = docker.clone();
763
764 let container_id = run_container_with_cleanup(
765 move || async move {
766 let create_options = bollard::container::CreateContainerOptions {
767 name: container_name_ref,
768 ..Default::default()
769 };
770 let container_config = bollard::container::Config::<String> {
771 image: Some(image.clone()),
772 cmd: cmd_parts,
773 env: env_vars,
774 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
775 volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
776 host_config: Some(HostConfig {
777 auto_remove: Some(auto_remove),
778 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
779 network_mode,
780 binds: if binds.is_empty() { None } else { Some(binds) },
781 ..Default::default()
782 }),
783 ..Default::default()
784 };
785
786 let create_response = docker_create
787 .create_container(Some(create_options), container_config)
788 .await
789 .map_err(|e| {
790 let err_str = e.to_string().to_lowercase();
791 if err_str.contains("409") || err_str.contains("conflict") {
792 CamelError::ProcessorError(format!(
793 "Container name '{}' already exists. Use a unique name or remove the existing container first",
794 container_name_ref
795 ))
796 } else {
797 CamelError::ProcessorError(format!(
798 "Failed to create container: {}",
799 e
800 ))
801 }
802 })?;
803
804 Ok(create_response.id)
805 },
806 move |container_id| async move {
807 docker_start
808 .start_container::<String>(&container_id, None)
809 .await
810 .map_err(|e| {
811 CamelError::ProcessorError(format!(
812 "Failed to start container: {}",
813 e
814 ))
815 })
816 },
817 move |container_id| async move {
818 docker_remove
819 .remove_container(&container_id, None)
820 .await
821 .map_err(|e| {
822 CamelError::ProcessorError(format!(
823 "Failed to remove container after start failure: {}",
824 e
825 ))
826 })
827 },
828 )
829 .await?;
830
831 track_container(container_id.clone());
832
833 exchange
834 .input
835 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
836 exchange.input.set_header(
837 HEADER_ACTION_RESULT,
838 serde_json::Value::String("success".to_string()),
839 );
840 Ok(())
841}
842
843async fn handle_lifecycle(
844 docker: Docker,
845 _config: ContainerConfig,
846 exchange: &mut Exchange,
847 operation: ProducerOperation,
848 operation_name: &str,
849) -> Result<(), CamelError> {
850 let container_id = exchange
851 .input
852 .header(HEADER_CONTAINER_ID)
853 .and_then(|v| v.as_str().map(|s| s.to_string()))
854 .ok_or_else(|| {
855 CamelError::ProcessorError(format!(
856 "{} header is required for {} operation",
857 HEADER_CONTAINER_ID, operation_name
858 ))
859 })?;
860
861 match operation {
862 ProducerOperation::Start => {
863 docker
864 .start_container::<String>(&container_id, None)
865 .await
866 .map_err(|e| {
867 CamelError::ProcessorError(format!(
868 "Failed to start container: {}",
869 e
870 ))
871 })?;
872 }
873 ProducerOperation::Stop => {
874 docker
875 .stop_container(&container_id, None)
876 .await
877 .map_err(|e| {
878 CamelError::ProcessorError(format!(
879 "Failed to stop container: {}",
880 e
881 ))
882 })?;
883 }
884 ProducerOperation::Remove => {
885 docker
886 .remove_container(&container_id, None)
887 .await
888 .map_err(|e| {
889 CamelError::ProcessorError(format!(
890 "Failed to remove container: {}",
891 e
892 ))
893 })?;
894 untrack_container(&container_id);
895 }
896 _ => {}
897 }
898
899 exchange.input.set_header(
900 HEADER_ACTION_RESULT,
901 serde_json::Value::String("success".to_string()),
902 );
903 Ok(())
904}
905
906async fn handle_exec(
907 docker: Docker,
908 config: ContainerConfig,
909 exchange: &mut Exchange,
910) -> Result<(), CamelError> {
911 let container_id = exchange
912 .input
913 .header(HEADER_CONTAINER_ID)
914 .and_then(|v| v.as_str().map(|s| s.to_string()))
915 .or(config.container_id.clone())
916 .ok_or_else(|| {
917 CamelError::ProcessorError(format!(
918 "{} header or containerId param is required for exec operation",
919 HEADER_CONTAINER_ID
920 ))
921 })?;
922
923 let cmd = exchange
924 .input
925 .header(HEADER_CMD)
926 .and_then(|v| v.as_str().map(|s| s.to_string()))
927 .or(config.cmd.clone())
928 .ok_or_else(|| {
929 CamelError::ProcessorError(
930 "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
931 )
932 })?;
933
934 let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
935 let env_vars = config.parse_env();
936
937 let exec_config = bollard::exec::CreateExecOptions {
938 cmd: Some(cmd_parts),
939 env: env_vars,
940 user: config.user.clone(),
941 working_dir: config.workdir.clone(),
942 attach_stdout: Some(true),
943 attach_stderr: Some(true),
944 ..Default::default()
945 };
946
947 let create_result = docker
948 .create_exec(&container_id, exec_config)
949 .await
950 .map_err(|e| {
951 let err_str = e.to_string().to_lowercase();
952 if err_str.contains("404") || err_str.contains("no such") {
953 CamelError::ProcessorError(format!(
954 "Container '{}' not found for exec",
955 container_id
956 ))
957 } else {
958 CamelError::ProcessorError(format!("Failed to create exec: {}", e))
959 }
960 })?;
961
962 let exec_id = create_result.id;
963
964 if config.detach {
965 docker
966 .start_exec(
967 &exec_id,
968 Some(bollard::exec::StartExecOptions {
969 detach: true,
970 ..Default::default()
971 }),
972 )
973 .await
974 .map_err(|e| {
975 CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
976 })?;
977
978 exchange.input.set_header(
979 HEADER_EXEC_ID,
980 serde_json::Value::String(exec_id),
981 );
982 exchange.input.set_header(
983 HEADER_CONTAINER_ID,
984 serde_json::Value::String(container_id),
985 );
986 } else {
987 let start_result = docker
988 .start_exec(&exec_id, None)
989 .await
990 .map_err(|e| {
991 CamelError::ProcessorError(format!("Failed to start exec: {}", e))
992 })?;
993
994 let mut output = String::new();
995
996 match start_result {
997 bollard::exec::StartExecResults::Attached { output: mut stream, .. } => {
998 use futures::StreamExt;
999 while let Some(msg) = stream.next().await {
1000 match msg {
1001 Ok(bollard::container::LogOutput::StdOut { message }) => {
1002 output.push_str(&String::from_utf8_lossy(&message));
1003 }
1004 Ok(bollard::container::LogOutput::StdErr { message }) => {
1005 output.push_str(&String::from_utf8_lossy(&message));
1006 }
1007 Ok(_) => {}
1008 Err(e) => {
1009 output.push_str(&format!("[error reading stream: {}]", e));
1010 }
1011 }
1012 }
1013 }
1014 bollard::exec::StartExecResults::Detached => {}
1015 }
1016
1017 let inspect = docker.inspect_exec(&exec_id).await.map_err(|e| {
1018 CamelError::ProcessorError(format!("Failed to inspect exec: {}", e))
1019 })?;
1020
1021 let exit_code: i64 = inspect.exit_code.unwrap_or(0);
1022
1023 let output = output.trim_end().to_string();
1024 exchange.input.body = Body::Text(output);
1025 exchange.input.set_header(
1026 HEADER_EXIT_CODE,
1027 serde_json::Value::Number(exit_code.into()),
1028 );
1029 exchange.input.set_header(
1030 HEADER_CONTAINER_ID,
1031 serde_json::Value::String(container_id),
1032 );
1033 }
1034
1035 exchange.input.set_header(
1036 HEADER_ACTION_RESULT,
1037 serde_json::Value::String("success".to_string()),
1038 );
1039 Ok(())
1040}
1041
1042async fn handle_network_create(
1043 docker: Docker,
1044 config: ContainerConfig,
1045 exchange: &mut Exchange,
1046) -> Result<(), CamelError> {
1047 let network_name = exchange
1048 .input
1049 .header(HEADER_CONTAINER_NAME)
1050 .and_then(|v| v.as_str().map(|s| s.to_string()))
1051 .or(config.name.clone())
1052 .ok_or_else(|| {
1053 CamelError::ProcessorError(
1054 "CamelContainerName header or name param is required for network-create".to_string(),
1055 )
1056 })?;
1057
1058 let driver = config.driver.as_deref().unwrap_or("bridge");
1059
1060 let options = bollard::network::CreateNetworkOptions {
1061 name: network_name.clone(),
1062 driver: driver.to_string(),
1063 ..Default::default()
1064 };
1065
1066 let result = docker.create_network(options).await.map_err(|e| {
1067 let err_str = e.to_string().to_lowercase();
1068 if err_str.contains("409") || err_str.contains("already exists") {
1069 CamelError::ProcessorError(format!(
1070 "Network '{}' already exists",
1071 network_name
1072 ))
1073 } else {
1074 CamelError::ProcessorError(format!("Failed to create network: {}", e))
1075 }
1076 })?;
1077
1078 let network_id = result.id.clone();
1079 let json_value = serde_json::to_value(&result).map_err(|e| {
1080 CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1081 })?;
1082
1083 exchange.input.body = Body::Json(json_value);
1084 exchange.input.set_header(
1085 HEADER_NETWORK,
1086 serde_json::Value::String(network_id),
1087 );
1088 exchange.input.set_header(
1089 HEADER_ACTION_RESULT,
1090 serde_json::Value::String("success".to_string()),
1091 );
1092 Ok(())
1093}
1094
1095async fn handle_network_connect(
1096 docker: Docker,
1097 config: ContainerConfig,
1098 exchange: &mut Exchange,
1099) -> Result<(), CamelError> {
1100 let network = exchange
1101 .input
1102 .header(HEADER_NETWORK)
1103 .and_then(|v| v.as_str().map(|s| s.to_string()))
1104 .or(config.network.clone())
1105 .ok_or_else(|| {
1106 CamelError::ProcessorError(
1107 "CamelContainerNetwork header or network param is required for network-connect"
1108 .to_string(),
1109 )
1110 })?;
1111
1112 let container = exchange
1113 .input
1114 .header(HEADER_CONTAINER_ID)
1115 .and_then(|v| v.as_str().map(|s| s.to_string()))
1116 .or(config.container_id.clone())
1117 .ok_or_else(|| {
1118 CamelError::ProcessorError(
1119 "CamelContainerId header or container param is required for network-connect"
1120 .to_string(),
1121 )
1122 })?;
1123
1124 docker
1125 .connect_network(
1126 &network,
1127 bollard::network::ConnectNetworkOptions {
1128 container,
1129 ..Default::default()
1130 },
1131 )
1132 .await
1133 .map_err(|e| {
1134 let err_str = e.to_string().to_lowercase();
1135 if err_str.contains("404") || err_str.contains("not found") {
1136 CamelError::ProcessorError(format!(
1137 "Network '{}' or container not found",
1138 network
1139 ))
1140 } else {
1141 CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1142 }
1143 })?;
1144
1145 exchange.input.set_header(
1146 HEADER_ACTION_RESULT,
1147 serde_json::Value::String("success".to_string()),
1148 );
1149 Ok(())
1150}
1151
1152async fn handle_network_disconnect(
1153 docker: Docker,
1154 config: ContainerConfig,
1155 exchange: &mut Exchange,
1156) -> Result<(), CamelError> {
1157 let network = exchange
1158 .input
1159 .header(HEADER_NETWORK)
1160 .and_then(|v| v.as_str().map(|s| s.to_string()))
1161 .or(config.network.clone())
1162 .ok_or_else(|| {
1163 CamelError::ProcessorError(
1164 "CamelContainerNetwork header or network param is required for network-disconnect"
1165 .to_string(),
1166 )
1167 })?;
1168
1169 let container = exchange
1170 .input
1171 .header(HEADER_CONTAINER_ID)
1172 .and_then(|v| v.as_str().map(|s| s.to_string()))
1173 .or(config.container_id.clone())
1174 .ok_or_else(|| {
1175 CamelError::ProcessorError(
1176 "CamelContainerId header or container param is required for network-disconnect"
1177 .to_string(),
1178 )
1179 })?;
1180
1181 docker
1182 .disconnect_network(
1183 &network,
1184 bollard::network::DisconnectNetworkOptions {
1185 container,
1186 force: config.force,
1187 },
1188 )
1189 .await
1190 .map_err(|e| {
1191 let err_str = e.to_string().to_lowercase();
1192 if err_str.contains("404") || err_str.contains("not found") {
1193 CamelError::ProcessorError(format!(
1194 "Network '{}' or container not found",
1195 network
1196 ))
1197 } else {
1198 CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1199 }
1200 })?;
1201
1202 exchange.input.set_header(
1203 HEADER_ACTION_RESULT,
1204 serde_json::Value::String("success".to_string()),
1205 );
1206 Ok(())
1207}
1208
1209async fn handle_network_remove(
1210 docker: Docker,
1211 config: ContainerConfig,
1212 exchange: &mut Exchange,
1213) -> Result<(), CamelError> {
1214 let network = exchange
1215 .input
1216 .header(HEADER_NETWORK)
1217 .and_then(|v| v.as_str().map(|s| s.to_string()))
1218 .or(config.network.clone())
1219 .ok_or_else(|| {
1220 CamelError::ProcessorError(
1221 "CamelContainerNetwork header or network param is required for network-remove"
1222 .to_string(),
1223 )
1224 })?;
1225
1226 docker
1227 .remove_network(&network)
1228 .await
1229 .map_err(|e| {
1230 let err_str = e.to_string().to_lowercase();
1231 if err_str.contains("404") || err_str.contains("not found") {
1232 CamelError::ProcessorError(format!("Network '{}' not found", network))
1233 } else if err_str.contains("409") || err_str.contains("in use") {
1234 CamelError::ProcessorError(format!(
1235 "Network '{}' is in use and cannot be removed",
1236 network
1237 ))
1238 } else {
1239 CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1240 }
1241 })?;
1242
1243 exchange.input.set_header(
1244 HEADER_ACTION_RESULT,
1245 serde_json::Value::String("success".to_string()),
1246 );
1247 Ok(())
1248}
1249
1250async fn handle_network_list(
1251 docker: Docker,
1252 _config: ContainerConfig,
1253 exchange: &mut Exchange,
1254) -> Result<(), CamelError> {
1255 let networks = docker.list_networks::<String>(None).await.map_err(|e| {
1256 CamelError::ProcessorError(format!("Failed to list networks: {}", e))
1257 })?;
1258
1259 let json_value = serde_json::to_value(&networks).map_err(|e| {
1260 CamelError::ProcessorError(format!("Failed to serialize networks: {}", e))
1261 })?;
1262
1263 exchange.input.body = Body::Json(json_value);
1264 exchange.input.set_header(
1265 HEADER_ACTION_RESULT,
1266 serde_json::Value::String("success".to_string()),
1267 );
1268 Ok(())
1269}
1270
/// Producer for the container component.
///
/// Implements `tower::Service<Exchange>`; each call clones both fields into the
/// returned future and dispatches to the handler for the requested operation.
#[derive(Clone)]
pub struct ContainerProducer {
    /// Endpoint configuration; its `operation` is the fallback when an
    /// exchange carries no string-valued `HEADER_ACTION` header.
    config: ContainerConfig,
    /// Docker client passed to every operation handler.
    docker: Docker,
}
1280
1281impl Service<Exchange> for ContainerProducer {
1282 type Response = Exchange;
1283 type Error = CamelError;
1284 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1285
1286 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1287 Poll::Ready(Ok(()))
1288 }
1289
1290 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1291 let config = self.config.clone();
1292 let docker = self.docker.clone();
1293 Box::pin(async move {
1294 let operation_name = exchange
1295 .input
1296 .header(HEADER_ACTION)
1297 .and_then(|v| v.as_str().map(|s| s.to_string()))
1298 .unwrap_or_else(|| config.operation.clone());
1299
1300 let operation = parse_producer_operation(&operation_name)?;
1301
1302 match operation {
1303 ProducerOperation::List => {
1304 handle_list(docker, config, &mut exchange).await?;
1305 }
1306 ProducerOperation::Run => {
1307 handle_run(docker, config, &mut exchange).await?;
1308 }
1309 ProducerOperation::Start => {
1310 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1311 .await?;
1312 }
1313 ProducerOperation::Stop => {
1314 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1315 .await?;
1316 }
1317 ProducerOperation::Remove => {
1318 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1319 .await?;
1320 }
1321 ProducerOperation::Exec => {
1322 handle_exec(docker, config, &mut exchange).await?;
1323 }
1324 ProducerOperation::NetworkCreate => {
1325 handle_network_create(docker, config, &mut exchange).await?;
1326 }
1327 ProducerOperation::NetworkConnect => {
1328 handle_network_connect(docker, config, &mut exchange).await?;
1329 }
1330 ProducerOperation::NetworkDisconnect => {
1331 handle_network_disconnect(docker, config, &mut exchange).await?;
1332 }
1333 ProducerOperation::NetworkRemove => {
1334 handle_network_remove(docker, config, &mut exchange).await?;
1335 }
1336 ProducerOperation::NetworkList => {
1337 handle_network_list(docker, config, &mut exchange).await?;
1338 }
1339 }
1340
1341 Ok(exchange)
1342 })
1343 }
1344}
1345
/// Consumer for the container component.
///
/// `start` dispatches on `config.operation`: `"events"` streams Docker events
/// and `"logs"` streams the logs of one container; anything else is rejected.
pub struct ContainerConsumer {
    /// Endpoint configuration (operation, container id, log options, and the
    /// settings used by `connect_docker`).
    config: ContainerConfig,
}
1353
1354#[async_trait]
1355impl Consumer for ContainerConsumer {
1356 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1357 match self.config.operation.as_str() {
1358 "events" => self.start_events_consumer(context).await,
1359 "logs" => self.start_logs_consumer(context).await,
1360 _ => Err(CamelError::EndpointCreationFailed(format!(
1361 "Consumer only supports 'events' or 'logs' operations, got '{}'",
1362 self.config.operation
1363 ))),
1364 }
1365 }
1366
1367 async fn stop(&mut self) -> Result<(), CamelError> {
1368 Ok(())
1369 }
1370
1371 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1372 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1373 }
1374}
1375
impl ContainerConsumer {
    /// Streams Docker events into the route until the context is cancelled.
    ///
    /// Structure: an outer reconnect loop around an inner streaming loop.
    /// * Connection failures are retried after 5 seconds.
    /// * A stream error, end of stream, or a failed `context.send` leaves the
    ///   inner loop; the outer loop reconnects after a 1 second pause.
    /// * Cancellation is observed at every wait point and returns `Ok(())`.
    ///
    /// Each event becomes a text-bodied exchange built from
    /// `format_docker_event`.
    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        loop {
            // Fast exit before attempting a (re)connect.
            if context.is_cancelled() {
                tracing::info!("Container events consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Back off, but stay responsive to cancellation while waiting.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container events consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // `None` = no event filter options.
            let mut event_stream = docker.events::<String>(None);

            // Inner loop: every `break` below falls through to the reconnect delay.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container events consumer shutting down");
                        return Ok(());
                    }

                    msg = event_stream.next() => {
                        match msg {
                            Some(Ok(event)) => {
                                let formatted = format_docker_event(&event);
                                let message = Message::new(Body::Text(formatted));
                                let exchange = Exchange::new(message);

                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                tracing::info!("Docker event stream ended. Reconnecting...");
                                break;
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting; cancellation still wins.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container events consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }

    /// Streams the logs of the configured container into the route.
    ///
    /// Requires `config.container_id`; otherwise fails immediately with
    /// `CamelError::EndpointCreationFailed`. Uses the same reconnect scheme as
    /// the events consumer (5 s retry on connect failure, 1 s pause before
    /// reconnecting after a stream break), with one difference: when the
    /// stream ends and `config.follow` is `false`, the consumer returns
    /// `Ok(())` instead of reconnecting.
    ///
    /// Each non-empty log chunk becomes a text-bodied exchange carrying
    /// `HEADER_CONTAINER_ID` and `HEADER_LOG_STREAM`, plus
    /// `HEADER_LOG_TIMESTAMP` when timestamps are enabled and
    /// `extract_timestamp` finds one.
    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        let container_id = self.config.container_id.clone().ok_or_else(|| {
            CamelError::EndpointCreationFailed(
                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
                    .to_string(),
            )
        })?;

        loop {
            // Fast exit before attempting a (re)connect.
            if context.is_cancelled() {
                tracing::info!("Container logs consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Back off, but stay responsive to cancellation while waiting.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container logs consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // Without an explicit `tail`, request the full log history.
            let tail = self
                .config
                .tail
                .clone()
                .unwrap_or_else(|| "all".to_string());

            // NOTE(review): the same options are rebuilt on every reconnect, so
            // presumably already-delivered lines are replayed after a
            // reconnect — confirm whether `since` should be set.
            let options = bollard::container::LogsOptions::<String> {
                follow: self.config.follow,
                stdout: true,
                stderr: true,
                timestamps: self.config.timestamps,
                tail,
                ..Default::default()
            };

            let mut log_stream = docker.logs(&container_id, Some(options));
            let container_id_header = container_id.clone();

            // Inner loop: every `break` below falls through to the reconnect delay.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container logs consumer shutting down");
                        return Ok(());
                    }

                    msg = log_stream.next() => {
                        match msg {
                            Some(Ok(log_output)) => {
                                // Map the output variant to a stream label and
                                // decode the bytes lossily as UTF-8.
                                let (stream_type, content) = match log_output {
                                    bollard::container::LogOutput::StdOut { message } => {
                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdErr { message } => {
                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::Console { message } => {
                                        ("console", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdIn { message } => {
                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
                                    }
                                };

                                // Drop trailing whitespace and skip chunks that are
                                // empty afterwards.
                                let content = content.trim_end();
                                if content.is_empty() {
                                    continue;
                                }

                                let mut message = Message::new(Body::Text(content.to_string()));
                                message.set_header(
                                    HEADER_CONTAINER_ID,
                                    serde_json::Value::String(container_id_header.clone()),
                                );
                                message.set_header(
                                    HEADER_LOG_STREAM,
                                    serde_json::Value::String(stream_type.to_string()),
                                );

                                // The timestamp is copied into a header; the body
                                // keeps the line unchanged.
                                if self.config.timestamps
                                    && let Some(ts) = extract_timestamp(content) {
                                        message.set_header(
                                            HEADER_LOG_TIMESTAMP,
                                            serde_json::Value::String(ts),
                                        );
                                }

                                let exchange = Exchange::new(message);

                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send log exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                // End of stream: reconnect only when following.
                                if self.config.follow {
                                    tracing::info!("Docker log stream ended. Reconnecting...");
                                    break;
                                } else {
                                    tracing::info!("Container logs consumer finished (follow=false)");
                                    return Ok(());
                                }
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting; cancellation still wins.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container logs consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }
}
1583
/// Extracts the leading timestamp token from a log line.
///
/// The first space-separated token is returned when it is followed by more
/// text and looks like an RFC 3339 date-time: `YYYY-MM-DD`, a `T`, then at
/// least one digit (e.g. `2024-01-15T10:30:00.123456789Z`).
///
/// The previous check only required the token to contain a `T`, so ordinary
/// lines such as `"Test message"` were reported as timestamps.
///
/// Returns `None` when the line has no space or the first token does not
/// match that shape.
fn extract_timestamp(log_line: &str) -> Option<String> {
    /// Returns `true` for tokens shaped like `YYYY-MM-DDT<digit>...`.
    fn looks_like_rfc3339(token: &str) -> bool {
        let Some((date, time)) = token.split_once('T') else {
            return false;
        };
        let date = date.as_bytes();
        let date_ok = date.len() == 10
            && date.iter().enumerate().all(|(idx, byte)| match idx {
                4 | 7 => *byte == b'-',
                _ => byte.is_ascii_digit(),
            });
        date_ok && time.bytes().next().is_some_and(|b| b.is_ascii_digit())
    }

    // `split_once` avoids the intermediate `Vec` and keeps the original rule
    // that a timestamp must be followed by a space.
    let (token, _rest) = log_line.split_once(' ')?;
    looks_like_rfc3339(token).then(|| token.to_string())
}
1592
/// Camel component registered under the `container` scheme.
///
/// Creates `ContainerEndpoint`s from URIs, optionally layering global defaults
/// on top of each endpoint's parsed configuration.
pub struct ContainerComponent {
    /// Optional global defaults; when present, `create_endpoint` applies them
    /// to every endpoint configuration via `apply_global_defaults`.
    config: Option<ContainerGlobalConfig>,
}
1603
1604impl ContainerComponent {
1605 pub fn new() -> Self {
1607 Self { config: None }
1608 }
1609
1610 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1612 Self {
1613 config: Some(config),
1614 }
1615 }
1616
1617 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1619 Self { config }
1620 }
1621}
1622
1623impl Default for ContainerComponent {
1624 fn default() -> Self {
1625 Self::new()
1626 }
1627}
1628
1629impl Component for ContainerComponent {
1630 fn scheme(&self) -> &str {
1631 "container"
1632 }
1633
1634 fn create_endpoint(
1635 &self,
1636 uri: &str,
1637 _ctx: &dyn camel_component_api::ComponentContext,
1638 ) -> Result<Box<dyn Endpoint>, CamelError> {
1639 let mut config = ContainerConfig::from_uri(uri)?;
1640 if let Some(ref global) = self.config {
1642 config.apply_global_defaults(global);
1643 }
1644 Ok(Box::new(ContainerEndpoint {
1645 uri: uri.to_string(),
1646 config,
1647 }))
1648 }
1649}
1650
/// Endpoint created by `ContainerComponent` for a single `container:` URI.
///
/// Hands out consumers and producers that each receive a clone of its
/// configuration.
pub struct ContainerEndpoint {
    /// The URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Parsed endpoint configuration (after global defaults were applied, if any).
    config: ContainerConfig,
}
1659
1660impl ContainerEndpoint {
1661 pub fn docker_host(&self) -> Option<&str> {
1664 self.config.host.as_deref()
1665 }
1666}
1667
1668impl Endpoint for ContainerEndpoint {
1669 fn uri(&self) -> &str {
1670 &self.uri
1671 }
1672
1673 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1674 Ok(Box::new(ContainerConsumer {
1675 config: self.config.clone(),
1676 }))
1677 }
1678
1679 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1680 let docker = self.config.connect_docker_client()?;
1681 Ok(BoxProcessor::new(ContainerProducer {
1682 config: self.config.clone(),
1683 docker,
1684 }))
1685 }
1686}
1687
1688#[cfg(test)]
1689mod tests {
1690 use super::*;
1691 use camel_component_api::NoOpComponentContext;
1692
/// A minimal `run` URI yields the operation and image, and leaves `host` unset.
#[test]
fn test_container_config() {
    let parsed = ContainerConfig::from_uri("container:run?image=alpine").unwrap();

    assert_eq!(parsed.operation, "run");
    assert_eq!(parsed.image.as_deref(), Some("alpine"));
    // No `host` query parameter was given, so none may be invented.
    assert!(parsed.host.is_none());
}
1701
/// Global `docker_host` fills in `host` when the URI did not set one.
#[test]
fn test_global_config_applied_to_endpoint() {
    const GLOBAL_SOCKET: &str = "unix:///custom/docker.sock";

    let defaults = ContainerGlobalConfig::default().with_docker_host(GLOBAL_SOCKET);
    let mut endpoint_config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();

    assert!(
        endpoint_config.host.is_none(),
        "URI without ?host= should leave host as None"
    );

    endpoint_config.apply_global_defaults(&defaults);

    assert_eq!(
        endpoint_config.host.as_deref(),
        Some(GLOBAL_SOCKET),
        "global docker_host must be applied when URI did not set host"
    );
}
1720
/// A `host` given in the URI survives the application of global defaults.
#[test]
fn test_uri_param_wins_over_global_config() {
    const URI_SOCKET: &str = "unix:///override.sock";

    let defaults =
        ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
    let mut endpoint_config =
        ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
            .unwrap();

    assert_eq!(
        endpoint_config.host.as_deref(),
        Some(URI_SOCKET),
        "URI-set host should be parsed correctly"
    );

    endpoint_config.apply_global_defaults(&defaults);

    assert_eq!(
        endpoint_config.host.as_deref(),
        Some(URI_SOCKET),
        "global config must NOT override a host already set by URI"
    );
}
1741
/// The `name` query parameter is stored on the configuration.
#[test]
fn test_container_config_parses_name() {
    let parsed = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
    let name = parsed.name.as_deref();

    assert_eq!(name, Some("my-container"));
}
1747
/// Each core lifecycle operation name maps to its `ProducerOperation` variant.
#[test]
fn test_parse_producer_operation_known() {
    let expectations = [
        ("list", ProducerOperation::List),
        ("run", ProducerOperation::Run),
        ("start", ProducerOperation::Start),
        ("stop", ProducerOperation::Stop),
        ("remove", ProducerOperation::Remove),
    ];

    for (name, expected) in expectations {
        assert_eq!(parse_producer_operation(name).unwrap(), expected);
    }
}
1771
/// An unrecognised operation name is rejected with a `ProcessorError`.
#[test]
fn test_parse_producer_operation_unknown() {
    let failure = parse_producer_operation("destruir_mundo").unwrap_err();

    let CamelError::ProcessorError(msg) = failure else {
        panic!("Expected ProcessorError for unknown operation");
    };

    assert!(
        msg.contains("Unknown container operation"),
        "Unexpected error message: {}",
        msg
    );
}
1786
/// The exec and network operation names map to their variants.
#[test]
fn test_parse_producer_operation_new_variants() {
    let expectations = [
        ("exec", ProducerOperation::Exec),
        ("network-create", ProducerOperation::NetworkCreate),
        ("network-connect", ProducerOperation::NetworkConnect),
        ("network-disconnect", ProducerOperation::NetworkDisconnect),
        ("network-remove", ProducerOperation::NetworkRemove),
        ("network-list", ProducerOperation::NetworkList),
    ];

    for (name, expected) in expectations {
        assert_eq!(parse_producer_operation(name).unwrap(), expected);
    }
}
1814
/// A container name supplied via header wins over the URI `name` parameter.
#[test]
fn test_resolve_container_name_header_overrides_config() {
    let endpoint_config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();

    let mut message = Message::new("");
    message.set_header(
        HEADER_CONTAINER_NAME,
        serde_json::Value::String(String::from("header-name")),
    );
    let exchange = Exchange::new(message);

    let resolved = resolve_container_name(&exchange, &endpoint_config);

    assert_eq!(resolved.as_deref(), Some("header-name"));
}
1827
/// Creating a client for a `tcp://` host fails with an error mentioning TCP.
#[test]
fn test_container_config_rejects_tcp_host() {
    let endpoint_config =
        ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();

    let failure = endpoint_config.connect_docker_client().unwrap_err();

    let CamelError::ProcessorError(msg) = failure else {
        panic!("Expected ProcessorError for unsupported tcp host");
    };

    assert!(
        msg.to_lowercase().contains("tcp"),
        "Expected TCP scheme error, got: {}",
        msg
    );
}
1843
1844 #[tokio::test]
1845 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1846 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1847 let remove_called_clone = remove_called.clone();
1848
1849 let result = run_container_with_cleanup(
1850 || async { Ok("container-123".to_string()) },
1851 |_id| async move {
1852 Err(CamelError::ProcessorError(
1853 "Failed to start container".to_string(),
1854 ))
1855 },
1856 move |_id| {
1857 let remove_called_inner = remove_called_clone.clone();
1858 async move {
1859 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1860 Ok(())
1861 }
1862 },
1863 )
1864 .await;
1865
1866 assert!(result.is_err(), "Expected start failure to bubble up");
1867 assert!(
1868 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1869 "Expected cleanup to remove container"
1870 );
1871 }
1872
/// The component reports the `container` scheme and its endpoints keep their URI.
#[test]
fn test_container_component_creates_endpoint() {
    const URI: &str = "container:run?image=alpine";

    let component = ContainerComponent::new();
    assert_eq!(component.scheme(), "container");

    let component_ctx = NoOpComponentContext;
    let endpoint = component.create_endpoint(URI, &component_ctx).unwrap();

    assert_eq!(endpoint.uri(), "container:run?image=alpine");
}
1883
/// The raw `ports` query value is stored unchanged.
#[test]
fn test_container_config_parses_ports() {
    let parsed =
        ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
    let ports = parsed.ports.as_deref();

    assert_eq!(ports, Some("8080:80,8443:443"));
}
1890
/// The raw `env` query value is stored unchanged.
#[test]
fn test_container_config_parses_env() {
    let parsed =
        ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
    let env = parsed.env.as_deref();

    assert_eq!(env, Some("FOO=bar,BAZ=qux"));
}
1897
/// All log-related query parameters of a `logs` URI are parsed.
#[test]
fn test_container_config_parses_logs_options() {
    // The separator before `timestamps` must be a literal `&`. The previous
    // text read `follow=true×tamps=true` (`&times` decoded as an HTML entity),
    // so the `timestamps` parameter was never passed.
    let config = ContainerConfig::from_uri(
        "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
    )
    .unwrap();

    assert_eq!(config.operation, "logs");
    assert_eq!(config.container_id.as_deref(), Some("my-app"));
    assert!(config.follow);
    assert!(config.timestamps);
    assert_eq!(config.tail.as_deref(), Some("100"));
}
1910
/// Without explicit options a `logs` URI follows the stream, omits timestamps,
/// and sets no `tail`.
#[test]
fn test_container_config_logs_defaults() {
    let defaults = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();

    assert!(defaults.follow);
    assert!(!defaults.timestamps);
    assert!(defaults.tail.is_none());
}
1918
/// A single `host:container` mapping yields one exposed port and one binding.
#[test]
fn test_parse_ports_single() {
    let parsed = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
    let (exposed, bindings) = parsed.parse_ports().unwrap();

    // The protocol defaults to tcp in the generated key.
    assert!(exposed.contains_key("80/tcp"));
    assert!(bindings.contains_key("80/tcp"));

    let host_bindings = bindings.get("80/tcp").unwrap().as_ref().unwrap();
    assert_eq!(host_bindings.len(), 1);

    let first = &host_bindings[0];
    assert_eq!(first.host_port, Some(String::from("8080")));
}
1931
/// Comma-separated mappings each produce an exposed port and a binding.
#[test]
fn test_parse_ports_multiple() {
    let parsed =
        ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
    let (exposed, bindings) = parsed.parse_ports().unwrap();

    for key in ["80/tcp", "443/tcp"] {
        assert!(exposed.contains_key(key));
    }
    assert_eq!(bindings.len(), 2);
}
1942
/// An explicit `/tcp` or `/udp` suffix is kept in the exposed-port key.
#[test]
fn test_parse_ports_with_protocol() {
    let parsed =
        ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
            .unwrap();
    let (exposed, _) = parsed.parse_ports().unwrap();

    for key in ["80/tcp", "53/udp"] {
        assert!(exposed.contains_key(key));
    }
}
1953
/// Without a `ports` parameter there is nothing to parse.
#[test]
fn test_parse_ports_none() {
    let parsed = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
    let ports = parsed.parse_ports();

    assert!(ports.is_none());
}
1959
/// A single `KEY=value` pair is returned as one entry.
#[test]
fn test_parse_env_single() {
    let parsed = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
    let variables = parsed.parse_env().unwrap();

    assert_eq!(variables.len(), 1);
    assert_eq!(variables[0], "FOO=bar");
}
1968
/// Comma-separated pairs are split into individual entries.
#[test]
fn test_parse_env_multiple() {
    let parsed =
        ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
            .unwrap();
    let variables = parsed.parse_env().unwrap();

    assert_eq!(variables.len(), 3);
    for expected in ["FOO=bar", "BAZ=qux", "NUM=123"] {
        assert!(variables.contains(&expected.to_string()));
    }
}
1981
/// Without an `env` parameter there is nothing to parse.
#[test]
fn test_parse_env_none() {
    let parsed = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
    let variables = parsed.parse_env();

    assert!(variables.is_none());
}
1987
1988 use camel_component_api::Message;
1989 use std::sync::Arc;
1990
1991 #[tokio::test]
1992 async fn test_container_producer_resolves_operation_from_header() {
1993 let docker = match Docker::connect_with_local_defaults() {
1995 Ok(d) => d,
1996 Err(_) => {
1997 eprintln!("Skipping test: Could not connect to Docker daemon");
1998 return;
1999 }
2000 };
2001
2002 if docker.ping().await.is_err() {
2003 eprintln!("Skipping test: Docker daemon not responding to ping");
2004 return;
2005 }
2006
2007 let component = ContainerComponent::new();
2008 let ctx = NoOpComponentContext;
2009 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2010
2011 let ctx = ProducerContext::new();
2012 let mut producer = endpoint.create_producer(&ctx).unwrap();
2013
2014 let mut exchange = Exchange::new(Message::new(""));
2015 exchange
2016 .input
2017 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2018
2019 use tower::ServiceExt;
2020 let result = producer
2021 .ready()
2022 .await
2023 .unwrap()
2024 .call(exchange)
2025 .await
2026 .unwrap();
2027
2028 assert_eq!(
2030 result
2031 .input
2032 .header(HEADER_ACTION_RESULT)
2033 .map(|v| v.as_str().unwrap()),
2034 Some("success")
2035 );
2036 }
2037
2038 #[tokio::test]
2039 async fn test_container_producer_connection_error_on_invalid_host() {
2040 let component = ContainerComponent::new();
2042 let ctx = NoOpComponentContext;
2043 let endpoint = component
2044 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2045 .unwrap();
2046
2047 let ctx = ProducerContext::new();
2048 let result = endpoint.create_producer(&ctx);
2049
2050 assert!(
2052 result.is_err(),
2053 "Expected error when connecting to invalid host"
2054 );
2055 let err = result.unwrap_err();
2056 match &err {
2057 CamelError::ProcessorError(msg) => {
2058 assert!(
2059 msg.to_lowercase().contains("connection")
2060 || msg.to_lowercase().contains("connect")
2061 || msg.to_lowercase().contains("socket")
2062 || msg.contains("docker"),
2063 "Error message should indicate connection failure, got: {}",
2064 msg
2065 );
2066 }
2067 _ => panic!("Expected ProcessorError, got: {:?}", err),
2068 }
2069 }
2070
2071 #[tokio::test]
2073 async fn test_container_producer_lifecycle_operations_missing_id() {
2074 let docker = match Docker::connect_with_local_defaults() {
2076 Ok(d) => d,
2077 Err(_) => {
2078 eprintln!("Skipping test: Could not connect to Docker daemon");
2079 return;
2080 }
2081 };
2082
2083 if docker.ping().await.is_err() {
2084 eprintln!("Skipping test: Docker daemon not responding to ping");
2085 return;
2086 }
2087
2088 let component = ContainerComponent::new();
2089 let ctx = NoOpComponentContext;
2090 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2091 let ctx = ProducerContext::new();
2092 let mut producer = endpoint.create_producer(&ctx).unwrap();
2093
2094 for operation in ["start", "stop", "remove"] {
2096 let mut exchange = Exchange::new(Message::new(""));
2097 exchange.input.set_header(
2098 HEADER_ACTION,
2099 serde_json::Value::String(operation.to_string()),
2100 );
2101 use tower::ServiceExt;
2104 let result = producer.ready().await.unwrap().call(exchange).await;
2105
2106 assert!(
2107 result.is_err(),
2108 "Expected error for {} operation without CamelContainerId",
2109 operation
2110 );
2111 let err = result.unwrap_err();
2112 match &err {
2113 CamelError::ProcessorError(msg) => {
2114 assert!(
2115 msg.contains(HEADER_CONTAINER_ID),
2116 "Error message should mention {}, got: {}",
2117 HEADER_CONTAINER_ID,
2118 msg
2119 );
2120 }
2121 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2122 }
2123 }
2124 }
2125
2126 #[tokio::test]
2128 async fn test_container_producer_stop_nonexistent() {
2129 let docker = match Docker::connect_with_local_defaults() {
2131 Ok(d) => d,
2132 Err(_) => {
2133 eprintln!("Skipping test: Could not connect to Docker daemon");
2134 return;
2135 }
2136 };
2137
2138 if docker.ping().await.is_err() {
2139 eprintln!("Skipping test: Docker daemon not responding to ping");
2140 return;
2141 }
2142
2143 let component = ContainerComponent::new();
2144 let ctx = NoOpComponentContext;
2145 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2146 let ctx = ProducerContext::new();
2147 let mut producer = endpoint.create_producer(&ctx).unwrap();
2148
2149 let mut exchange = Exchange::new(Message::new(""));
2150 exchange
2151 .input
2152 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2153 exchange.input.set_header(
2154 HEADER_CONTAINER_ID,
2155 serde_json::Value::String("nonexistent-container-123".into()),
2156 );
2157
2158 use tower::ServiceExt;
2159 let result = producer.ready().await.unwrap().call(exchange).await;
2160
2161 assert!(
2162 result.is_err(),
2163 "Expected error when stopping nonexistent container"
2164 );
2165 let err = result.unwrap_err();
2166 match &err {
2167 CamelError::ProcessorError(msg) => {
2168 assert!(
2170 msg.to_lowercase().contains("no such container")
2171 || msg.to_lowercase().contains("not found")
2172 || msg.contains("404"),
2173 "Error message should indicate container not found, got: {}",
2174 msg
2175 );
2176 }
2177 _ => panic!("Expected ProcessorError, got: {:?}", err),
2178 }
2179 }
2180
2181 #[tokio::test]
2183 async fn test_container_producer_run_missing_image() {
2184 let docker = match Docker::connect_with_local_defaults() {
2186 Ok(d) => d,
2187 Err(_) => {
2188 eprintln!("Skipping test: Could not connect to Docker daemon");
2189 return;
2190 }
2191 };
2192
2193 if docker.ping().await.is_err() {
2194 eprintln!("Skipping test: Docker daemon not responding to ping");
2195 return;
2196 }
2197
2198 let component = ContainerComponent::new();
2200 let ctx = NoOpComponentContext;
2201 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2202 let ctx = ProducerContext::new();
2203 let mut producer = endpoint.create_producer(&ctx).unwrap();
2204
2205 let mut exchange = Exchange::new(Message::new(""));
2206 exchange
2207 .input
2208 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2209 use tower::ServiceExt;
2212 let result = producer.ready().await.unwrap().call(exchange).await;
2213
2214 assert!(
2215 result.is_err(),
2216 "Expected error for run operation without image"
2217 );
2218 let err = result.unwrap_err();
2219 match &err {
2220 CamelError::ProcessorError(msg) => {
2221 assert!(
2222 msg.to_lowercase().contains("image"),
2223 "Error message should mention 'image', got: {}",
2224 msg
2225 );
2226 }
2227 _ => panic!("Expected ProcessorError, got: {:?}", err),
2228 }
2229 }
2230
2231 #[tokio::test]
2233 async fn test_container_producer_run_image_from_header() {
2234 let docker = match Docker::connect_with_local_defaults() {
2236 Ok(d) => d,
2237 Err(_) => {
2238 eprintln!("Skipping test: Could not connect to Docker daemon");
2239 return;
2240 }
2241 };
2242
2243 if docker.ping().await.is_err() {
2244 eprintln!("Skipping test: Docker daemon not responding to ping");
2245 return;
2246 }
2247
2248 let component = ContainerComponent::new();
2250 let ctx = NoOpComponentContext;
2251 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2252 let ctx = ProducerContext::new();
2253 let mut producer = endpoint.create_producer(&ctx).unwrap();
2254
2255 let mut exchange = Exchange::new(Message::new(""));
2256 exchange
2257 .input
2258 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2259 exchange.input.set_header(
2261 HEADER_IMAGE,
2262 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2263 );
2264
2265 use tower::ServiceExt;
2266 let result = producer.ready().await.unwrap().call(exchange).await;
2267
2268 assert!(
2270 result.is_err(),
2271 "Expected error when running container with nonexistent image"
2272 );
2273 let err = result.unwrap_err();
2274 match &err {
2275 CamelError::ProcessorError(msg) => {
2276 assert!(
2278 msg.to_lowercase().contains("no such image")
2279 || msg.to_lowercase().contains("not found")
2280 || msg.to_lowercase().contains("image")
2281 || msg.to_lowercase().contains("pull")
2282 || msg.contains("404"),
2283 "Error message should indicate image issue, got: {}",
2284 msg
2285 );
2286 }
2287 _ => panic!("Expected ProcessorError, got: {:?}", err),
2288 }
2289 }
2290
2291 #[tokio::test]
2294 async fn test_container_producer_run_alpine_container() {
2295 let docker = match Docker::connect_with_local_defaults() {
2296 Ok(d) => d,
2297 Err(_) => {
2298 eprintln!("Skipping test: Could not connect to Docker daemon");
2299 return;
2300 }
2301 };
2302
2303 if docker.ping().await.is_err() {
2304 eprintln!("Skipping test: Docker daemon not responding to ping");
2305 return;
2306 }
2307
2308 let images = docker.list_images::<&str>(None).await.unwrap();
2310 let has_alpine = images
2311 .iter()
2312 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2313
2314 if !has_alpine {
2315 eprintln!("Pulling alpine:latest image...");
2316 let mut stream = docker.create_image(
2317 Some(bollard::image::CreateImageOptions {
2318 from_image: "alpine:latest",
2319 ..Default::default()
2320 }),
2321 None,
2322 None,
2323 );
2324
2325 use futures::StreamExt;
2326 while let Some(_item) = stream.next().await {
2327 }
2329 eprintln!("Image pulled successfully");
2330 }
2331
2332 let component = ContainerComponent::new();
2334 let ctx = NoOpComponentContext;
2335 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2336 let ctx = ProducerContext::new();
2337 let mut producer = endpoint.create_producer(&ctx).unwrap();
2338
2339 let timestamp = std::time::SystemTime::now()
2341 .duration_since(std::time::UNIX_EPOCH)
2342 .unwrap()
2343 .as_millis();
2344 let container_name = format!("test-rust-camel-{}", timestamp);
2345 let mut exchange = Exchange::new(Message::new(""));
2346 exchange.input.set_header(
2347 HEADER_IMAGE,
2348 serde_json::Value::String("alpine:latest".into()),
2349 );
2350 exchange.input.set_header(
2351 HEADER_CONTAINER_NAME,
2352 serde_json::Value::String(container_name.clone()),
2353 );
2354
2355 use tower::ServiceExt;
2356 let result = producer
2357 .ready()
2358 .await
2359 .unwrap()
2360 .call(exchange)
2361 .await
2362 .expect("Container run should succeed");
2363
2364 let container_id = result
2366 .input
2367 .header(HEADER_CONTAINER_ID)
2368 .and_then(|v| v.as_str().map(|s| s.to_string()))
2369 .expect("Expected container ID header");
2370 assert!(!container_id.is_empty(), "Container ID should not be empty");
2371
2372 assert_eq!(
2374 result
2375 .input
2376 .header(HEADER_ACTION_RESULT)
2377 .and_then(|v| v.as_str()),
2378 Some("success")
2379 );
2380
2381 let inspect = docker
2383 .inspect_container(&container_id, None)
2384 .await
2385 .expect("Container should exist");
2386 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2387
2388 docker
2390 .remove_container(
2391 &container_id,
2392 Some(bollard::container::RemoveContainerOptions {
2393 force: true,
2394 ..Default::default()
2395 }),
2396 )
2397 .await
2398 .ok();
2399
2400 eprintln!("✅ Container {} created and cleaned up", container_id);
2401 }
2402
2403 #[tokio::test]
2405 async fn test_container_consumer_unsupported_operation() {
2406 use tokio::sync::mpsc;
2407
2408 let component = ContainerComponent::new();
2409 let ctx = NoOpComponentContext;
2410 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2411 let mut consumer = endpoint.create_consumer().unwrap();
2412
2413 let (tx, _rx) = mpsc::channel(16);
2415 let cancel_token = tokio_util::sync::CancellationToken::new();
2416 let context = ConsumerContext::new(tx, cancel_token);
2417
2418 let result = consumer.start(context).await;
2419
2420 assert!(
2422 result.is_err(),
2423 "Expected error for unsupported consumer operation"
2424 );
2425 let err = result.unwrap_err();
2426 match &err {
2427 CamelError::EndpointCreationFailed(msg) => {
2428 assert!(
2429 msg.contains("Consumer only supports 'events' or 'logs'"),
2430 "Error message should mention events or logs support, got: {}",
2431 msg
2432 );
2433 }
2434 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2435 }
2436 }
2437
/// A consumer built for `container:events` reports the `Concurrent`
/// concurrency model with no `max` set.
#[test]
fn test_container_consumer_concurrency_model_is_concurrent() {
    let config = ContainerConfig::from_uri("container:events").unwrap();
    let consumer = ContainerConsumer { config };

    let expected = camel_component_api::ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
2449
2450 #[tokio::test]
2454 async fn test_container_consumer_cancellation() {
2455 use std::sync::atomic::{AtomicBool, Ordering};
2456 use tokio::sync::mpsc;
2457
2458 let docker = match Docker::connect_with_local_defaults() {
2460 Ok(d) => d,
2461 Err(_) => {
2462 eprintln!("Skipping test: Could not connect to Docker daemon");
2463 return;
2464 }
2465 };
2466
2467 if docker.ping().await.is_err() {
2468 eprintln!("Skipping test: Docker daemon not responding to ping");
2469 return;
2470 }
2471
2472 let component = ContainerComponent::new();
2473 let ctx = NoOpComponentContext;
2474 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2475 let mut consumer = endpoint.create_consumer().unwrap();
2476
2477 let (tx, _rx) = mpsc::channel(16);
2479 let cancel_token = tokio_util::sync::CancellationToken::new();
2480 let context = ConsumerContext::new(tx, cancel_token.clone());
2481
2482 let completed = Arc::new(AtomicBool::new(false));
2484 let completed_clone = completed.clone();
2485
2486 let handle = tokio::spawn(async move {
2488 let result = consumer.start(context).await;
2489 completed_clone.store(true, Ordering::SeqCst);
2491 result
2492 });
2493
2494 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2496
2497 assert!(
2499 !completed.load(Ordering::SeqCst),
2500 "Consumer should still be running before cancellation"
2501 );
2502
2503 cancel_token.cancel();
2505
2506 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2508
2509 assert!(
2511 result.is_ok(),
2512 "Consumer should gracefully shut down after cancellation"
2513 );
2514
2515 assert!(
2517 completed.load(Ordering::SeqCst),
2518 "Consumer should have completed after cancellation"
2519 );
2520 }
2521
2522 #[tokio::test]
2526 async fn test_container_producer_list_containers() {
2527 let docker = match Docker::connect_with_local_defaults() {
2530 Ok(d) => d,
2531 Err(_) => {
2532 eprintln!("Skipping test: Could not connect to Docker daemon");
2533 return;
2534 }
2535 };
2536
2537 if docker.ping().await.is_err() {
2538 eprintln!("Skipping test: Docker daemon not responding to ping");
2539 return;
2540 }
2541
2542 let component = ContainerComponent::new();
2544 let ctx = NoOpComponentContext;
2545 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2546
2547 let ctx = ProducerContext::new();
2548 let mut producer = endpoint.create_producer(&ctx).unwrap();
2549
2550 let mut exchange = Exchange::new(Message::new(""));
2552 exchange
2553 .input
2554 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2555
2556 use tower::ServiceExt;
2558 let result = producer
2559 .ready()
2560 .await
2561 .unwrap()
2562 .call(exchange)
2563 .await
2564 .expect("Producer should succeed when Docker is available");
2565
2566 match &result.input.body {
2569 camel_component_api::Body::Json(json_value) => {
2570 assert!(
2571 json_value.is_array(),
2572 "Expected input body to be a JSON array, got: {:?}",
2573 json_value
2574 );
2575 }
2576 other => panic!("Expected Body::Json with array, got: {:?}", other),
2577 }
2578 }
2579
/// The `volumes` query parameter is kept as the raw string on the parsed config.
#[test]
fn test_container_config_parses_volumes() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let expected = Some("./html:/usr/share/nginx/html:ro");
    assert_eq!(cfg.volumes.as_deref(), expected);
}
2591
/// An `exec` URI populates the operation, container id, command, user,
/// working directory and detach flag on the parsed config.
#[test]
fn test_container_config_parses_exec_params() {
    let uri = "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    assert_eq!(cfg.operation, "exec");

    // Optional string parameters.
    assert_eq!(cfg.container_id.as_deref(), Some("my-app"));
    assert_eq!(cfg.cmd.as_deref(), Some("ls /app"));
    assert_eq!(cfg.user.as_deref(), Some("root"));
    assert_eq!(cfg.workdir.as_deref(), Some("/tmp"));

    // Boolean flag.
    assert!(cfg.detach);
}
2605
/// A `network-create` URI populates the operation plus the `name` and
/// `driver` parameters on the parsed config.
#[test]
fn test_container_config_parses_network_create_params() {
    let uri = "container:network-create?name=my-net&driver=bridge";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    assert_eq!(cfg.operation, "network-create");
    assert_eq!(cfg.name.as_deref(), Some("my-net"));
    assert_eq!(cfg.driver.as_deref(), Some("bridge"));
}
2614
/// With no query parameters, the optional fields are unset and the boolean
/// flags are `false`.
#[test]
fn test_container_config_defaults_new_fields() {
    let cfg = ContainerConfig::from_uri("container:list").unwrap();

    // Optional values default to `None`.
    assert!(cfg.volumes.is_none());
    assert!(cfg.user.is_none());
    assert!(cfg.workdir.is_none());
    assert!(cfg.driver.is_none());

    // Flags default to off.
    assert!(!cfg.detach);
    assert!(!cfg.force);
}
2625
/// A `host:container:mode` entry is returned as a single bind, with no
/// anonymous volumes.
#[test]
fn test_parse_volumes_bind_mount() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = cfg.parse_volumes().unwrap();

    let expected = vec!["./html:/usr/share/nginx/html:ro"];
    assert_eq!(bind_mounts, expected);
    assert!(anonymous.is_empty());
}
2636
/// A `name:/path` entry is returned as a bind, with no anonymous volumes.
#[test]
fn test_parse_volumes_named_volume() {
    let uri = "container:run?image=postgres&volumes=data:/var/lib/data";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = cfg.parse_volumes().unwrap();

    let expected = vec!["data:/var/lib/data"];
    assert_eq!(bind_mounts, expected);
    assert!(anonymous.is_empty());
}
2646
/// A bare container path is reported as an anonymous volume keyed by that
/// path, with no binds.
#[test]
fn test_parse_volumes_anonymous() {
    let uri = "container:run?image=alpine&volumes=/tmp/app-data";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = cfg.parse_volumes().unwrap();

    assert!(bind_mounts.is_empty());
    assert!(anonymous.contains_key("/tmp/app-data"));
}
2655
/// A container path followed by a mode suffix (`:ro`) is still reported as an
/// anonymous volume keyed by the path alone, with no binds.
#[test]
fn test_parse_volumes_anonymous_with_mode() {
    let uri = "container:run?image=alpine&volumes=/tmp/app-data:ro";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = cfg.parse_volumes().unwrap();

    assert!(bind_mounts.is_empty());
    assert!(anonymous.contains_key("/tmp/app-data"));
}
2665
/// Two comma-separated bind entries both end up in the bind list, with no
/// anonymous volumes.
#[test]
fn test_parse_volumes_multiple() {
    let uri =
        "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = cfg.parse_volumes().unwrap();

    assert_eq!(bind_mounts.len(), 2);
    let expected_entries = ["./html:/usr/share/nginx/html:ro", "data:/var/log/app"];
    for entry in expected_entries.iter() {
        assert!(bind_mounts.contains(&entry.to_string()));
    }
    assert!(anonymous.is_empty());
}
2678
/// A bind entry and a bare path in the same list are split into one bind and
/// one anonymous volume.
#[test]
fn test_parse_volumes_mixed() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, anonymous) = cfg.parse_volumes().unwrap();

    assert_eq!(bind_mounts.len(), 1);
    assert!(anonymous.contains_key("/tmp/cache"));
}
2689
/// Without a `volumes` parameter, `parse_volumes` yields `None`.
#[test]
fn test_parse_volumes_none() {
    let cfg = ContainerConfig::from_uri("container:run?image=nginx").unwrap();

    let parsed = cfg.parse_volumes();
    assert!(parsed.is_none());
}
2695
/// A `volumes` value made only of separators (`,,`) yields `None`.
#[test]
fn test_parse_volumes_empty_entry_skipped() {
    let cfg = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();

    let parsed = cfg.parse_volumes();
    assert!(parsed.is_none());
}
2701
/// A bind entry with an explicit `:rw` mode is passed through unchanged.
#[test]
fn test_parse_volumes_rw_mode() {
    let uri = "container:run?image=nginx&volumes=./data:/app/data:rw";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (bind_mounts, _) = cfg.parse_volumes().unwrap();

    let expected = vec!["./data:/app/data:rw"];
    assert_eq!(bind_mounts, expected);
}
2711
2712 #[tokio::test]
2713 async fn test_container_producer_run_with_volumes() {
2714 let docker = match Docker::connect_with_local_defaults() {
2715 Ok(d) => d,
2716 Err(_) => {
2717 eprintln!("Skipping test: Could not connect to Docker daemon");
2718 return;
2719 }
2720 };
2721 if docker.ping().await.is_err() {
2722 eprintln!("Skipping test: Docker daemon not responding to ping");
2723 return;
2724 }
2725
2726 let cargo_toml = std::fs::canonicalize("./Cargo.toml")
2727 .expect("Cargo.toml should exist");
2728 let volume_path = cargo_toml.to_str().unwrap();
2729
2730 let component = ContainerComponent::new();
2731 let ctx = NoOpComponentContext;
2732 let uri = format!(
2733 "container:run?image=alpine&cmd=cat /mnt/test.txt&volumes={}:/mnt/test.txt:ro&autoRemove=true",
2734 volume_path
2735 );
2736 let endpoint = component.create_endpoint(&uri, &ctx).unwrap();
2737 let ctx = ProducerContext::new();
2738 let mut producer = endpoint.create_producer(&ctx).unwrap();
2739
2740 let mut exchange = Exchange::new(Message::new(""));
2741 exchange
2742 .input
2743 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2744
2745 use tower::ServiceExt;
2746 let result = producer
2747 .ready()
2748 .await
2749 .unwrap()
2750 .call(exchange)
2751 .await
2752 .expect("Run with volumes should succeed");
2753
2754 let container_id = result
2755 .input
2756 .header(HEADER_CONTAINER_ID)
2757 .and_then(|v| v.as_str().map(|s| s.to_string()))
2758 .expect("Should have container ID");
2759
2760 let _ = docker
2761 .remove_container(
2762 &container_id,
2763 Some(bollard::container::RemoveContainerOptions {
2764 force: true,
2765 ..Default::default()
2766 }),
2767 )
2768 .await;
2769 }
2770
2771 #[tokio::test]
2772 async fn test_container_producer_exec() {
2773 let docker = match Docker::connect_with_local_defaults() {
2774 Ok(d) => d,
2775 Err(_) => {
2776 eprintln!("Skipping test: Could not connect to Docker daemon");
2777 return;
2778 }
2779 };
2780 if docker.ping().await.is_err() {
2781 eprintln!("Skipping test: Docker daemon not responding to ping");
2782 return;
2783 }
2784
2785 let component = ContainerComponent::new();
2786 let ctx = NoOpComponentContext;
2787 let endpoint = component
2788 .create_endpoint("container:run?image=alpine&cmd=sleep 30&autoRemove=true", &ctx)
2789 .unwrap();
2790 let ctx = ProducerContext::new();
2791 let mut producer = endpoint.create_producer(&ctx).unwrap();
2792
2793 let mut exchange = Exchange::new(Message::new(""));
2794 exchange
2795 .input
2796 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2797
2798 use tower::ServiceExt;
2799 let result = producer
2800 .ready()
2801 .await
2802 .unwrap()
2803 .call(exchange)
2804 .await
2805 .expect("Run should succeed");
2806
2807 let container_id = result
2808 .input
2809 .header(HEADER_CONTAINER_ID)
2810 .and_then(|v| v.as_str().map(|s| s.to_string()))
2811 .expect("Should have container ID");
2812
2813 let mut exec_exchange = Exchange::new(Message::new(""));
2814 exec_exchange.input.set_header(
2815 HEADER_ACTION,
2816 serde_json::Value::String("exec".into()),
2817 );
2818 exec_exchange.input.set_header(
2819 HEADER_CONTAINER_ID,
2820 serde_json::Value::String(container_id.clone()),
2821 );
2822 exec_exchange.input.set_header(
2823 HEADER_CMD,
2824 serde_json::Value::String("echo hello".into()),
2825 );
2826
2827 let exec_result = producer
2828 .ready()
2829 .await
2830 .unwrap()
2831 .call(exec_exchange)
2832 .await
2833 .expect("Exec should succeed");
2834
2835 match &exec_result.input.body {
2836 Body::Text(text) => {
2837 assert!(text.contains("hello"), "Exec output should contain 'hello', got: {}", text);
2838 }
2839 other => panic!("Expected Body::Text, got: {:?}", other),
2840 }
2841
2842 let exit_code = exec_result
2843 .input
2844 .header(HEADER_EXIT_CODE)
2845 .and_then(|v| v.as_i64());
2846 assert_eq!(exit_code, Some(0));
2847
2848 let action_result = exec_result
2849 .input
2850 .header(HEADER_ACTION_RESULT)
2851 .and_then(|v| v.as_str());
2852 assert_eq!(action_result, Some("success"));
2853
2854 let _ = docker
2855 .remove_container(
2856 &container_id,
2857 Some(bollard::container::RemoveContainerOptions {
2858 force: true,
2859 ..Default::default()
2860 }),
2861 )
2862 .await;
2863 }
2864
2865 #[tokio::test]
2866 async fn test_container_producer_network_lifecycle() {
2867 let docker = match Docker::connect_with_local_defaults() {
2868 Ok(d) => d,
2869 Err(_) => {
2870 eprintln!("Skipping test: Could not connect to Docker daemon");
2871 return;
2872 }
2873 };
2874 if docker.ping().await.is_err() {
2875 eprintln!("Skipping test: Docker daemon not responding to ping");
2876 return;
2877 }
2878
2879 let network_name = format!("camel-test-{}", std::process::id());
2880
2881 let component = ContainerComponent::new();
2882 let component_ctx = NoOpComponentContext;
2883
2884 let endpoint = component
2886 .create_endpoint(
2887 &format!("container:network-create?name={}", network_name),
2888 &component_ctx,
2889 )
2890 .unwrap();
2891 let ctx = ProducerContext::new();
2892 let mut producer = endpoint.create_producer(&ctx).unwrap();
2893
2894 let mut exchange = Exchange::new(Message::new(""));
2895 exchange.input.set_header(
2896 HEADER_ACTION,
2897 serde_json::Value::String("network-create".into()),
2898 );
2899
2900 use tower::ServiceExt;
2901 let result = producer
2902 .ready()
2903 .await
2904 .unwrap()
2905 .call(exchange)
2906 .await
2907 .expect("Network create should succeed");
2908
2909 let network_id = result
2910 .input
2911 .header(HEADER_NETWORK)
2912 .and_then(|v| v.as_str().map(|s| s.to_string()))
2913 .expect("Should have network ID");
2914
2915 assert!(!network_id.is_empty());
2916
2917 let endpoint = component
2919 .create_endpoint("container:network-list", &component_ctx)
2920 .unwrap();
2921 let mut producer = endpoint.create_producer(&ctx).unwrap();
2922
2923 let mut exchange = Exchange::new(Message::new(""));
2924 exchange.input.set_header(
2925 HEADER_ACTION,
2926 serde_json::Value::String("network-list".into()),
2927 );
2928
2929 let list_result = producer
2930 .ready()
2931 .await
2932 .unwrap()
2933 .call(exchange)
2934 .await
2935 .expect("Network list should succeed");
2936
2937 match &list_result.input.body {
2938 Body::Json(json_value) => {
2939 assert!(json_value.is_array(), "Expected JSON array");
2940 }
2941 other => panic!("Expected Body::Json, got: {:?}", other),
2942 }
2943
2944 let endpoint = component
2946 .create_endpoint(
2947 &format!("container:network-remove?network={}", network_name),
2948 &component_ctx,
2949 )
2950 .unwrap();
2951 let mut producer = endpoint.create_producer(&ctx).unwrap();
2952
2953 let mut exchange = Exchange::new(Message::new(""));
2954 exchange.input.set_header(
2955 HEADER_ACTION,
2956 serde_json::Value::String("network-remove".into()),
2957 );
2958
2959 let remove_result = producer
2960 .ready()
2961 .await
2962 .unwrap()
2963 .call(exchange)
2964 .await
2965 .expect("Network remove should succeed");
2966
2967 let action_result = remove_result
2968 .input
2969 .header(HEADER_ACTION_RESULT)
2970 .and_then(|v| v.as_str());
2971 assert_eq!(action_result, Some("success"));
2972 }
2973}