1pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::models::{
19 ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
20};
21use bollard::query_parameters::{
22 CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
23 ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
24 StartContainerOptions,
25};
26use bollard::service::{HostConfig, PortBinding};
27use camel_component_api::parse_uri;
28use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
29use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
30use tower::Service;
31
32static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
35 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
36
37fn track_container(id: String) {
39 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
40 tracker.insert(id);
41 }
42}
43
44fn untrack_container(id: &str) {
46 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
47 tracker.remove(id);
48 }
49}
50
51pub async fn cleanup_tracked_containers() {
53 let ids: Vec<String> = {
54 match CONTAINER_TRACKER.lock() {
55 Ok(tracker) => tracker.iter().cloned().collect(),
56 Err(_) => return,
57 }
58 };
59
60 if ids.is_empty() {
61 return;
62 }
63
64 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
65
66 let docker = match Docker::connect_with_local_defaults() {
67 Ok(d) => d,
68 Err(e) => {
69 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
70 return;
71 }
72 };
73
74 for id in ids {
75 match docker
76 .remove_container(
77 &id,
78 Some(RemoveContainerOptions {
79 force: true,
80 ..Default::default()
81 }),
82 )
83 .await
84 {
85 Ok(_) => {
86 tracing::debug!("Cleaned up container {}", id);
87 untrack_container(&id);
88 }
89 Err(e) => {
90 tracing::warn!("Failed to cleanup container {}: {}", id, e);
91 }
92 }
93 }
94}
95
/// Timeout, in seconds, handed to `Docker::connect_with_socket`.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

// ---------------------------------------------------------------------------
// Exchange header names understood or produced by this component.
// ---------------------------------------------------------------------------

/// Overrides the operation taken from the endpoint URI for a single exchange.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Image to run; takes precedence over the `image` URI parameter.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Container ID: input for lifecycle/exec/network operations, output of `run`.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Log stream header name (not referenced in the visible part of this file;
/// presumably set by the log consumer, confirm against its implementation).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Log timestamp header name (not referenced in the visible part of this
/// file; presumably set by the log consumer, confirm against its
/// implementation).
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Container name (or network name for `network-create`); takes precedence
/// over the `name` URI parameter.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Set to `"success"` by the producer handlers once an operation completed.
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";

/// Command line for `exec`; takes precedence over the `cmd` URI parameter.
pub const HEADER_CMD: &str = "CamelContainerCmd";

/// Network name/ID for the network operations; `network-create` stores the
/// ID of the created network here.
pub const HEADER_NETWORK: &str = "CamelContainerNetwork";

/// Exit code of a non-detached `exec`.
pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";

/// Volume specification for `run`; takes precedence over the `volumes` URI
/// parameter.
pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";

/// Exec instance ID, set when `exec` is started detached.
pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
136
137#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
145#[serde(default)]
146pub struct ContainerGlobalConfig {
147 pub docker_host: String,
149}
150
151impl Default for ContainerGlobalConfig {
152 fn default() -> Self {
153 Self {
154 docker_host: "unix:///var/run/docker.sock".to_string(),
155 }
156 }
157}
158
159impl ContainerGlobalConfig {
160 pub fn new() -> Self {
161 Self::default()
162 }
163
164 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
165 self.docker_host = v.into();
166 self
167 }
168}
169
/// Endpoint configuration parsed from a `container:<operation>?...` URI.
///
/// Optional values stay `None` when the corresponding URI parameter is
/// absent; boolean flags carry the default noted on each field.
#[derive(Clone, Debug)]
pub struct ContainerConfig {
    /// Operation name, taken from the URI path (e.g. `run`, `exec`, `list`).
    pub operation: String,
    /// `image` parameter: image reference used by the run operation.
    pub image: Option<String>,
    /// `name` parameter: container name (network name for network-create).
    pub name: Option<String>,
    /// `host` parameter: Docker daemon address for this endpoint.
    pub host: Option<String>,
    /// `cmd` parameter: command line, split on whitespace by the handlers.
    pub cmd: Option<String>,
    /// `ports` parameter: comma-separated `hostPort:containerPort[/protocol]`.
    pub ports: Option<String>,
    /// `env` parameter: comma-separated environment entries.
    pub env: Option<String>,
    /// `network` parameter: network mode for run, target of network operations.
    pub network: Option<String>,
    /// `containerId` parameter: fallback container for exec/network operations.
    pub container_id: Option<String>,
    /// `follow` parameter (default `true`); presumably used by the log
    /// consumer, which is outside the visible part of this file.
    pub follow: bool,
    /// `timestamps` parameter (default `false`); presumably used by the log
    /// consumer, which is outside the visible part of this file.
    pub timestamps: bool,
    /// `tail` parameter; presumably used by the log consumer, which is
    /// outside the visible part of this file.
    pub tail: Option<String>,
    /// `autoPull` parameter (default `true`): pull the image when it is not
    /// available locally.
    pub auto_pull: bool,
    /// `autoRemove` parameter (default `true`): forwarded to the container's
    /// host configuration on run.
    pub auto_remove: bool,
    /// `volumes` parameter: comma-separated volume specifications.
    pub volumes: Option<String>,
    /// `user` parameter: user for exec.
    pub user: Option<String>,
    /// `workdir` parameter: working directory for exec.
    pub workdir: Option<String>,
    /// `detach` parameter (default `false`): start exec without attaching.
    pub detach: bool,
    /// `driver` parameter: network driver for network-create.
    pub driver: Option<String>,
    /// `force` parameter (default `false`): forwarded to network-disconnect.
    pub force: bool,
}
221
222impl ContainerConfig {
223 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
231 let parts = parse_uri(uri)?;
232 if parts.scheme != "container" {
233 return Err(CamelError::InvalidUri(format!(
234 "expected scheme 'container', got '{}'",
235 parts.scheme
236 )));
237 }
238
239 let image = parts.params.get("image").cloned();
240 let name = parts.params.get("name").cloned();
241 let cmd = parts.params.get("cmd").cloned();
242 let ports = parts.params.get("ports").cloned();
243 let env = parts.params.get("env").cloned();
244 let network = parts.params.get("network").cloned();
245 let container_id = parts.params.get("containerId").cloned();
246 let follow = parts
247 .params
248 .get("follow")
249 .map(|v| v.eq_ignore_ascii_case("true"))
250 .unwrap_or(true);
251 let timestamps = parts
252 .params
253 .get("timestamps")
254 .map(|v| v.eq_ignore_ascii_case("true"))
255 .unwrap_or(false);
256 let tail = parts.params.get("tail").cloned();
257 let auto_pull = parts
258 .params
259 .get("autoPull")
260 .map(|v| v.eq_ignore_ascii_case("true"))
261 .unwrap_or(true);
262 let auto_remove = parts
263 .params
264 .get("autoRemove")
265 .map(|v| v.eq_ignore_ascii_case("true"))
266 .unwrap_or(true);
267 let host = parts.params.get("host").cloned();
269 let volumes = parts.params.get("volumes").cloned();
270 let user = parts.params.get("user").cloned();
271 let workdir = parts.params.get("workdir").cloned();
272 let detach = parts
273 .params
274 .get("detach")
275 .map(|v| v.eq_ignore_ascii_case("true"))
276 .unwrap_or(false);
277 let driver = parts.params.get("driver").cloned();
278 let force = parts
279 .params
280 .get("force")
281 .map(|v| v.eq_ignore_ascii_case("true"))
282 .unwrap_or(false);
283
284 Ok(Self {
285 operation: parts.path,
286 image,
287 name,
288 host,
289 cmd,
290 ports,
291 env,
292 network,
293 container_id,
294 follow,
295 timestamps,
296 tail,
297 auto_pull,
298 auto_remove,
299 volumes,
300 user,
301 workdir,
302 detach,
303 driver,
304 force,
305 })
306 }
307
308 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
311 if self.host.is_none() {
312 self.host = Some(global.docker_host.clone());
313 }
314 }
315
316 fn docker_socket_path(&self) -> Result<&str, CamelError> {
317 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
318 "npipe:////./pipe/docker_engine"
319 } else {
320 "unix:///var/run/docker.sock"
321 });
322
323 if host.starts_with("unix://") || host.starts_with("npipe://") {
324 return Ok(host);
325 }
326
327 if host.contains("://") {
328 return Err(CamelError::ProcessorError(format!(
329 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
330 host
331 )));
332 }
333
334 Ok(host)
335 }
336
337 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
338 let socket_path = self.docker_socket_path()?;
339 Docker::connect_with_socket(
340 socket_path,
341 DOCKER_CONNECT_TIMEOUT_SECS,
342 bollard::API_DEFAULT_VERSION,
343 )
344 .map_err(|e| {
345 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
346 })
347 }
348
349 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
357 let docker = self.connect_docker_client()?;
358 docker
359 .ping()
360 .await
361 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
362 Ok(docker)
363 }
364
365 #[allow(clippy::type_complexity)]
366 fn parse_ports(
367 &self,
368 ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
369 let ports_str = match self.ports.as_ref() {
370 Some(s) => s,
371 None => return Ok((Vec::new(), HashMap::new())),
372 };
373
374 let mut exposed_ports: Vec<String> = Vec::new();
375 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
376
377 for mapping in ports_str.split(',') {
378 let mapping = mapping.trim();
379 if mapping.is_empty() {
380 continue;
381 }
382
383 let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
384 CamelError::ProcessorError(format!(
385 "malformed port mapping '{}': expected hostPort:containerPort",
386 mapping
387 ))
388 })?;
389
390 let (container_port, protocol) = if container_spec.contains('/') {
391 let parts: Vec<&str> = container_spec.split('/').collect();
392 (parts[0], parts[1])
393 } else {
394 (container_spec, "tcp")
395 };
396
397 let container_key = format!("{}/{}", container_port, protocol);
398
399 exposed_ports.push(container_key.clone());
400
401 port_bindings.insert(
402 container_key,
403 Some(vec![PortBinding {
404 host_ip: None,
405 host_port: Some(host_port.to_string()),
406 }]),
407 );
408 }
409
410 Ok((exposed_ports, port_bindings))
411 }
412
413 fn parse_env(&self) -> Option<Vec<String>> {
414 let env_str = self.env.as_ref()?;
415
416 let env_vars: Vec<String> = env_str
417 .split(',')
418 .map(|s| s.trim().to_string())
419 .filter(|s| !s.is_empty())
420 .collect();
421
422 if env_vars.is_empty() {
423 None
424 } else {
425 Some(env_vars)
426 }
427 }
428
429 #[cfg(test)]
430 #[allow(clippy::type_complexity)]
431 fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
432 self.volumes.as_deref().and_then(parse_volume_str)
433 }
434}
435
/// Splits a comma-separated volume specification into
/// `(bind mounts, anonymous volumes)`.
///
/// Each trimmed, non-empty entry is interpreted by its number of
/// `:`-separated segments:
/// - `source:target:mode` becomes a bind mount when `mode` is `ro` or `rw`,
///   and is skipped otherwise;
/// - `path:ro` / `path:rw` becomes the anonymous volume `path`;
/// - `source:target` becomes a bind mount;
/// - `path` becomes an anonymous volume;
/// - anything with more segments is skipped.
///
/// Returns `None` when no entry produced a bind mount or an anonymous volume.
#[allow(clippy::type_complexity)]
fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
    let mut bind_mounts = Vec::new();
    let mut anonymous = Vec::new();

    let entries = volumes_str
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        let pieces: Vec<&str> = entry.split(':').collect();

        match pieces.as_slice() {
            [source, target, mode @ ("ro" | "rw")] => {
                bind_mounts.push(format!("{}:{}:{}", source, target, mode));
            }
            // A lone path, optionally followed by an access mode.
            [path, "ro" | "rw"] | [path] => anonymous.push((*path).to_string()),
            [source, target] => bind_mounts.push(format!("{}:{}", source, target)),
            // Unknown mode or too many segments: ignore the entry.
            _ => {}
        }
    }

    let found_any = !(bind_mounts.is_empty() && anonymous.is_empty());
    if found_any {
        Some((bind_mounts, anonymous))
    } else {
        None
    }
}
485
/// Operations the container producer can execute; each variant corresponds to
/// one operation name accepted by `parse_producer_operation`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProducerOperation {
    /// `list`
    List,
    /// `run`
    Run,
    /// `start`
    Start,
    /// `stop`
    Stop,
    /// `remove`
    Remove,
    /// `exec`
    Exec,
    /// `network-create`
    NetworkCreate,
    /// `network-connect`
    NetworkConnect,
    /// `network-disconnect`
    NetworkDisconnect,
    /// `network-remove`
    NetworkRemove,
    /// `network-list`
    NetworkList,
}
500
501fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
502 match operation {
503 "list" => Ok(ProducerOperation::List),
504 "run" => Ok(ProducerOperation::Run),
505 "start" => Ok(ProducerOperation::Start),
506 "stop" => Ok(ProducerOperation::Stop),
507 "remove" => Ok(ProducerOperation::Remove),
508 "exec" => Ok(ProducerOperation::Exec),
509 "network-create" => Ok(ProducerOperation::NetworkCreate),
510 "network-connect" => Ok(ProducerOperation::NetworkConnect),
511 "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
512 "network-remove" => Ok(ProducerOperation::NetworkRemove),
513 "network-list" => Ok(ProducerOperation::NetworkList),
514 _ => Err(CamelError::ProcessorError(format!(
515 "Unknown container operation: {}",
516 operation
517 ))),
518 }
519}
520
521fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
522 exchange
523 .input
524 .header(HEADER_CONTAINER_NAME)
525 .and_then(|v| v.as_str().map(|s| s.to_string()))
526 .or_else(|| config.name.clone())
527}
528
529async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
530 let images = docker
531 .list_images(None::<ListImagesOptions>)
532 .await
533 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
534
535 Ok(images.iter().any(|img| {
536 img.repo_tags
537 .iter()
538 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
539 }))
540}
541
542async fn pull_image_with_progress(
543 docker: &Docker,
544 image: &str,
545 timeout_secs: u64,
546) -> Result<(), CamelError> {
547 use futures::StreamExt;
548
549 tracing::info!("Pulling image: {}", image);
550
551 let mut stream = docker.create_image(
552 Some(CreateImageOptions {
553 from_image: Some(image.to_string()),
554 ..Default::default()
555 }),
556 None,
557 None,
558 );
559
560 let start = std::time::Instant::now();
561 let mut last_progress = std::time::Instant::now();
562
563 while let Some(item) = stream.next().await {
564 if start.elapsed().as_secs() > timeout_secs {
565 return Err(CamelError::ProcessorError(format!(
566 "Image pull timeout after {}s. Try manually: docker pull {}",
567 timeout_secs, image
568 )));
569 }
570
571 match item {
572 Ok(update) => {
573 if last_progress.elapsed().as_secs() >= 2 {
575 if let Some(status) = update.status {
576 tracing::debug!("Pull progress: {}", status);
577 }
578 last_progress = std::time::Instant::now();
579 }
580 }
581 Err(e) => {
582 let err_str = e.to_string().to_lowercase();
583 if err_str.contains("unauthorized") || err_str.contains("401") {
584 return Err(CamelError::ProcessorError(format!(
585 "Authentication required for image '{}'. Configure Docker credentials: docker login",
586 image
587 )));
588 }
589 if err_str.contains("not found") || err_str.contains("404") {
590 return Err(CamelError::ProcessorError(format!(
591 "Image '{}' not found in registry. Check the image name and tag",
592 image
593 )));
594 }
595 return Err(CamelError::ProcessorError(format!(
596 "Failed to pull image '{}': {}",
597 image, e
598 )));
599 }
600 }
601 }
602
603 tracing::info!("Successfully pulled image: {}", image);
604 Ok(())
605}
606
607async fn ensure_image_available(
608 docker: &Docker,
609 image: &str,
610 auto_pull: bool,
611 timeout_secs: u64,
612) -> Result<(), CamelError> {
613 if image_exists_locally(docker, image).await? {
614 tracing::debug!("Image '{}' already available locally", image);
615 return Ok(());
616 }
617
618 if !auto_pull {
619 return Err(CamelError::ProcessorError(format!(
620 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
621 image, image
622 )));
623 }
624
625 pull_image_with_progress(docker, image, timeout_secs).await
626}
627
628fn format_docker_event(event: &bollard::models::EventMessage) -> String {
629 let action = event.action.as_deref().unwrap_or("unknown");
630 let actor = event.actor.as_ref();
631
632 let container_name = actor
633 .and_then(|a| a.attributes.as_ref())
634 .and_then(|attrs| attrs.get("name"))
635 .map(|s| s.as_str())
636 .unwrap_or("unknown");
637
638 let image = actor
639 .and_then(|a| a.attributes.as_ref())
640 .and_then(|attrs| attrs.get("image"))
641 .map(|s| s.as_str())
642 .unwrap_or("");
643
644 let exit_code = actor
645 .and_then(|a| a.attributes.as_ref())
646 .and_then(|attrs| attrs.get("exitCode"))
647 .map(|s| s.as_str());
648
649 match action {
650 "create" => {
651 if image.is_empty() {
652 format!("[CREATE] Container {}", container_name)
653 } else {
654 format!("[CREATE] Container {} ({})", container_name, image)
655 }
656 }
657 "start" => format!("[START] Container {}", container_name),
658 "die" => {
659 if let Some(code) = exit_code {
660 format!("[DIE] Container {} (exit: {})", container_name, code)
661 } else {
662 format!("[DIE] Container {}", container_name)
663 }
664 }
665 "destroy" => format!("[DESTROY] Container {}", container_name),
666 "stop" => format!("[STOP] Container {}", container_name),
667 "pause" => format!("[PAUSE] Container {}", container_name),
668 "unpause" => format!("[UNPAUSE] Container {}", container_name),
669 "restart" => format!("[RESTART] Container {}", container_name),
670 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
671 }
672}
673
674async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
675 create: CreateFn,
676 start: StartFn,
677 remove: RemoveFn,
678) -> Result<String, CamelError>
679where
680 CreateFn: FnOnce() -> CreateFut,
681 CreateFut: Future<Output = Result<String, CamelError>>,
682 StartFn: FnOnce(String) -> StartFut,
683 StartFut: Future<Output = Result<(), CamelError>>,
684 RemoveFn: FnOnce(String) -> RemoveFut,
685 RemoveFut: Future<Output = Result<(), CamelError>>,
686{
687 let container_id = create().await?;
688 if let Err(start_err) = start(container_id.clone()).await {
689 if let Err(remove_err) = remove(container_id.clone()).await {
690 return Err(CamelError::ProcessorError(format!(
691 "Failed to start container: {}. Cleanup failed: {}",
692 start_err, remove_err
693 )));
694 }
695 return Err(start_err);
696 }
697
698 Ok(container_id)
699}
700
701async fn handle_list(
702 docker: Docker,
703 _config: ContainerConfig,
704 exchange: &mut Exchange,
705) -> Result<(), CamelError> {
706 let containers = docker
707 .list_containers(None::<ListContainersOptions>)
708 .await
709 .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
710
711 let json_value = serde_json::to_value(&containers).map_err(|e| {
712 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
713 })?;
714
715 exchange.input.body = Body::Json(json_value);
716 exchange.input.set_header(
717 HEADER_ACTION_RESULT,
718 serde_json::Value::String("success".to_string()),
719 );
720 Ok(())
721}
722
723async fn handle_run(
724 docker: Docker,
725 config: ContainerConfig,
726 exchange: &mut Exchange,
727) -> Result<(), CamelError> {
728 let image = exchange
729 .input
730 .header(HEADER_IMAGE)
731 .and_then(|v| v.as_str().map(|s| s.to_string()))
732 .or(config.image.clone())
733 .ok_or_else(|| {
734 CamelError::ProcessorError(
735 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
736 )
737 })?;
738
739 let image = if !image.contains(':') && !image.contains('@') {
740 format!("{}:latest", image)
741 } else {
742 image
743 };
744
745 let pull_timeout = 300;
746 ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
747 .await
748 .map_err(|e| {
749 CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
750 })?;
751
752 let container_name = resolve_container_name(exchange, &config);
753 let container_name_ref = container_name.as_deref().unwrap_or("");
754 let cmd_parts: Option<Vec<String>> = config
755 .cmd
756 .as_ref()
757 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
758 let auto_remove = config.auto_remove;
759 let (exposed_ports, port_bindings) = config.parse_ports()?;
760 let env_vars = config.parse_env();
761 let network_mode = config.network.clone();
762
763 let volumes_str = exchange
764 .input
765 .header(HEADER_VOLUMES)
766 .and_then(|v| v.as_str().map(|s| s.to_string()))
767 .or(config.volumes.clone());
768 let (binds, anon_volumes) = volumes_str
769 .as_deref()
770 .and_then(parse_volume_str)
771 .unwrap_or_default();
772
773 let docker_create = docker.clone();
774 let docker_start = docker.clone();
775 let docker_remove = docker.clone();
776
777 let container_id = run_container_with_cleanup(
778 move || async move {
779 let create_options = CreateContainerOptions {
780 name: Some(container_name_ref.to_string()),
781 ..Default::default()
782 };
783 let container_config = ContainerCreateBody {
784 image: Some(image.clone()),
785 cmd: cmd_parts,
786 env: env_vars,
787 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
788 volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
789 host_config: Some(HostConfig {
790 auto_remove: Some(auto_remove),
791 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
792 network_mode,
793 binds: if binds.is_empty() { None } else { Some(binds) },
794 ..Default::default()
795 }),
796 ..Default::default()
797 };
798
799 let create_response = docker_create
800 .create_container(Some(create_options), container_config)
801 .await
802 .map_err(|e| {
803 let err_str = e.to_string().to_lowercase();
804 if err_str.contains("409") || err_str.contains("conflict") {
805 CamelError::ProcessorError(format!(
806 "Container name '{}' already exists. Use a unique name or remove the existing container first",
807 container_name_ref
808 ))
809 } else {
810 CamelError::ProcessorError(format!(
811 "Failed to create container: {}",
812 e
813 ))
814 }
815 })?;
816
817 Ok(create_response.id)
818 },
819 move |container_id| async move {
820 docker_start
821 .start_container(&container_id, None::<StartContainerOptions>)
822 .await
823 .map_err(|e| {
824 CamelError::ProcessorError(format!(
825 "Failed to start container: {}",
826 e
827 ))
828 })
829 },
830 move |container_id| async move {
831 docker_remove
832 .remove_container(&container_id, None)
833 .await
834 .map_err(|e| {
835 CamelError::ProcessorError(format!(
836 "Failed to remove container after start failure: {}",
837 e
838 ))
839 })
840 },
841 )
842 .await?;
843
844 track_container(container_id.clone());
845
846 exchange
847 .input
848 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
849 exchange.input.set_header(
850 HEADER_ACTION_RESULT,
851 serde_json::Value::String("success".to_string()),
852 );
853 Ok(())
854}
855
856async fn handle_lifecycle(
857 docker: Docker,
858 _config: ContainerConfig,
859 exchange: &mut Exchange,
860 operation: ProducerOperation,
861 operation_name: &str,
862) -> Result<(), CamelError> {
863 let container_id = exchange
864 .input
865 .header(HEADER_CONTAINER_ID)
866 .and_then(|v| v.as_str().map(|s| s.to_string()))
867 .ok_or_else(|| {
868 CamelError::ProcessorError(format!(
869 "{} header is required for {} operation",
870 HEADER_CONTAINER_ID, operation_name
871 ))
872 })?;
873
874 match operation {
875 ProducerOperation::Start => {
876 docker
877 .start_container(&container_id, None::<StartContainerOptions>)
878 .await
879 .map_err(|e| {
880 CamelError::ProcessorError(format!("Failed to start container: {}", e))
881 })?;
882 }
883 ProducerOperation::Stop => {
884 docker
885 .stop_container(&container_id, None)
886 .await
887 .map_err(|e| {
888 CamelError::ProcessorError(format!("Failed to stop container: {}", e))
889 })?;
890 }
891 ProducerOperation::Remove => {
892 docker
893 .remove_container(&container_id, None)
894 .await
895 .map_err(|e| {
896 CamelError::ProcessorError(format!("Failed to remove container: {}", e))
897 })?;
898 untrack_container(&container_id);
899 }
900 _ => {}
901 }
902
903 exchange.input.set_header(
904 HEADER_ACTION_RESULT,
905 serde_json::Value::String("success".to_string()),
906 );
907 Ok(())
908}
909
910async fn handle_exec(
911 docker: Docker,
912 config: ContainerConfig,
913 exchange: &mut Exchange,
914) -> Result<(), CamelError> {
915 let container_id = exchange
916 .input
917 .header(HEADER_CONTAINER_ID)
918 .and_then(|v| v.as_str().map(|s| s.to_string()))
919 .or(config.container_id.clone())
920 .ok_or_else(|| {
921 CamelError::ProcessorError(format!(
922 "{} header or containerId param is required for exec operation",
923 HEADER_CONTAINER_ID
924 ))
925 })?;
926
927 let cmd = exchange
928 .input
929 .header(HEADER_CMD)
930 .and_then(|v| v.as_str().map(|s| s.to_string()))
931 .or(config.cmd.clone())
932 .ok_or_else(|| {
933 CamelError::ProcessorError(
934 "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
935 )
936 })?;
937
938 let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
939 let env_vars = config.parse_env();
940
941 let exec_config = bollard::exec::CreateExecOptions {
942 cmd: Some(cmd_parts),
943 env: env_vars,
944 user: config.user.clone(),
945 working_dir: config.workdir.clone(),
946 attach_stdout: Some(true),
947 attach_stderr: Some(true),
948 ..Default::default()
949 };
950
951 let create_result = docker
952 .create_exec(&container_id, exec_config)
953 .await
954 .map_err(|e| {
955 let err_str = e.to_string().to_lowercase();
956 if err_str.contains("404") || err_str.contains("no such") {
957 CamelError::ProcessorError(format!(
958 "Container '{}' not found for exec",
959 container_id
960 ))
961 } else {
962 CamelError::ProcessorError(format!("Failed to create exec: {}", e))
963 }
964 })?;
965
966 let exec_id = create_result.id;
967
968 if config.detach {
969 docker
970 .start_exec(
971 &exec_id,
972 Some(bollard::exec::StartExecOptions {
973 detach: true,
974 ..Default::default()
975 }),
976 )
977 .await
978 .map_err(|e| {
979 CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
980 })?;
981
982 exchange
983 .input
984 .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
985 exchange
986 .input
987 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
988 } else {
989 let start_result = docker
990 .start_exec(&exec_id, None)
991 .await
992 .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
993
994 let mut output = String::new();
995
996 match start_result {
997 bollard::exec::StartExecResults::Attached {
998 output: mut stream, ..
999 } => {
1000 use futures::StreamExt;
1001 while let Some(msg) = stream.next().await {
1002 match msg {
1003 Ok(bollard::container::LogOutput::StdOut { message }) => {
1004 output.push_str(&String::from_utf8_lossy(&message));
1005 }
1006 Ok(bollard::container::LogOutput::StdErr { message }) => {
1007 output.push_str(&String::from_utf8_lossy(&message));
1008 }
1009 Ok(_) => {}
1010 Err(e) => {
1011 output.push_str(&format!("[error reading stream: {}]", e));
1012 }
1013 }
1014 }
1015 }
1016 bollard::exec::StartExecResults::Detached => {}
1017 }
1018
1019 let inspect = docker
1020 .inspect_exec(&exec_id)
1021 .await
1022 .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1023
1024 let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1025 CamelError::ProcessorError("container exec returned no exit code".into())
1026 })?;
1027
1028 let output = output.trim_end().to_string();
1029 exchange.input.body = Body::Text(output);
1030 exchange.input.set_header(
1031 HEADER_EXIT_CODE,
1032 serde_json::Value::Number(exit_code.into()),
1033 );
1034 exchange
1035 .input
1036 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1037 }
1038
1039 exchange.input.set_header(
1040 HEADER_ACTION_RESULT,
1041 serde_json::Value::String("success".to_string()),
1042 );
1043 Ok(())
1044}
1045
1046async fn handle_network_create(
1047 docker: Docker,
1048 config: ContainerConfig,
1049 exchange: &mut Exchange,
1050) -> Result<(), CamelError> {
1051 let network_name = exchange
1052 .input
1053 .header(HEADER_CONTAINER_NAME)
1054 .and_then(|v| v.as_str().map(|s| s.to_string()))
1055 .or(config.name.clone())
1056 .ok_or_else(|| {
1057 CamelError::ProcessorError(
1058 "CamelContainerName header or name param is required for network-create"
1059 .to_string(),
1060 )
1061 })?;
1062
1063 let driver = config.driver.as_deref().unwrap_or("bridge");
1064
1065 let options = NetworkCreateRequest {
1066 name: network_name.clone(),
1067 driver: Some(driver.to_string()),
1068 ..Default::default()
1069 };
1070
1071 let result = docker.create_network(options).await.map_err(|e| {
1072 let err_str = e.to_string().to_lowercase();
1073 if err_str.contains("409") || err_str.contains("already exists") {
1074 CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1075 } else {
1076 CamelError::ProcessorError(format!("Failed to create network: {}", e))
1077 }
1078 })?;
1079
1080 let network_id = result.id.clone();
1081 let json_value = serde_json::to_value(&result).map_err(|e| {
1082 CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1083 })?;
1084
1085 exchange.input.body = Body::Json(json_value);
1086 exchange
1087 .input
1088 .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1089 exchange.input.set_header(
1090 HEADER_ACTION_RESULT,
1091 serde_json::Value::String("success".to_string()),
1092 );
1093 Ok(())
1094}
1095
1096async fn handle_network_connect(
1097 docker: Docker,
1098 config: ContainerConfig,
1099 exchange: &mut Exchange,
1100) -> Result<(), CamelError> {
1101 let network = exchange
1102 .input
1103 .header(HEADER_NETWORK)
1104 .and_then(|v| v.as_str().map(|s| s.to_string()))
1105 .or(config.network.clone())
1106 .ok_or_else(|| {
1107 CamelError::ProcessorError(
1108 "CamelContainerNetwork header or network param is required for network-connect"
1109 .to_string(),
1110 )
1111 })?;
1112
1113 let container = exchange
1114 .input
1115 .header(HEADER_CONTAINER_ID)
1116 .and_then(|v| v.as_str().map(|s| s.to_string()))
1117 .or(config.container_id.clone())
1118 .ok_or_else(|| {
1119 CamelError::ProcessorError(
1120 "CamelContainerId header or container param is required for network-connect"
1121 .to_string(),
1122 )
1123 })?;
1124
1125 docker
1126 .connect_network(
1127 &network,
1128 NetworkConnectRequest {
1129 container,
1130 ..Default::default()
1131 },
1132 )
1133 .await
1134 .map_err(|e| {
1135 let err_str = e.to_string().to_lowercase();
1136 if err_str.contains("404") || err_str.contains("not found") {
1137 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1138 } else {
1139 CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1140 }
1141 })?;
1142
1143 exchange.input.set_header(
1144 HEADER_ACTION_RESULT,
1145 serde_json::Value::String("success".to_string()),
1146 );
1147 Ok(())
1148}
1149
1150async fn handle_network_disconnect(
1151 docker: Docker,
1152 config: ContainerConfig,
1153 exchange: &mut Exchange,
1154) -> Result<(), CamelError> {
1155 let network = exchange
1156 .input
1157 .header(HEADER_NETWORK)
1158 .and_then(|v| v.as_str().map(|s| s.to_string()))
1159 .or(config.network.clone())
1160 .ok_or_else(|| {
1161 CamelError::ProcessorError(
1162 "CamelContainerNetwork header or network param is required for network-disconnect"
1163 .to_string(),
1164 )
1165 })?;
1166
1167 let container = exchange
1168 .input
1169 .header(HEADER_CONTAINER_ID)
1170 .and_then(|v| v.as_str().map(|s| s.to_string()))
1171 .or(config.container_id.clone())
1172 .ok_or_else(|| {
1173 CamelError::ProcessorError(
1174 "CamelContainerId header or container param is required for network-disconnect"
1175 .to_string(),
1176 )
1177 })?;
1178
1179 docker
1180 .disconnect_network(
1181 &network,
1182 NetworkDisconnectRequest {
1183 container,
1184 force: Some(config.force),
1185 },
1186 )
1187 .await
1188 .map_err(|e| {
1189 let err_str = e.to_string().to_lowercase();
1190 if err_str.contains("404") || err_str.contains("not found") {
1191 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1192 } else {
1193 CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1194 }
1195 })?;
1196
1197 exchange.input.set_header(
1198 HEADER_ACTION_RESULT,
1199 serde_json::Value::String("success".to_string()),
1200 );
1201 Ok(())
1202}
1203
1204async fn handle_network_remove(
1205 docker: Docker,
1206 config: ContainerConfig,
1207 exchange: &mut Exchange,
1208) -> Result<(), CamelError> {
1209 let network = exchange
1210 .input
1211 .header(HEADER_NETWORK)
1212 .and_then(|v| v.as_str().map(|s| s.to_string()))
1213 .or(config.network.clone())
1214 .ok_or_else(|| {
1215 CamelError::ProcessorError(
1216 "CamelContainerNetwork header or network param is required for network-remove"
1217 .to_string(),
1218 )
1219 })?;
1220
1221 docker.remove_network(&network).await.map_err(|e| {
1222 let err_str = e.to_string().to_lowercase();
1223 if err_str.contains("404") || err_str.contains("not found") {
1224 CamelError::ProcessorError(format!("Network '{}' not found", network))
1225 } else if err_str.contains("409") || err_str.contains("in use") {
1226 CamelError::ProcessorError(format!(
1227 "Network '{}' is in use and cannot be removed",
1228 network
1229 ))
1230 } else {
1231 CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1232 }
1233 })?;
1234
1235 exchange.input.set_header(
1236 HEADER_ACTION_RESULT,
1237 serde_json::Value::String("success".to_string()),
1238 );
1239 Ok(())
1240}
1241
1242async fn handle_network_list(
1243 docker: Docker,
1244 _config: ContainerConfig,
1245 exchange: &mut Exchange,
1246) -> Result<(), CamelError> {
1247 let networks = docker
1248 .list_networks(None::<ListNetworksOptions>)
1249 .await
1250 .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1251
1252 let json_value = serde_json::to_value(&networks)
1253 .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1254
1255 exchange.input.body = Body::Json(json_value);
1256 exchange.input.set_header(
1257 HEADER_ACTION_RESULT,
1258 serde_json::Value::String("success".to_string()),
1259 );
1260 Ok(())
1261}
1262
1263#[derive(Clone)]
1268pub struct ContainerProducer {
1269 config: ContainerConfig,
1270 docker: Docker,
1271}
1272
1273impl Service<Exchange> for ContainerProducer {
1274 type Response = Exchange;
1275 type Error = CamelError;
1276 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1277
1278 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1279 Poll::Ready(Ok(()))
1280 }
1281
1282 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1283 let config = self.config.clone();
1284 let docker = self.docker.clone();
1285 Box::pin(async move {
1286 let operation_name = exchange
1287 .input
1288 .header(HEADER_ACTION)
1289 .and_then(|v| v.as_str().map(|s| s.to_string()))
1290 .unwrap_or_else(|| config.operation.clone());
1291
1292 let operation = parse_producer_operation(&operation_name)?;
1293
1294 match operation {
1295 ProducerOperation::List => {
1296 handle_list(docker, config, &mut exchange).await?;
1297 }
1298 ProducerOperation::Run => {
1299 handle_run(docker, config, &mut exchange).await?;
1300 }
1301 ProducerOperation::Start => {
1302 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1303 .await?;
1304 }
1305 ProducerOperation::Stop => {
1306 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1307 .await?;
1308 }
1309 ProducerOperation::Remove => {
1310 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1311 .await?;
1312 }
1313 ProducerOperation::Exec => {
1314 handle_exec(docker, config, &mut exchange).await?;
1315 }
1316 ProducerOperation::NetworkCreate => {
1317 handle_network_create(docker, config, &mut exchange).await?;
1318 }
1319 ProducerOperation::NetworkConnect => {
1320 handle_network_connect(docker, config, &mut exchange).await?;
1321 }
1322 ProducerOperation::NetworkDisconnect => {
1323 handle_network_disconnect(docker, config, &mut exchange).await?;
1324 }
1325 ProducerOperation::NetworkRemove => {
1326 handle_network_remove(docker, config, &mut exchange).await?;
1327 }
1328 ProducerOperation::NetworkList => {
1329 handle_network_list(docker, config, &mut exchange).await?;
1330 }
1331 }
1332
1333 Ok(exchange)
1334 })
1335 }
1336}
1337
1338pub struct ContainerConsumer {
1343 config: ContainerConfig,
1344}
1345
1346#[async_trait]
1347impl Consumer for ContainerConsumer {
1348 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1349 match self.config.operation.as_str() {
1350 "events" => self.start_events_consumer(context).await,
1351 "logs" => self.start_logs_consumer(context).await,
1352 _ => Err(CamelError::EndpointCreationFailed(format!(
1353 "Consumer only supports 'events' or 'logs' operations, got '{}'",
1354 self.config.operation
1355 ))),
1356 }
1357 }
1358
1359 async fn stop(&mut self) -> Result<(), CamelError> {
1360 Ok(())
1361 }
1362
1363 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1364 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1365 }
1366}
1367
1368impl ContainerConsumer {
1369 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1370 use futures::StreamExt;
1371
1372 loop {
1373 if context.is_cancelled() {
1374 tracing::info!("Container events consumer shutting down");
1375 return Ok(());
1376 }
1377
1378 let docker = match self.config.connect_docker().await {
1379 Ok(d) => d,
1380 Err(e) => {
1381 tracing::error!(
1382 "Consumer failed to connect to docker: {}. Retrying in 5s...",
1383 e
1384 );
1385 tokio::select! {
1386 _ = context.cancelled() => {
1387 tracing::info!("Container events consumer shutting down");
1388 return Ok(());
1389 }
1390 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1391 }
1392 continue;
1393 }
1394 };
1395
1396 let mut event_stream = docker.events(None::<EventsOptions>);
1397
1398 loop {
1399 tokio::select! {
1400 _ = context.cancelled() => {
1401 tracing::info!("Container events consumer shutting down");
1402 return Ok(());
1403 }
1404
1405 msg = event_stream.next() => {
1406 match msg {
1407 Some(Ok(event)) => {
1408 let formatted = format_docker_event(&event);
1409 let message = Message::new(Body::Text(formatted));
1410 let exchange = Exchange::new(message);
1411
1412 if let Err(e) = context.send(exchange).await {
1413 tracing::error!("Failed to send exchange: {:?}", e);
1414 break;
1415 }
1416 }
1417 Some(Err(e)) => {
1418 tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1419 break;
1420 }
1421 None => {
1422 tracing::info!("Docker event stream ended. Reconnecting...");
1423 break;
1424 }
1425 }
1426 }
1427 }
1428 }
1429
1430 tokio::select! {
1431 _ = context.cancelled() => {
1432 tracing::info!("Container events consumer shutting down");
1433 return Ok(());
1434 }
1435 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1436 }
1437 }
1438 }
1439
1440 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1441 use futures::StreamExt;
1442
1443 let container_id = self.config.container_id.clone().ok_or_else(|| {
1444 CamelError::EndpointCreationFailed(
1445 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1446 .to_string(),
1447 )
1448 })?;
1449
1450 loop {
1451 if context.is_cancelled() {
1452 tracing::info!("Container logs consumer shutting down");
1453 return Ok(());
1454 }
1455
1456 let docker = match self.config.connect_docker().await {
1457 Ok(d) => d,
1458 Err(e) => {
1459 tracing::error!(
1460 "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
1461 e
1462 );
1463 tokio::select! {
1464 _ = context.cancelled() => {
1465 tracing::info!("Container logs consumer shutting down");
1466 return Ok(());
1467 }
1468 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1469 }
1470 continue;
1471 }
1472 };
1473
1474 let tail = self
1475 .config
1476 .tail
1477 .clone()
1478 .unwrap_or_else(|| "all".to_string());
1479
1480 let options = LogsOptions {
1481 follow: self.config.follow,
1482 stdout: true,
1483 stderr: true,
1484 timestamps: self.config.timestamps,
1485 tail,
1486 ..Default::default()
1487 };
1488
1489 let mut log_stream = docker.logs(&container_id, Some(options));
1490 let container_id_header = container_id.clone();
1491
1492 loop {
1493 tokio::select! {
1494 _ = context.cancelled() => {
1495 tracing::info!("Container logs consumer shutting down");
1496 return Ok(());
1497 }
1498
1499 msg = log_stream.next() => {
1500 match msg {
1501 Some(Ok(log_output)) => {
1502 let (stream_type, content) = match log_output {
1503 bollard::container::LogOutput::StdOut { message } => {
1504 ("stdout", String::from_utf8_lossy(&message).into_owned())
1505 }
1506 bollard::container::LogOutput::StdErr { message } => {
1507 ("stderr", String::from_utf8_lossy(&message).into_owned())
1508 }
1509 bollard::container::LogOutput::Console { message } => {
1510 ("console", String::from_utf8_lossy(&message).into_owned())
1511 }
1512 bollard::container::LogOutput::StdIn { message } => {
1513 ("stdin", String::from_utf8_lossy(&message).into_owned())
1514 }
1515 };
1516
1517 let content = content.trim_end();
1518 if content.is_empty() {
1519 continue;
1520 }
1521
1522 let mut message = Message::new(Body::Text(content.to_string()));
1523 message.set_header(
1524 HEADER_CONTAINER_ID,
1525 serde_json::Value::String(container_id_header.clone()),
1526 );
1527 message.set_header(
1528 HEADER_LOG_STREAM,
1529 serde_json::Value::String(stream_type.to_string()),
1530 );
1531
1532 if self.config.timestamps
1533 && let Some(ts) = extract_timestamp(content) {
1534 message.set_header(
1535 HEADER_LOG_TIMESTAMP,
1536 serde_json::Value::String(ts),
1537 );
1538 }
1539
1540 let exchange = Exchange::new(message);
1541
1542 if let Err(e) = context.send(exchange).await {
1543 tracing::error!("Failed to send log exchange: {:?}", e);
1544 break;
1545 }
1546 }
1547 Some(Err(e)) => {
1548 tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1549 break;
1550 }
1551 None => {
1552 if self.config.follow {
1553 tracing::info!("Docker log stream ended. Reconnecting...");
1554 break;
1555 } else {
1556 tracing::info!("Container logs consumer finished (follow=false)");
1557 return Ok(());
1558 }
1559 }
1560 }
1561 }
1562 }
1563 }
1564
1565 tokio::select! {
1566 _ = context.cancelled() => {
1567 tracing::info!("Container logs consumer shutting down");
1568 return Ok(());
1569 }
1570 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1571 }
1572 }
1573 }
1574}
1575
/// Extracts the leading timestamp token from a log line.
///
/// The token before the first space is returned when it looks like a
/// timestamp: it must start with an ASCII digit and contain the `T` date/time
/// separator (for example `2024-01-15T10:30:00.123456789Z`). Requiring the
/// leading digit keeps ordinary words that merely contain a `T`, such as
/// `Traceback`, from being reported as timestamps.
///
/// Returns `None` when the line has no space or the first token does not look
/// like a timestamp.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _rest) = log_line.split_once(' ')?;
    let starts_with_digit = candidate.starts_with(|c: char| c.is_ascii_digit());
    (starts_with_digit && candidate.contains('T')).then(|| candidate.to_string())
}
1584
1585pub struct ContainerComponent {
1593 config: Option<ContainerGlobalConfig>,
1594}
1595
1596impl ContainerComponent {
1597 pub fn new() -> Self {
1599 Self { config: None }
1600 }
1601
1602 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1604 Self {
1605 config: Some(config),
1606 }
1607 }
1608
1609 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1611 Self { config }
1612 }
1613}
1614
1615impl Default for ContainerComponent {
1616 fn default() -> Self {
1617 Self::new()
1618 }
1619}
1620
1621impl Component for ContainerComponent {
1622 fn scheme(&self) -> &str {
1623 "container"
1624 }
1625
1626 fn create_endpoint(
1627 &self,
1628 uri: &str,
1629 _ctx: &dyn camel_component_api::ComponentContext,
1630 ) -> Result<Box<dyn Endpoint>, CamelError> {
1631 let mut config = ContainerConfig::from_uri(uri)?;
1632 if let Some(ref global) = self.config {
1634 config.apply_global_defaults(global);
1635 }
1636 Ok(Box::new(ContainerEndpoint {
1637 uri: uri.to_string(),
1638 config,
1639 }))
1640 }
1641}
1642
1643pub struct ContainerEndpoint {
1650 uri: String,
1651 config: ContainerConfig,
1652}
1653
1654impl ContainerEndpoint {
1655 pub fn docker_host(&self) -> Option<&str> {
1658 self.config.host.as_deref()
1659 }
1660}
1661
1662impl Endpoint for ContainerEndpoint {
1663 fn uri(&self) -> &str {
1664 &self.uri
1665 }
1666
1667 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1668 Ok(Box::new(ContainerConsumer {
1669 config: self.config.clone(),
1670 }))
1671 }
1672
1673 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1674 let docker = self.config.connect_docker_client()?;
1675 Ok(BoxProcessor::new(ContainerProducer {
1676 config: self.config.clone(),
1677 docker,
1678 }))
1679 }
1680}
1681
1682#[cfg(test)]
1683mod tests {
1684 use super::*;
1685 use camel_component_api::NoOpComponentContext;
1686
1687 #[test]
1688 fn test_container_config() {
1689 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1690 assert_eq!(config.operation, "run");
1691 assert_eq!(config.image.as_deref(), Some("alpine"));
1692 assert!(config.host.is_none());
1694 }
1695
1696 #[test]
1697 fn test_global_config_applied_to_endpoint() {
1698 let global =
1701 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1702 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1703 assert!(
1704 config.host.is_none(),
1705 "URI without ?host= should leave host as None"
1706 );
1707 config.apply_global_defaults(&global);
1708 assert_eq!(
1709 config.host.as_deref(),
1710 Some("unix:///custom/docker.sock"),
1711 "global docker_host must be applied when URI did not set host"
1712 );
1713 }
1714
1715 #[test]
1716 fn test_uri_param_wins_over_global_config() {
1717 let global =
1719 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1720 let mut config =
1721 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1722 .unwrap();
1723 assert_eq!(
1724 config.host.as_deref(),
1725 Some("unix:///override.sock"),
1726 "URI-set host should be parsed correctly"
1727 );
1728 config.apply_global_defaults(&global);
1729 assert_eq!(
1730 config.host.as_deref(),
1731 Some("unix:///override.sock"),
1732 "global config must NOT override a host already set by URI"
1733 );
1734 }
1735
1736 #[test]
1737 fn test_container_config_parses_name() {
1738 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1739 assert_eq!(config.name.as_deref(), Some("my-container"));
1740 }
1741
1742 #[test]
1743 fn test_parse_producer_operation_known() {
1744 assert_eq!(
1745 parse_producer_operation("list").unwrap(),
1746 ProducerOperation::List
1747 );
1748 assert_eq!(
1749 parse_producer_operation("run").unwrap(),
1750 ProducerOperation::Run
1751 );
1752 assert_eq!(
1753 parse_producer_operation("start").unwrap(),
1754 ProducerOperation::Start
1755 );
1756 assert_eq!(
1757 parse_producer_operation("stop").unwrap(),
1758 ProducerOperation::Stop
1759 );
1760 assert_eq!(
1761 parse_producer_operation("remove").unwrap(),
1762 ProducerOperation::Remove
1763 );
1764 }
1765
1766 #[test]
1767 fn test_parse_producer_operation_unknown() {
1768 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1769 match err {
1770 CamelError::ProcessorError(msg) => {
1771 assert!(
1772 msg.contains("Unknown container operation"),
1773 "Unexpected error message: {}",
1774 msg
1775 );
1776 }
1777 _ => panic!("Expected ProcessorError for unknown operation"),
1778 }
1779 }
1780
1781 #[test]
1782 fn test_parse_producer_operation_new_variants() {
1783 assert_eq!(
1784 parse_producer_operation("exec").unwrap(),
1785 ProducerOperation::Exec
1786 );
1787 assert_eq!(
1788 parse_producer_operation("network-create").unwrap(),
1789 ProducerOperation::NetworkCreate
1790 );
1791 assert_eq!(
1792 parse_producer_operation("network-connect").unwrap(),
1793 ProducerOperation::NetworkConnect
1794 );
1795 assert_eq!(
1796 parse_producer_operation("network-disconnect").unwrap(),
1797 ProducerOperation::NetworkDisconnect
1798 );
1799 assert_eq!(
1800 parse_producer_operation("network-remove").unwrap(),
1801 ProducerOperation::NetworkRemove
1802 );
1803 assert_eq!(
1804 parse_producer_operation("network-list").unwrap(),
1805 ProducerOperation::NetworkList
1806 );
1807 }
1808
1809 #[test]
1810 fn test_resolve_container_name_header_overrides_config() {
1811 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1812 let mut exchange = Exchange::new(Message::new(""));
1813 exchange.input.set_header(
1814 HEADER_CONTAINER_NAME,
1815 serde_json::Value::String("header-name".to_string()),
1816 );
1817
1818 let resolved = resolve_container_name(&exchange, &config);
1819 assert_eq!(resolved.as_deref(), Some("header-name"));
1820 }
1821
1822 #[test]
1823 fn test_container_config_rejects_tcp_host() {
1824 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1825 let err = config.connect_docker_client().unwrap_err();
1826 match err {
1827 CamelError::ProcessorError(msg) => {
1828 assert!(
1829 msg.to_lowercase().contains("tcp"),
1830 "Expected TCP scheme error, got: {}",
1831 msg
1832 );
1833 }
1834 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1835 }
1836 }
1837
1838 #[tokio::test]
1839 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1840 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1841 let remove_called_clone = remove_called.clone();
1842
1843 let result = run_container_with_cleanup(
1844 || async { Ok("container-123".to_string()) },
1845 |_id| async move {
1846 Err(CamelError::ProcessorError(
1847 "Failed to start container".to_string(),
1848 ))
1849 },
1850 move |_id| {
1851 let remove_called_inner = remove_called_clone.clone();
1852 async move {
1853 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1854 Ok(())
1855 }
1856 },
1857 )
1858 .await;
1859
1860 assert!(result.is_err(), "Expected start failure to bubble up");
1861 assert!(
1862 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1863 "Expected cleanup to remove container"
1864 );
1865 }
1866
1867 #[test]
1868 fn test_container_component_creates_endpoint() {
1869 let component = ContainerComponent::new();
1870 assert_eq!(component.scheme(), "container");
1871 let ctx = NoOpComponentContext;
1872 let endpoint = component
1873 .create_endpoint("container:run?image=alpine", &ctx)
1874 .unwrap();
1875 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1876 }
1877
1878 #[test]
1879 fn test_container_config_parses_ports() {
1880 let config =
1881 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1882 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1883 }
1884
1885 #[test]
1886 fn test_container_config_parses_env() {
1887 let config =
1888 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1889 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1890 }
1891
1892 #[test]
1893 fn test_container_config_parses_logs_options() {
1894 let config = ContainerConfig::from_uri(
1895 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1896 )
1897 .unwrap();
1898 assert_eq!(config.operation, "logs");
1899 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1900 assert!(config.follow);
1901 assert!(config.timestamps);
1902 assert_eq!(config.tail.as_deref(), Some("100"));
1903 }
1904
1905 #[test]
1906 fn test_container_config_logs_defaults() {
1907 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1908 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1912
1913 #[test]
1914 fn test_parse_ports_single() {
1915 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1916 let (exposed, bindings) = config.parse_ports().unwrap();
1917
1918 assert!(exposed.contains(&"80/tcp".to_string()));
1919 assert!(bindings.contains_key("80/tcp"));
1920
1921 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1922 assert_eq!(binding.len(), 1);
1923 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1924 }
1925
1926 #[test]
1927 fn test_parse_ports_multiple() {
1928 let config =
1929 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1930 let (exposed, bindings) = config.parse_ports().unwrap();
1931
1932 assert!(exposed.contains(&"80/tcp".to_string()));
1933 assert!(exposed.contains(&"443/tcp".to_string()));
1934 assert_eq!(bindings.len(), 2);
1935 }
1936
1937 #[test]
1938 fn test_parse_ports_with_protocol() {
1939 let config =
1940 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1941 .unwrap();
1942 let (exposed, _bindings) = config.parse_ports().unwrap();
1943
1944 assert!(exposed.contains(&"80/tcp".to_string()));
1945 assert!(exposed.contains(&"53/udp".to_string()));
1946 }
1947
1948 #[test]
1949 fn test_parse_ports_none() {
1950 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1951 let (exposed, bindings) = config.parse_ports().unwrap();
1952 assert!(exposed.is_empty());
1953 assert!(bindings.is_empty());
1954 }
1955
1956 #[test]
1957 fn test_parse_env_single() {
1958 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1959 let env = config.parse_env().unwrap();
1960
1961 assert_eq!(env.len(), 1);
1962 assert_eq!(env[0], "FOO=bar");
1963 }
1964
1965 #[test]
1966 fn test_parse_env_multiple() {
1967 let config =
1968 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1969 .unwrap();
1970 let env = config.parse_env().unwrap();
1971
1972 assert_eq!(env.len(), 3);
1973 assert!(env.contains(&"FOO=bar".to_string()));
1974 assert!(env.contains(&"BAZ=qux".to_string()));
1975 assert!(env.contains(&"NUM=123".to_string()));
1976 }
1977
1978 #[test]
1979 fn test_parse_env_none() {
1980 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1981 assert!(config.parse_env().is_none());
1982 }
1983
1984 use camel_component_api::Message;
1985 use std::sync::Arc;
1986
1987 #[tokio::test]
1988 async fn test_container_producer_resolves_operation_from_header() {
1989 let docker = match Docker::connect_with_local_defaults() {
1991 Ok(d) => d,
1992 Err(_) => {
1993 eprintln!("Skipping test: Could not connect to Docker daemon");
1994 return;
1995 }
1996 };
1997
1998 if docker.ping().await.is_err() {
1999 eprintln!("Skipping test: Docker daemon not responding to ping");
2000 return;
2001 }
2002
2003 let component = ContainerComponent::new();
2004 let ctx = NoOpComponentContext;
2005 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2006
2007 let ctx = ProducerContext::new();
2008 let mut producer = endpoint.create_producer(&ctx).unwrap();
2009
2010 let mut exchange = Exchange::new(Message::new(""));
2011 exchange
2012 .input
2013 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2014
2015 use tower::ServiceExt;
2016 let result = producer
2017 .ready()
2018 .await
2019 .unwrap()
2020 .call(exchange)
2021 .await
2022 .unwrap();
2023
2024 assert_eq!(
2026 result
2027 .input
2028 .header(HEADER_ACTION_RESULT)
2029 .map(|v| v.as_str().unwrap()),
2030 Some("success")
2031 );
2032 }
2033
2034 #[tokio::test]
2035 async fn test_container_producer_connection_error_on_invalid_host() {
2036 let component = ContainerComponent::new();
2038 let ctx = NoOpComponentContext;
2039 let endpoint = component
2040 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2041 .unwrap();
2042
2043 let ctx = ProducerContext::new();
2044 let result = endpoint.create_producer(&ctx);
2045
2046 assert!(
2048 result.is_err(),
2049 "Expected error when connecting to invalid host"
2050 );
2051 let err = result.unwrap_err();
2052 match &err {
2053 CamelError::ProcessorError(msg) => {
2054 assert!(
2055 msg.to_lowercase().contains("connection")
2056 || msg.to_lowercase().contains("connect")
2057 || msg.to_lowercase().contains("socket")
2058 || msg.contains("docker"),
2059 "Error message should indicate connection failure, got: {}",
2060 msg
2061 );
2062 }
2063 _ => panic!("Expected ProcessorError, got: {:?}", err),
2064 }
2065 }
2066
2067 #[tokio::test]
2069 async fn test_container_producer_lifecycle_operations_missing_id() {
2070 let docker = match Docker::connect_with_local_defaults() {
2072 Ok(d) => d,
2073 Err(_) => {
2074 eprintln!("Skipping test: Could not connect to Docker daemon");
2075 return;
2076 }
2077 };
2078
2079 if docker.ping().await.is_err() {
2080 eprintln!("Skipping test: Docker daemon not responding to ping");
2081 return;
2082 }
2083
2084 let component = ContainerComponent::new();
2085 let ctx = NoOpComponentContext;
2086 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2087 let ctx = ProducerContext::new();
2088 let mut producer = endpoint.create_producer(&ctx).unwrap();
2089
2090 for operation in ["start", "stop", "remove"] {
2092 let mut exchange = Exchange::new(Message::new(""));
2093 exchange.input.set_header(
2094 HEADER_ACTION,
2095 serde_json::Value::String(operation.to_string()),
2096 );
2097 use tower::ServiceExt;
2100 let result = producer.ready().await.unwrap().call(exchange).await;
2101
2102 assert!(
2103 result.is_err(),
2104 "Expected error for {} operation without CamelContainerId",
2105 operation
2106 );
2107 let err = result.unwrap_err();
2108 match &err {
2109 CamelError::ProcessorError(msg) => {
2110 assert!(
2111 msg.contains(HEADER_CONTAINER_ID),
2112 "Error message should mention {}, got: {}",
2113 HEADER_CONTAINER_ID,
2114 msg
2115 );
2116 }
2117 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2118 }
2119 }
2120 }
2121
2122 #[tokio::test]
2124 async fn test_container_producer_stop_nonexistent() {
2125 let docker = match Docker::connect_with_local_defaults() {
2127 Ok(d) => d,
2128 Err(_) => {
2129 eprintln!("Skipping test: Could not connect to Docker daemon");
2130 return;
2131 }
2132 };
2133
2134 if docker.ping().await.is_err() {
2135 eprintln!("Skipping test: Docker daemon not responding to ping");
2136 return;
2137 }
2138
2139 let component = ContainerComponent::new();
2140 let ctx = NoOpComponentContext;
2141 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2142 let ctx = ProducerContext::new();
2143 let mut producer = endpoint.create_producer(&ctx).unwrap();
2144
2145 let mut exchange = Exchange::new(Message::new(""));
2146 exchange
2147 .input
2148 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2149 exchange.input.set_header(
2150 HEADER_CONTAINER_ID,
2151 serde_json::Value::String("nonexistent-container-123".into()),
2152 );
2153
2154 use tower::ServiceExt;
2155 let result = producer.ready().await.unwrap().call(exchange).await;
2156
2157 assert!(
2158 result.is_err(),
2159 "Expected error when stopping nonexistent container"
2160 );
2161 let err = result.unwrap_err();
2162 match &err {
2163 CamelError::ProcessorError(msg) => {
2164 assert!(
2166 msg.to_lowercase().contains("no such container")
2167 || msg.to_lowercase().contains("not found")
2168 || msg.contains("404"),
2169 "Error message should indicate container not found, got: {}",
2170 msg
2171 );
2172 }
2173 _ => panic!("Expected ProcessorError, got: {:?}", err),
2174 }
2175 }
2176
2177 #[tokio::test]
2179 async fn test_container_producer_run_missing_image() {
2180 let docker = match Docker::connect_with_local_defaults() {
2182 Ok(d) => d,
2183 Err(_) => {
2184 eprintln!("Skipping test: Could not connect to Docker daemon");
2185 return;
2186 }
2187 };
2188
2189 if docker.ping().await.is_err() {
2190 eprintln!("Skipping test: Docker daemon not responding to ping");
2191 return;
2192 }
2193
2194 let component = ContainerComponent::new();
2196 let ctx = NoOpComponentContext;
2197 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2198 let ctx = ProducerContext::new();
2199 let mut producer = endpoint.create_producer(&ctx).unwrap();
2200
2201 let mut exchange = Exchange::new(Message::new(""));
2202 exchange
2203 .input
2204 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2205 use tower::ServiceExt;
2208 let result = producer.ready().await.unwrap().call(exchange).await;
2209
2210 assert!(
2211 result.is_err(),
2212 "Expected error for run operation without image"
2213 );
2214 let err = result.unwrap_err();
2215 match &err {
2216 CamelError::ProcessorError(msg) => {
2217 assert!(
2218 msg.to_lowercase().contains("image"),
2219 "Error message should mention 'image', got: {}",
2220 msg
2221 );
2222 }
2223 _ => panic!("Expected ProcessorError, got: {:?}", err),
2224 }
2225 }
2226
2227 #[tokio::test]
2229 async fn test_container_producer_run_image_from_header() {
2230 let docker = match Docker::connect_with_local_defaults() {
2232 Ok(d) => d,
2233 Err(_) => {
2234 eprintln!("Skipping test: Could not connect to Docker daemon");
2235 return;
2236 }
2237 };
2238
2239 if docker.ping().await.is_err() {
2240 eprintln!("Skipping test: Docker daemon not responding to ping");
2241 return;
2242 }
2243
2244 let component = ContainerComponent::new();
2246 let ctx = NoOpComponentContext;
2247 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2248 let ctx = ProducerContext::new();
2249 let mut producer = endpoint.create_producer(&ctx).unwrap();
2250
2251 let mut exchange = Exchange::new(Message::new(""));
2252 exchange
2253 .input
2254 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2255 exchange.input.set_header(
2257 HEADER_IMAGE,
2258 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2259 );
2260
2261 use tower::ServiceExt;
2262 let result = producer.ready().await.unwrap().call(exchange).await;
2263
2264 assert!(
2266 result.is_err(),
2267 "Expected error when running container with nonexistent image"
2268 );
2269 let err = result.unwrap_err();
2270 match &err {
2271 CamelError::ProcessorError(msg) => {
2272 assert!(
2274 msg.to_lowercase().contains("no such image")
2275 || msg.to_lowercase().contains("not found")
2276 || msg.to_lowercase().contains("image")
2277 || msg.to_lowercase().contains("pull")
2278 || msg.contains("404"),
2279 "Error message should indicate image issue, got: {}",
2280 msg
2281 );
2282 }
2283 _ => panic!("Expected ProcessorError, got: {:?}", err),
2284 }
2285 }
2286
2287 #[tokio::test]
2290 async fn test_container_producer_run_alpine_container() {
2291 let docker = match Docker::connect_with_local_defaults() {
2292 Ok(d) => d,
2293 Err(_) => {
2294 eprintln!("Skipping test: Could not connect to Docker daemon");
2295 return;
2296 }
2297 };
2298
2299 if docker.ping().await.is_err() {
2300 eprintln!("Skipping test: Docker daemon not responding to ping");
2301 return;
2302 }
2303
2304 let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2306 let has_alpine = images
2307 .iter()
2308 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2309
2310 if !has_alpine {
2311 eprintln!("Pulling alpine:latest image...");
2312 let mut stream = docker.create_image(
2313 Some(CreateImageOptions {
2314 from_image: Some("alpine:latest".to_string()),
2315 ..Default::default()
2316 }),
2317 None,
2318 None,
2319 );
2320
2321 use futures::StreamExt;
2322 while let Some(_item) = stream.next().await {
2323 }
2325 eprintln!("Image pulled successfully");
2326 }
2327
2328 let component = ContainerComponent::new();
2330 let ctx = NoOpComponentContext;
2331 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2332 let ctx = ProducerContext::new();
2333 let mut producer = endpoint.create_producer(&ctx).unwrap();
2334
2335 let timestamp = std::time::SystemTime::now()
2337 .duration_since(std::time::UNIX_EPOCH)
2338 .unwrap()
2339 .as_millis();
2340 let container_name = format!("test-rust-camel-{}", timestamp);
2341 let mut exchange = Exchange::new(Message::new(""));
2342 exchange.input.set_header(
2343 HEADER_IMAGE,
2344 serde_json::Value::String("alpine:latest".into()),
2345 );
2346 exchange.input.set_header(
2347 HEADER_CONTAINER_NAME,
2348 serde_json::Value::String(container_name.clone()),
2349 );
2350
2351 use tower::ServiceExt;
2352 let result = producer
2353 .ready()
2354 .await
2355 .unwrap()
2356 .call(exchange)
2357 .await
2358 .expect("Container run should succeed");
2359
2360 let container_id = result
2362 .input
2363 .header(HEADER_CONTAINER_ID)
2364 .and_then(|v| v.as_str().map(|s| s.to_string()))
2365 .expect("Expected container ID header");
2366 assert!(!container_id.is_empty(), "Container ID should not be empty");
2367
2368 assert_eq!(
2370 result
2371 .input
2372 .header(HEADER_ACTION_RESULT)
2373 .and_then(|v| v.as_str()),
2374 Some("success")
2375 );
2376
2377 let inspect = docker
2379 .inspect_container(&container_id, None)
2380 .await
2381 .expect("Container should exist");
2382 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2383
2384 docker
2386 .remove_container(
2387 &container_id,
2388 Some(RemoveContainerOptions {
2389 force: true,
2390 ..Default::default()
2391 }),
2392 )
2393 .await
2394 .ok();
2395
2396 eprintln!("✅ Container {} created and cleaned up", container_id);
2397 }
2398
2399 #[tokio::test]
2401 async fn test_container_consumer_unsupported_operation() {
2402 use tokio::sync::mpsc;
2403
2404 let component = ContainerComponent::new();
2405 let ctx = NoOpComponentContext;
2406 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2407 let mut consumer = endpoint.create_consumer().unwrap();
2408
2409 let (tx, _rx) = mpsc::channel(16);
2411 let cancel_token = tokio_util::sync::CancellationToken::new();
2412 let context = ConsumerContext::new(tx, cancel_token);
2413
2414 let result = consumer.start(context).await;
2415
2416 assert!(
2418 result.is_err(),
2419 "Expected error for unsupported consumer operation"
2420 );
2421 let err = result.unwrap_err();
2422 match &err {
2423 CamelError::EndpointCreationFailed(msg) => {
2424 assert!(
2425 msg.contains("Consumer only supports 'events' or 'logs'"),
2426 "Error message should mention events or logs support, got: {}",
2427 msg
2428 );
2429 }
2430 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2431 }
2432 }
2433
/// A consumer configured for `container:events` reports
/// `ConcurrencyModel::Concurrent { max: None }` as its concurrency model.
#[test]
fn test_container_consumer_concurrency_model_is_concurrent() {
    let config = ContainerConfig::from_uri("container:events").unwrap();
    let consumer = ContainerConsumer { config };

    let expected = camel_component_api::ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
2445
2446 #[tokio::test]
2450 async fn test_container_consumer_cancellation() {
2451 use std::sync::atomic::{AtomicBool, Ordering};
2452 use tokio::sync::mpsc;
2453
2454 let docker = match Docker::connect_with_local_defaults() {
2456 Ok(d) => d,
2457 Err(_) => {
2458 eprintln!("Skipping test: Could not connect to Docker daemon");
2459 return;
2460 }
2461 };
2462
2463 if docker.ping().await.is_err() {
2464 eprintln!("Skipping test: Docker daemon not responding to ping");
2465 return;
2466 }
2467
2468 let component = ContainerComponent::new();
2469 let ctx = NoOpComponentContext;
2470 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2471 let mut consumer = endpoint.create_consumer().unwrap();
2472
2473 let (tx, _rx) = mpsc::channel(16);
2475 let cancel_token = tokio_util::sync::CancellationToken::new();
2476 let context = ConsumerContext::new(tx, cancel_token.clone());
2477
2478 let completed = Arc::new(AtomicBool::new(false));
2480 let completed_clone = completed.clone();
2481
2482 let handle = tokio::spawn(async move {
2484 let result = consumer.start(context).await;
2485 completed_clone.store(true, Ordering::SeqCst);
2487 result
2488 });
2489
2490 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2492
2493 assert!(
2495 !completed.load(Ordering::SeqCst),
2496 "Consumer should still be running before cancellation"
2497 );
2498
2499 cancel_token.cancel();
2501
2502 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2504
2505 assert!(
2507 result.is_ok(),
2508 "Consumer should gracefully shut down after cancellation"
2509 );
2510
2511 assert!(
2513 completed.load(Ordering::SeqCst),
2514 "Consumer should have completed after cancellation"
2515 );
2516 }
2517
2518 #[tokio::test]
2522 async fn test_container_producer_list_containers() {
2523 let docker = match Docker::connect_with_local_defaults() {
2526 Ok(d) => d,
2527 Err(_) => {
2528 eprintln!("Skipping test: Could not connect to Docker daemon");
2529 return;
2530 }
2531 };
2532
2533 if docker.ping().await.is_err() {
2534 eprintln!("Skipping test: Docker daemon not responding to ping");
2535 return;
2536 }
2537
2538 let component = ContainerComponent::new();
2540 let ctx = NoOpComponentContext;
2541 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2542
2543 let ctx = ProducerContext::new();
2544 let mut producer = endpoint.create_producer(&ctx).unwrap();
2545
2546 let mut exchange = Exchange::new(Message::new(""));
2548 exchange
2549 .input
2550 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2551
2552 use tower::ServiceExt;
2554 let result = producer
2555 .ready()
2556 .await
2557 .unwrap()
2558 .call(exchange)
2559 .await
2560 .expect("Producer should succeed when Docker is available");
2561
2562 match &result.input.body {
2565 camel_component_api::Body::Json(json_value) => {
2566 assert!(
2567 json_value.is_array(),
2568 "Expected input body to be a JSON array, got: {:?}",
2569 json_value
2570 );
2571 }
2572 other => panic!("Expected Body::Json with array, got: {:?}", other),
2573 }
2574 }
2575
/// The `volumes` query parameter is stored verbatim on the parsed config.
#[test]
fn test_container_config_parses_volumes() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let volumes = cfg.volumes.as_deref();
    assert_eq!(volumes, Some("./html:/usr/share/nginx/html:ro"));
}
2587
/// An `exec` URI populates the operation, container id, command, user,
/// working directory and the `detach` flag.
#[test]
fn test_container_config_parses_exec_params() {
    let uri = "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    assert_eq!(cfg.operation, "exec");

    // Each optional string field paired with the value supplied in the URI.
    let string_fields = [
        (cfg.container_id.as_deref(), "my-app"),
        (cfg.cmd.as_deref(), "ls /app"),
        (cfg.user.as_deref(), "root"),
        (cfg.workdir.as_deref(), "/tmp"),
    ];
    for (actual, expected) in string_fields {
        assert_eq!(actual, Some(expected));
    }

    assert!(cfg.detach);
}
2601
/// A `network-create` URI populates the operation, `name` and `driver`.
#[test]
fn test_container_config_parses_network_create_params() {
    let uri = "container:network-create?name=my-net&driver=bridge";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    assert_eq!(cfg.operation, "network-create");

    let name = cfg.name.as_deref();
    let driver = cfg.driver.as_deref();
    assert_eq!(name, Some("my-net"));
    assert_eq!(driver, Some("bridge"));
}
2611
/// With no query parameters, the optional fields are `None` and the
/// `detach` / `force` flags are `false`.
#[test]
fn test_container_config_defaults_new_fields() {
    let cfg = ContainerConfig::from_uri("container:list").unwrap();

    // Optional string settings are absent.
    assert!(cfg.volumes.is_none());
    assert!(cfg.user.is_none());
    assert!(cfg.workdir.is_none());
    assert!(cfg.driver.is_none());

    // Boolean flags are off.
    assert!(!cfg.detach);
    assert!(!cfg.force);
}
2622
/// A `host:container:ro` entry is returned as a bind and yields no anonymous volumes.
#[test]
fn test_parse_volumes_bind_mount() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (binds, anonymous) = cfg.parse_volumes().unwrap();

    let expected_binds = vec!["./html:/usr/share/nginx/html:ro"];
    assert_eq!(binds, expected_binds);
    assert!(anonymous.is_empty());
}
2633
/// A `name:/path` entry is returned as a bind and yields no anonymous volumes.
#[test]
fn test_parse_volumes_named_volume() {
    let uri = "container:run?image=postgres&volumes=data:/var/lib/data";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (binds, anonymous) = cfg.parse_volumes().unwrap();

    let expected_binds = vec!["data:/var/lib/data"];
    assert_eq!(binds, expected_binds);
    assert!(anonymous.is_empty());
}
2643
/// A lone container path is returned as an anonymous volume, with no binds.
#[test]
fn test_parse_volumes_anonymous() {
    let uri = "container:run?image=alpine&volumes=/tmp/app-data";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (binds, anonymous) = cfg.parse_volumes().unwrap();

    assert!(binds.is_empty());
    let expected_path = "/tmp/app-data".to_string();
    assert!(anonymous.contains(&expected_path));
}
2652
/// A container path with a `:ro` suffix is still reported as an anonymous
/// volume, keyed by the path without the mode, and produces no binds.
#[test]
fn test_parse_volumes_anonymous_with_mode() {
    let uri = "container:run?image=alpine&volumes=/tmp/app-data:ro";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (binds, anonymous) = cfg.parse_volumes().unwrap();

    assert!(binds.is_empty());
    let expected_path = "/tmp/app-data".to_string();
    assert!(anonymous.contains(&expected_path));
}
2662
/// Two comma-separated bind entries both end up in the bind list, and no
/// anonymous volumes are produced.
#[test]
fn test_parse_volumes_multiple() {
    let uri =
        "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (binds, anonymous) = cfg.parse_volumes().unwrap();

    assert_eq!(binds.len(), 2);
    for expected in ["./html:/usr/share/nginx/html:ro", "data:/var/log/app"] {
        assert!(binds.contains(&expected.to_string()));
    }
    assert!(anonymous.is_empty());
}
2675
/// One bind entry plus one bare path yields one bind and an anonymous
/// volume for the bare path.
#[test]
fn test_parse_volumes_mixed() {
    let uri = "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (binds, anonymous) = cfg.parse_volumes().unwrap();

    assert_eq!(binds.len(), 1);
    let expected_path = "/tmp/cache".to_string();
    assert!(anonymous.contains(&expected_path));
}
2686
/// Without a `volumes` parameter, `parse_volumes` returns `None`.
#[test]
fn test_parse_volumes_none() {
    let cfg = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
    let parsed = cfg.parse_volumes();
    assert!(parsed.is_none());
}
2692
/// A `volumes` value made only of commas produces `None`.
#[test]
fn test_parse_volumes_empty_entry_skipped() {
    let cfg = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
    let parsed = cfg.parse_volumes();
    assert!(parsed.is_none());
}
2698
/// A bind entry with an explicit `:rw` mode is passed through unchanged.
#[test]
fn test_parse_volumes_rw_mode() {
    let uri = "container:run?image=nginx&volumes=./data:/app/data:rw";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let (binds, _anonymous) = cfg.parse_volumes().unwrap();

    let expected_binds = vec!["./data:/app/data:rw"];
    assert_eq!(binds, expected_binds);
}
2707
/// Boolean query parameters are parsed regardless of letter case:
/// `false` / `FALSE` / `False` leave a flag off, and `TRUE` / `true` turn it on.
#[test]
fn test_container_config_from_uri_parses_false_flags() {
    // `timestamps` must be its own `&`-separated parameter. The previous text
    // had `×tamps` here (the `&times` HTML entity rendered as a character),
    // which dropped the parameter and corrupted the `follow` value.
    let config = ContainerConfig::from_uri(
        "container:logs?containerId=a&follow=false&timestamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
    )
    .unwrap();

    // Flags given a "false" spelling stay off.
    assert!(!config.follow);
    assert!(!config.timestamps);
    assert!(!config.auto_pull);
    assert!(!config.auto_remove);

    // Flags given a "true" spelling are on.
    assert!(config.detach);
    assert!(config.force);
}
2721
/// `docker_socket_path` returns `unix://`, `npipe://` and plain-path hosts
/// unchanged, and returns an error for an `http://` host.
#[test]
fn test_docker_socket_path_validation() {
    // Hosts that must be accepted and echoed back unchanged.
    let accepted_hosts = [
        "unix:///tmp/docker.sock",
        "npipe:////./pipe/docker_engine",
        "/var/run/docker.sock",
    ];
    for host in accepted_hosts {
        let uri = format!("container:list?host={}", host);
        let cfg = ContainerConfig::from_uri(&uri).unwrap();
        assert_eq!(cfg.docker_socket_path().unwrap(), host);
    }

    let rejected =
        ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
    assert!(rejected.docker_socket_path().is_err());
}
2750
/// `parse_ports` reports a "malformed port mapping" error for a list holding a
/// bare port, and accepts well-formed mappings surrounded by whitespace.
#[test]
fn test_parse_ports_invalid_and_whitespace_entries() {
    let malformed = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
    let failure = malformed.parse_ports().unwrap_err();
    let rendered = failure.to_string();
    assert!(
        rendered.contains("malformed port mapping"),
        "expected malformed port error, got: {}",
        failure
    );

    let padded =
        ContainerConfig::from_uri("container:run?ports= 8080:80 , 5353:53/udp ").unwrap();
    let (exposed, bindings) = padded.parse_ports().unwrap();
    for port in ["80/tcp", "53/udp"] {
        assert!(exposed.contains(&port.to_string()));
    }
    assert_eq!(bindings.len(), 2);
}
2769
/// A malformed entry after a valid mapping makes `parse_ports` fail, and the
/// error names the offending entry.
#[test]
fn test_parse_ports_fail_fast_on_malformed() {
    let uri = "container:run?ports=8080:80,badentry";
    let cfg = ContainerConfig::from_uri(uri).unwrap();

    let failure = cfg.parse_ports().unwrap_err();
    let rendered = failure.to_string();
    assert!(
        rendered.contains("malformed port mapping 'badentry'"),
        "expected fail-fast on 'badentry', got: {}",
        failure
    );
}
2782
/// `parse_env` trims each entry and drops blank ones; a list containing only
/// blanks yields `None`.
#[test]
fn test_parse_env_trims_and_filters_empty_items() {
    let populated = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
    let expected: Vec<String> = ["FOO=bar", "BAZ=qux"]
        .iter()
        .map(|entry| entry.to_string())
        .collect();
    assert_eq!(populated.parse_env().unwrap(), expected);

    let blank = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
    assert!(blank.parse_env().is_none());
}
2792
/// `parse_volume_str` returns `None` for an unknown mode suffix and splits a
/// mixed list into binds and anonymous volumes.
#[test]
fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
    let rejected = parse_volume_str("/host:/ctr:badmode");
    assert!(rejected.is_none());

    let (binds, anonymous) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
    assert!(binds.contains(&"a:/b:rw".to_string()));
    for path in ["/tmp/cache", "/tmp/logs"] {
        assert!(anonymous.contains(&path.to_string()));
    }
}
2801
/// Checks the text produced by `format_docker_event` for a `create` event, a
/// `die` event and an event without an actor, then checks that
/// `extract_timestamp` returns a leading timestamp token and `None` when the
/// line does not start with one.
#[test]
fn test_format_docker_event_variants_and_timestamp_extraction() {
    use bollard::models::{EventActor, EventMessage};

    let attributes: std::collections::HashMap<String, String> = [
        ("name", "demo"),
        ("image", "alpine:latest"),
        ("exitCode", "137"),
    ]
    .iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();
    let actor = EventActor {
        id: None,
        attributes: Some(attributes),
    };

    // Builds an event that carries only an action and an optional actor.
    let event_with = |action: &str, actor: Option<EventActor>| EventMessage {
        action: Some(action.to_string()),
        actor,
        ..Default::default()
    };

    let created = event_with("create", Some(actor.clone()));
    assert_eq!(
        format_docker_event(&created),
        "[CREATE] Container demo (alpine:latest)"
    );

    let died = event_with("die", Some(actor));
    assert_eq!(
        format_docker_event(&died),
        "[DIE] Container demo (exit: 137)"
    );

    let actorless = event_with("oom", None);
    assert_eq!(format_docker_event(&actorless), "[OOM] Container unknown");

    let stamped = extract_timestamp("2024-01-01T00:00:00Z hello");
    assert_eq!(stamped, Some("2024-01-01T00:00:00Z".to_string()));
    let unstamped = extract_timestamp("hello world");
    assert_eq!(unstamped, None);
}
2846
2847 #[tokio::test]
2848 async fn test_run_container_with_cleanup_error_paths() {
2849 let create_fail = run_container_with_cleanup(
2850 || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2851 |_id| async move { Ok(()) },
2852 |_id| async move { Ok(()) },
2853 )
2854 .await;
2855 assert!(
2856 matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2857 );
2858
2859 let cleanup_fail = run_container_with_cleanup(
2860 || async { Ok("cid-1".to_string()) },
2861 |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2862 |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2863 )
2864 .await;
2865 match cleanup_fail {
2866 Err(CamelError::ProcessorError(msg)) => {
2867 assert!(msg.contains("Failed to start container"));
2868 assert!(msg.contains("Cleanup failed"));
2869 }
2870 other => panic!("unexpected result: {:?}", other),
2871 }
2872 }
2873
2874 #[tokio::test]
2875 async fn test_container_producer_network_lifecycle() {
2876 let docker = match Docker::connect_with_local_defaults() {
2877 Ok(d) => d,
2878 Err(_) => {
2879 eprintln!("Skipping test: Could not connect to Docker daemon");
2880 return;
2881 }
2882 };
2883 if docker.ping().await.is_err() {
2884 eprintln!("Skipping test: Docker daemon not responding to ping");
2885 return;
2886 }
2887
2888 let network_name = format!("camel-test-{}", std::process::id());
2889
2890 let component = ContainerComponent::new();
2891 let component_ctx = NoOpComponentContext;
2892
2893 let endpoint = component
2895 .create_endpoint(
2896 &format!("container:network-create?name={}", network_name),
2897 &component_ctx,
2898 )
2899 .unwrap();
2900 let ctx = ProducerContext::new();
2901 let mut producer = endpoint.create_producer(&ctx).unwrap();
2902
2903 let mut exchange = Exchange::new(Message::new(""));
2904 exchange.input.set_header(
2905 HEADER_ACTION,
2906 serde_json::Value::String("network-create".into()),
2907 );
2908
2909 use tower::ServiceExt;
2910 let result = producer
2911 .ready()
2912 .await
2913 .unwrap()
2914 .call(exchange)
2915 .await
2916 .expect("Network create should succeed");
2917
2918 let network_id = result
2919 .input
2920 .header(HEADER_NETWORK)
2921 .and_then(|v| v.as_str().map(|s| s.to_string()))
2922 .expect("Should have network ID");
2923
2924 assert!(!network_id.is_empty());
2925
2926 let endpoint = component
2928 .create_endpoint("container:network-list", &component_ctx)
2929 .unwrap();
2930 let mut producer = endpoint.create_producer(&ctx).unwrap();
2931
2932 let mut exchange = Exchange::new(Message::new(""));
2933 exchange.input.set_header(
2934 HEADER_ACTION,
2935 serde_json::Value::String("network-list".into()),
2936 );
2937
2938 let list_result = producer
2939 .ready()
2940 .await
2941 .unwrap()
2942 .call(exchange)
2943 .await
2944 .expect("Network list should succeed");
2945
2946 match &list_result.input.body {
2947 Body::Json(json_value) => {
2948 assert!(json_value.is_array(), "Expected JSON array");
2949 }
2950 other => panic!("Expected Body::Json, got: {:?}", other),
2951 }
2952
2953 let endpoint = component
2955 .create_endpoint(
2956 &format!("container:network-remove?network={}", network_name),
2957 &component_ctx,
2958 )
2959 .unwrap();
2960 let mut producer = endpoint.create_producer(&ctx).unwrap();
2961
2962 let mut exchange = Exchange::new(Message::new(""));
2963 exchange.input.set_header(
2964 HEADER_ACTION,
2965 serde_json::Value::String("network-remove".into()),
2966 );
2967
2968 let remove_result = producer
2969 .ready()
2970 .await
2971 .unwrap()
2972 .call(exchange)
2973 .await
2974 .expect("Network remove should succeed");
2975
2976 let action_result = remove_result
2977 .input
2978 .header(HEADER_ACTION_RESULT)
2979 .and_then(|v| v.as_str());
2980 assert_eq!(action_result, Some("success"));
2981 }
2982
2983 #[tokio::test]
2984 async fn test_logs_consumer_requires_container_id() {
2985 use tokio::sync::mpsc;
2986
2987 let mut consumer = ContainerConsumer {
2988 config: ContainerConfig::from_uri("container:logs").unwrap(),
2989 };
2990 let (tx, _rx) = mpsc::channel(4);
2991 let context = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
2992
2993 let err = consumer.start(context).await.unwrap_err();
2994 match err {
2995 CamelError::EndpointCreationFailed(msg) => {
2996 assert!(msg.contains("containerId is required for logs consumer"));
2997 }
2998 other => panic!("unexpected error: {:?}", other),
2999 }
3000 }
3001
3002 #[tokio::test]
3003 async fn test_events_consumer_stops_immediately_when_cancelled() {
3004 use tokio::sync::mpsc;
3005
3006 let mut consumer = ContainerConsumer {
3007 config: ContainerConfig::from_uri("container:events").unwrap(),
3008 };
3009
3010 let (tx, _rx) = mpsc::channel(4);
3011 let cancel = tokio_util::sync::CancellationToken::new();
3012 cancel.cancel();
3013 let context = ConsumerContext::new(tx, cancel);
3014
3015 let result = consumer.start(context).await;
3016 assert!(result.is_ok());
3017 }
3018
/// A Docker host set on the global config is visible through
/// `ContainerEndpoint::docker_host` once the defaults are applied, and a
/// component built with that global config reports the `container` scheme.
#[test]
fn test_global_config_constructors_and_endpoint_docker_host() {
    let host = "unix:///tmp/docker.sock";
    let global = ContainerGlobalConfig::new().with_docker_host(host);

    let mut config = ContainerConfig::from_uri("container:list").unwrap();
    config.apply_global_defaults(&global);

    let endpoint = ContainerEndpoint {
        uri: "container:list".to_string(),
        config,
    };
    assert_eq!(endpoint.docker_host(), Some(host));

    let component = ContainerComponent::with_config(global);
    assert_eq!(component.scheme(), "container");
}
3035}