1pub mod bundle;
7pub mod health;
8
9pub use bundle::ContainerBundle;
10pub use health::ContainerHealthCheck;
11
12use std::collections::{HashMap, HashSet};
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::{Arc, Mutex};
16use std::task::{Context, Poll};
17
18use async_trait::async_trait;
19use bollard::Docker;
20use bollard::models::{
21 ContainerCreateBody, NetworkConnectRequest, NetworkCreateRequest, NetworkDisconnectRequest,
22};
23use bollard::query_parameters::{
24 CreateContainerOptions, CreateImageOptions, EventsOptions, ListContainersOptions,
25 ListImagesOptions, ListNetworksOptions, LogsOptions, RemoveContainerOptions,
26 StartContainerOptions,
27};
28use bollard::service::{HostConfig, PortBinding};
29use camel_component_api::parse_uri;
30use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
31use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
32use tower::Service;
33
34static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
37 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
38
39fn track_container(id: String) {
41 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
42 tracker.insert(id);
43 }
44}
45
46fn untrack_container(id: &str) {
48 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
49 tracker.remove(id);
50 }
51}
52
53pub async fn cleanup_tracked_containers() {
55 let ids: Vec<String> = {
56 match CONTAINER_TRACKER.lock() {
57 Ok(tracker) => tracker.iter().cloned().collect(),
58 Err(_) => return,
59 }
60 };
61
62 if ids.is_empty() {
63 return;
64 }
65
66 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
67
68 let docker = match Docker::connect_with_local_defaults() {
69 Ok(d) => d,
70 Err(e) => {
71 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
72 return;
73 }
74 };
75
76 for id in ids {
77 match docker
78 .remove_container(
79 &id,
80 Some(RemoveContainerOptions {
81 force: true,
82 ..Default::default()
83 }),
84 )
85 .await
86 {
87 Ok(_) => {
88 tracing::debug!("Cleaned up container {}", id);
89 untrack_container(&id);
90 }
91 Err(e) => {
92 tracing::warn!("Failed to cleanup container {}: {}", id, e);
93 }
94 }
95 }
96}
97
98const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
102
103pub const HEADER_ACTION: &str = "CamelContainerAction";
105
106pub const HEADER_IMAGE: &str = "CamelContainerImage";
108
109pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
111
112pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
114
115pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
117
118pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
120
121pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
123
124pub const HEADER_CMD: &str = "CamelContainerCmd";
126
127pub const HEADER_NETWORK: &str = "CamelContainerNetwork";
129
130pub const HEADER_EXIT_CODE: &str = "CamelContainerExitCode";
132
133pub const HEADER_VOLUMES: &str = "CamelContainerVolumes";
135
136pub const HEADER_EXEC_ID: &str = "CamelContainerExecId";
138
139#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
147#[serde(default)]
148pub struct ContainerGlobalConfig {
149 pub docker_host: String,
151}
152
153impl Default for ContainerGlobalConfig {
154 fn default() -> Self {
155 Self {
156 docker_host: "unix:///var/run/docker.sock".to_string(),
157 }
158 }
159}
160
161impl ContainerGlobalConfig {
162 pub fn new() -> Self {
163 Self::default()
164 }
165
166 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
167 self.docker_host = v.into();
168 self
169 }
170}
171
172#[derive(Debug, Clone)]
181pub struct ContainerConfig {
182 pub operation: String,
184 pub image: Option<String>,
186 pub name: Option<String>,
188 pub host: Option<String>,
190 pub cmd: Option<String>,
192 pub ports: Option<String>,
194 pub env: Option<String>,
196 pub network: Option<String>,
198 pub container_id: Option<String>,
200 pub follow: bool,
202 pub timestamps: bool,
204 pub tail: Option<String>,
206 pub auto_pull: bool,
208 pub auto_remove: bool,
210 pub volumes: Option<String>,
212 pub user: Option<String>,
214 pub workdir: Option<String>,
216 pub detach: bool,
218 pub driver: Option<String>,
220 pub force: bool,
222}
223
224impl ContainerConfig {
225 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
233 let parts = parse_uri(uri)?;
234 if parts.scheme != "container" {
235 return Err(CamelError::InvalidUri(format!(
236 "expected scheme 'container', got '{}'",
237 parts.scheme
238 )));
239 }
240
241 let image = parts.params.get("image").cloned();
242 let name = parts.params.get("name").cloned();
243 let cmd = parts.params.get("cmd").cloned();
244 let ports = parts.params.get("ports").cloned();
245 let env = parts.params.get("env").cloned();
246 let network = parts.params.get("network").cloned();
247 let container_id = parts.params.get("containerId").cloned();
248 let follow = parts
249 .params
250 .get("follow")
251 .map(|v| v.eq_ignore_ascii_case("true"))
252 .unwrap_or(true);
253 let timestamps = parts
254 .params
255 .get("timestamps")
256 .map(|v| v.eq_ignore_ascii_case("true"))
257 .unwrap_or(false);
258 let tail = parts.params.get("tail").cloned();
259 let auto_pull = parts
260 .params
261 .get("autoPull")
262 .map(|v| v.eq_ignore_ascii_case("true"))
263 .unwrap_or(true);
264 let auto_remove = parts
265 .params
266 .get("autoRemove")
267 .map(|v| v.eq_ignore_ascii_case("true"))
268 .unwrap_or(true);
269 let host = parts.params.get("host").cloned();
271 let volumes = parts.params.get("volumes").cloned();
272 let user = parts.params.get("user").cloned();
273 let workdir = parts.params.get("workdir").cloned();
274 let detach = parts
275 .params
276 .get("detach")
277 .map(|v| v.eq_ignore_ascii_case("true"))
278 .unwrap_or(false);
279 let driver = parts.params.get("driver").cloned();
280 let force = parts
281 .params
282 .get("force")
283 .map(|v| v.eq_ignore_ascii_case("true"))
284 .unwrap_or(false);
285
286 Ok(Self {
287 operation: parts.path,
288 image,
289 name,
290 host,
291 cmd,
292 ports,
293 env,
294 network,
295 container_id,
296 follow,
297 timestamps,
298 tail,
299 auto_pull,
300 auto_remove,
301 volumes,
302 user,
303 workdir,
304 detach,
305 driver,
306 force,
307 })
308 }
309
310 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
313 if self.host.is_none() {
314 self.host = Some(global.docker_host.clone());
315 }
316 }
317
318 fn docker_socket_path(&self) -> Result<&str, CamelError> {
319 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
320 "npipe:////./pipe/docker_engine"
321 } else {
322 "unix:///var/run/docker.sock"
323 });
324
325 if host.starts_with("unix://") || host.starts_with("npipe://") {
326 return Ok(host);
327 }
328
329 if host.contains("://") {
330 return Err(CamelError::ProcessorError(format!(
331 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
332 host
333 )));
334 }
335
336 Ok(host)
337 }
338
339 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
340 let socket_path = self.docker_socket_path()?;
341 Docker::connect_with_socket(
342 socket_path,
343 DOCKER_CONNECT_TIMEOUT_SECS,
344 bollard::API_DEFAULT_VERSION,
345 )
346 .map_err(|e| {
347 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
348 })
349 }
350
351 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
359 let docker = self.connect_docker_client()?;
360 docker
361 .ping()
362 .await
363 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
364 Ok(docker)
365 }
366
367 #[allow(clippy::type_complexity)]
368 fn parse_ports(
369 &self,
370 ) -> Result<(Vec<String>, HashMap<String, Option<Vec<PortBinding>>>), CamelError> {
371 let ports_str = match self.ports.as_ref() {
372 Some(s) => s,
373 None => return Ok((Vec::new(), HashMap::new())),
374 };
375
376 let mut exposed_ports: Vec<String> = Vec::new();
377 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
378
379 for mapping in ports_str.split(',') {
380 let mapping = mapping.trim();
381 if mapping.is_empty() {
382 continue;
383 }
384
385 let (host_port, container_spec) = mapping.split_once(':').ok_or_else(|| {
386 CamelError::ProcessorError(format!(
387 "malformed port mapping '{}': expected hostPort:containerPort",
388 mapping
389 ))
390 })?;
391
392 let (container_port, protocol) = if container_spec.contains('/') {
393 let parts: Vec<&str> = container_spec.split('/').collect();
394 (parts[0], parts[1])
395 } else {
396 (container_spec, "tcp")
397 };
398
399 let container_key = format!("{}/{}", container_port, protocol);
400
401 exposed_ports.push(container_key.clone());
402
403 port_bindings.insert(
404 container_key,
405 Some(vec![PortBinding {
406 host_ip: None,
407 host_port: Some(host_port.to_string()),
408 }]),
409 );
410 }
411
412 Ok((exposed_ports, port_bindings))
413 }
414
415 fn parse_env(&self) -> Option<Vec<String>> {
416 let env_str = self.env.as_ref()?;
417
418 let env_vars: Vec<String> = env_str
419 .split(',')
420 .map(|s| s.trim().to_string())
421 .filter(|s| !s.is_empty())
422 .collect();
423
424 if env_vars.is_empty() {
425 None
426 } else {
427 Some(env_vars)
428 }
429 }
430
431 #[cfg(test)]
432 #[allow(clippy::type_complexity)]
433 fn parse_volumes(&self) -> Option<(Vec<String>, Vec<String>)> {
434 self.volumes.as_deref().and_then(parse_volume_str)
435 }
436}
437
438#[allow(clippy::type_complexity)]
443fn parse_volume_str(volumes_str: &str) -> Option<(Vec<String>, Vec<String>)> {
444 let mut binds: Vec<String> = Vec::new();
445 let mut anonymous_volumes: Vec<String> = Vec::new();
446
447 for entry in volumes_str.split(',') {
448 let entry = entry.trim();
449 if entry.is_empty() {
450 continue;
451 }
452
453 let segments: Vec<&str> = entry.split(':').collect();
454
455 match segments.len() {
456 3 => {
457 let source = segments[0];
458 let target = segments[1];
459 let mode = segments[2];
460 if mode != "ro" && mode != "rw" {
461 continue;
462 }
463 binds.push(format!("{}:{}:{}", source, target, mode));
464 }
465 2 => {
466 let a = segments[0];
467 let b = segments[1];
468 if b == "ro" || b == "rw" {
469 anonymous_volumes.push(a.to_string());
470 } else {
471 binds.push(format!("{}:{}", a, b));
472 }
473 }
474 1 => {
475 anonymous_volumes.push(segments[0].to_string());
476 }
477 _ => continue,
478 }
479 }
480
481 if binds.is_empty() && anonymous_volumes.is_empty() {
482 None
483 } else {
484 Some((binds, anonymous_volumes))
485 }
486}
487
488#[derive(Debug, Clone, Copy, PartialEq, Eq)]
489enum ProducerOperation {
490 List,
491 Run,
492 Start,
493 Stop,
494 Remove,
495 Exec,
496 NetworkCreate,
497 NetworkConnect,
498 NetworkDisconnect,
499 NetworkRemove,
500 NetworkList,
501}
502
503fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
504 match operation {
505 "list" => Ok(ProducerOperation::List),
506 "run" => Ok(ProducerOperation::Run),
507 "start" => Ok(ProducerOperation::Start),
508 "stop" => Ok(ProducerOperation::Stop),
509 "remove" => Ok(ProducerOperation::Remove),
510 "exec" => Ok(ProducerOperation::Exec),
511 "network-create" => Ok(ProducerOperation::NetworkCreate),
512 "network-connect" => Ok(ProducerOperation::NetworkConnect),
513 "network-disconnect" => Ok(ProducerOperation::NetworkDisconnect),
514 "network-remove" => Ok(ProducerOperation::NetworkRemove),
515 "network-list" => Ok(ProducerOperation::NetworkList),
516 _ => Err(CamelError::ProcessorError(format!(
517 "Unknown container operation: {}",
518 operation
519 ))),
520 }
521}
522
523fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
524 exchange
525 .input
526 .header(HEADER_CONTAINER_NAME)
527 .and_then(|v| v.as_str().map(|s| s.to_string()))
528 .or_else(|| config.name.clone())
529}
530
531async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
532 let images = docker
533 .list_images(None::<ListImagesOptions>)
534 .await
535 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
536
537 Ok(images.iter().any(|img| {
538 img.repo_tags
539 .iter()
540 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
541 }))
542}
543
544async fn pull_image_with_progress(
545 docker: &Docker,
546 image: &str,
547 timeout_secs: u64,
548) -> Result<(), CamelError> {
549 use futures::StreamExt;
550
551 tracing::info!("Pulling image: {}", image);
552
553 let mut stream = docker.create_image(
554 Some(CreateImageOptions {
555 from_image: Some(image.to_string()),
556 ..Default::default()
557 }),
558 None,
559 None,
560 );
561
562 let start = std::time::Instant::now();
563 let mut last_progress = std::time::Instant::now();
564
565 while let Some(item) = stream.next().await {
566 if start.elapsed().as_secs() > timeout_secs {
567 return Err(CamelError::ProcessorError(format!(
568 "Image pull timeout after {}s. Try manually: docker pull {}",
569 timeout_secs, image
570 )));
571 }
572
573 match item {
574 Ok(update) => {
575 if last_progress.elapsed().as_secs() >= 2 {
577 if let Some(status) = update.status {
578 tracing::debug!("Pull progress: {}", status);
579 }
580 last_progress = std::time::Instant::now();
581 }
582 }
583 Err(e) => {
584 let err_str = e.to_string().to_lowercase();
585 if err_str.contains("unauthorized") || err_str.contains("401") {
586 return Err(CamelError::ProcessorError(format!(
587 "Authentication required for image '{}'. Configure Docker credentials: docker login",
588 image
589 )));
590 }
591 if err_str.contains("not found") || err_str.contains("404") {
592 return Err(CamelError::ProcessorError(format!(
593 "Image '{}' not found in registry. Check the image name and tag",
594 image
595 )));
596 }
597 return Err(CamelError::ProcessorError(format!(
598 "Failed to pull image '{}': {}",
599 image, e
600 )));
601 }
602 }
603 }
604
605 tracing::info!("Successfully pulled image: {}", image);
606 Ok(())
607}
608
609async fn ensure_image_available(
610 docker: &Docker,
611 image: &str,
612 auto_pull: bool,
613 timeout_secs: u64,
614) -> Result<(), CamelError> {
615 if image_exists_locally(docker, image).await? {
616 tracing::debug!("Image '{}' already available locally", image);
617 return Ok(());
618 }
619
620 if !auto_pull {
621 return Err(CamelError::ProcessorError(format!(
622 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
623 image, image
624 )));
625 }
626
627 pull_image_with_progress(docker, image, timeout_secs).await
628}
629
630fn format_docker_event(event: &bollard::models::EventMessage) -> String {
631 let action = event.action.as_deref().unwrap_or("unknown");
632 let actor = event.actor.as_ref();
633
634 let container_name = actor
635 .and_then(|a| a.attributes.as_ref())
636 .and_then(|attrs| attrs.get("name"))
637 .map(|s| s.as_str())
638 .unwrap_or("unknown");
639
640 let image = actor
641 .and_then(|a| a.attributes.as_ref())
642 .and_then(|attrs| attrs.get("image"))
643 .map(|s| s.as_str())
644 .unwrap_or("");
645
646 let exit_code = actor
647 .and_then(|a| a.attributes.as_ref())
648 .and_then(|attrs| attrs.get("exitCode"))
649 .map(|s| s.as_str());
650
651 match action {
652 "create" => {
653 if image.is_empty() {
654 format!("[CREATE] Container {}", container_name)
655 } else {
656 format!("[CREATE] Container {} ({})", container_name, image)
657 }
658 }
659 "start" => format!("[START] Container {}", container_name),
660 "die" => {
661 if let Some(code) = exit_code {
662 format!("[DIE] Container {} (exit: {})", container_name, code)
663 } else {
664 format!("[DIE] Container {}", container_name)
665 }
666 }
667 "destroy" => format!("[DESTROY] Container {}", container_name),
668 "stop" => format!("[STOP] Container {}", container_name),
669 "pause" => format!("[PAUSE] Container {}", container_name),
670 "unpause" => format!("[UNPAUSE] Container {}", container_name),
671 "restart" => format!("[RESTART] Container {}", container_name),
672 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
673 }
674}
675
676async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
677 create: CreateFn,
678 start: StartFn,
679 remove: RemoveFn,
680) -> Result<String, CamelError>
681where
682 CreateFn: FnOnce() -> CreateFut,
683 CreateFut: Future<Output = Result<String, CamelError>>,
684 StartFn: FnOnce(String) -> StartFut,
685 StartFut: Future<Output = Result<(), CamelError>>,
686 RemoveFn: FnOnce(String) -> RemoveFut,
687 RemoveFut: Future<Output = Result<(), CamelError>>,
688{
689 let container_id = create().await?;
690 if let Err(start_err) = start(container_id.clone()).await {
691 if let Err(remove_err) = remove(container_id.clone()).await {
692 return Err(CamelError::ProcessorError(format!(
693 "Failed to start container: {}. Cleanup failed: {}",
694 start_err, remove_err
695 )));
696 }
697 return Err(start_err);
698 }
699
700 Ok(container_id)
701}
702
703async fn handle_list(
704 docker: Docker,
705 _config: ContainerConfig,
706 exchange: &mut Exchange,
707) -> Result<(), CamelError> {
708 let containers = docker
709 .list_containers(None::<ListContainersOptions>)
710 .await
711 .map_err(|e| CamelError::ProcessorError(format!("Failed to list containers: {}", e)))?;
712
713 let json_value = serde_json::to_value(&containers).map_err(|e| {
714 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
715 })?;
716
717 exchange.input.body = Body::Json(json_value);
718 exchange.input.set_header(
719 HEADER_ACTION_RESULT,
720 serde_json::Value::String("success".to_string()),
721 );
722 Ok(())
723}
724
725async fn handle_run(
726 docker: Docker,
727 config: ContainerConfig,
728 exchange: &mut Exchange,
729) -> Result<(), CamelError> {
730 let image = exchange
731 .input
732 .header(HEADER_IMAGE)
733 .and_then(|v| v.as_str().map(|s| s.to_string()))
734 .or(config.image.clone())
735 .ok_or_else(|| {
736 CamelError::ProcessorError(
737 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
738 )
739 })?;
740
741 let image = if !image.contains(':') && !image.contains('@') {
742 format!("{}:latest", image)
743 } else {
744 image
745 };
746
747 let pull_timeout = 300;
748 ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
749 .await
750 .map_err(|e| {
751 CamelError::ProcessorError(format!("Image '{}' not available: {}", image, e))
752 })?;
753
754 let container_name = resolve_container_name(exchange, &config);
755 let container_name_ref = container_name.as_deref().unwrap_or("");
756 let cmd_parts: Option<Vec<String>> = config
757 .cmd
758 .as_ref()
759 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
760 let auto_remove = config.auto_remove;
761 let (exposed_ports, port_bindings) = config.parse_ports()?;
762 let env_vars = config.parse_env();
763 let network_mode = config.network.clone();
764
765 let volumes_str = exchange
766 .input
767 .header(HEADER_VOLUMES)
768 .and_then(|v| v.as_str().map(|s| s.to_string()))
769 .or(config.volumes.clone());
770 let (binds, anon_volumes) = volumes_str
771 .as_deref()
772 .and_then(parse_volume_str)
773 .unwrap_or_default();
774
775 let docker_create = docker.clone();
776 let docker_start = docker.clone();
777 let docker_remove = docker.clone();
778
779 let container_id = run_container_with_cleanup(
780 move || async move {
781 let create_options = CreateContainerOptions {
782 name: Some(container_name_ref.to_string()),
783 ..Default::default()
784 };
785 let container_config = ContainerCreateBody {
786 image: Some(image.clone()),
787 cmd: cmd_parts,
788 env: env_vars,
789 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
790 volumes: if anon_volumes.is_empty() { None } else { Some(anon_volumes) },
791 host_config: Some(HostConfig {
792 auto_remove: Some(auto_remove),
793 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
794 network_mode,
795 binds: if binds.is_empty() { None } else { Some(binds) },
796 ..Default::default()
797 }),
798 ..Default::default()
799 };
800
801 let create_response = docker_create
802 .create_container(Some(create_options), container_config)
803 .await
804 .map_err(|e| {
805 let err_str = e.to_string().to_lowercase();
806 if err_str.contains("409") || err_str.contains("conflict") {
807 CamelError::ProcessorError(format!(
808 "Container name '{}' already exists. Use a unique name or remove the existing container first",
809 container_name_ref
810 ))
811 } else {
812 CamelError::ProcessorError(format!(
813 "Failed to create container: {}",
814 e
815 ))
816 }
817 })?;
818
819 Ok(create_response.id)
820 },
821 move |container_id| async move {
822 docker_start
823 .start_container(&container_id, None::<StartContainerOptions>)
824 .await
825 .map_err(|e| {
826 CamelError::ProcessorError(format!(
827 "Failed to start container: {}",
828 e
829 ))
830 })
831 },
832 move |container_id| async move {
833 docker_remove
834 .remove_container(&container_id, None)
835 .await
836 .map_err(|e| {
837 CamelError::ProcessorError(format!(
838 "Failed to remove container after start failure: {}",
839 e
840 ))
841 })
842 },
843 )
844 .await?;
845
846 track_container(container_id.clone());
847
848 exchange
849 .input
850 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
851 exchange.input.set_header(
852 HEADER_ACTION_RESULT,
853 serde_json::Value::String("success".to_string()),
854 );
855 Ok(())
856}
857
858async fn handle_lifecycle(
859 docker: Docker,
860 _config: ContainerConfig,
861 exchange: &mut Exchange,
862 operation: ProducerOperation,
863 operation_name: &str,
864) -> Result<(), CamelError> {
865 let container_id = exchange
866 .input
867 .header(HEADER_CONTAINER_ID)
868 .and_then(|v| v.as_str().map(|s| s.to_string()))
869 .ok_or_else(|| {
870 CamelError::ProcessorError(format!(
871 "{} header is required for {} operation",
872 HEADER_CONTAINER_ID, operation_name
873 ))
874 })?;
875
876 match operation {
877 ProducerOperation::Start => {
878 docker
879 .start_container(&container_id, None::<StartContainerOptions>)
880 .await
881 .map_err(|e| {
882 CamelError::ProcessorError(format!("Failed to start container: {}", e))
883 })?;
884 }
885 ProducerOperation::Stop => {
886 docker
887 .stop_container(&container_id, None)
888 .await
889 .map_err(|e| {
890 CamelError::ProcessorError(format!("Failed to stop container: {}", e))
891 })?;
892 }
893 ProducerOperation::Remove => {
894 docker
895 .remove_container(&container_id, None)
896 .await
897 .map_err(|e| {
898 CamelError::ProcessorError(format!("Failed to remove container: {}", e))
899 })?;
900 untrack_container(&container_id);
901 }
902 _ => {}
903 }
904
905 exchange.input.set_header(
906 HEADER_ACTION_RESULT,
907 serde_json::Value::String("success".to_string()),
908 );
909 Ok(())
910}
911
912async fn handle_exec(
913 docker: Docker,
914 config: ContainerConfig,
915 exchange: &mut Exchange,
916) -> Result<(), CamelError> {
917 let container_id = exchange
918 .input
919 .header(HEADER_CONTAINER_ID)
920 .and_then(|v| v.as_str().map(|s| s.to_string()))
921 .or(config.container_id.clone())
922 .ok_or_else(|| {
923 CamelError::ProcessorError(format!(
924 "{} header or containerId param is required for exec operation",
925 HEADER_CONTAINER_ID
926 ))
927 })?;
928
929 let cmd = exchange
930 .input
931 .header(HEADER_CMD)
932 .and_then(|v| v.as_str().map(|s| s.to_string()))
933 .or(config.cmd.clone())
934 .ok_or_else(|| {
935 CamelError::ProcessorError(
936 "CamelContainerCmd header or cmd param is required for exec operation".to_string(),
937 )
938 })?;
939
940 let cmd_parts: Vec<String> = cmd.split_whitespace().map(|s| s.to_string()).collect();
941 let env_vars = config.parse_env();
942
943 let exec_config = bollard::exec::CreateExecOptions {
944 cmd: Some(cmd_parts),
945 env: env_vars,
946 user: config.user.clone(),
947 working_dir: config.workdir.clone(),
948 attach_stdout: Some(true),
949 attach_stderr: Some(true),
950 ..Default::default()
951 };
952
953 let create_result = docker
954 .create_exec(&container_id, exec_config)
955 .await
956 .map_err(|e| {
957 let err_str = e.to_string().to_lowercase();
958 if err_str.contains("404") || err_str.contains("no such") {
959 CamelError::ProcessorError(format!(
960 "Container '{}' not found for exec",
961 container_id
962 ))
963 } else {
964 CamelError::ProcessorError(format!("Failed to create exec: {}", e))
965 }
966 })?;
967
968 let exec_id = create_result.id;
969
970 if config.detach {
971 docker
972 .start_exec(
973 &exec_id,
974 Some(bollard::exec::StartExecOptions {
975 detach: true,
976 ..Default::default()
977 }),
978 )
979 .await
980 .map_err(|e| {
981 CamelError::ProcessorError(format!("Failed to start exec (detached): {}", e))
982 })?;
983
984 exchange
985 .input
986 .set_header(HEADER_EXEC_ID, serde_json::Value::String(exec_id));
987 exchange
988 .input
989 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
990 } else {
991 let start_result = docker
992 .start_exec(&exec_id, None)
993 .await
994 .map_err(|e| CamelError::ProcessorError(format!("Failed to start exec: {}", e)))?;
995
996 let mut output = String::new();
997
998 match start_result {
999 bollard::exec::StartExecResults::Attached {
1000 output: mut stream, ..
1001 } => {
1002 use futures::StreamExt;
1003 while let Some(msg) = stream.next().await {
1004 match msg {
1005 Ok(bollard::container::LogOutput::StdOut { message }) => {
1006 output.push_str(&String::from_utf8_lossy(&message));
1007 }
1008 Ok(bollard::container::LogOutput::StdErr { message }) => {
1009 output.push_str(&String::from_utf8_lossy(&message));
1010 }
1011 Ok(_) => {}
1012 Err(e) => {
1013 output.push_str(&format!("[error reading stream: {}]", e));
1014 }
1015 }
1016 }
1017 }
1018 bollard::exec::StartExecResults::Detached => {}
1019 }
1020
1021 let inspect = docker
1022 .inspect_exec(&exec_id)
1023 .await
1024 .map_err(|e| CamelError::ProcessorError(format!("Failed to inspect exec: {}", e)))?;
1025
1026 let exit_code: i64 = inspect.exit_code.ok_or_else(|| {
1027 CamelError::ProcessorError("container exec returned no exit code".into())
1028 })?;
1029
1030 let output = output.trim_end().to_string();
1031 exchange.input.body = Body::Text(output);
1032 exchange.input.set_header(
1033 HEADER_EXIT_CODE,
1034 serde_json::Value::Number(exit_code.into()),
1035 );
1036 exchange
1037 .input
1038 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
1039 }
1040
1041 exchange.input.set_header(
1042 HEADER_ACTION_RESULT,
1043 serde_json::Value::String("success".to_string()),
1044 );
1045 Ok(())
1046}
1047
1048async fn handle_network_create(
1049 docker: Docker,
1050 config: ContainerConfig,
1051 exchange: &mut Exchange,
1052) -> Result<(), CamelError> {
1053 let network_name = exchange
1054 .input
1055 .header(HEADER_CONTAINER_NAME)
1056 .and_then(|v| v.as_str().map(|s| s.to_string()))
1057 .or(config.name.clone())
1058 .ok_or_else(|| {
1059 CamelError::ProcessorError(
1060 "CamelContainerName header or name param is required for network-create"
1061 .to_string(),
1062 )
1063 })?;
1064
1065 let driver = config.driver.as_deref().unwrap_or("bridge");
1066
1067 let options = NetworkCreateRequest {
1068 name: network_name.clone(),
1069 driver: Some(driver.to_string()),
1070 ..Default::default()
1071 };
1072
1073 let result = docker.create_network(options).await.map_err(|e| {
1074 let err_str = e.to_string().to_lowercase();
1075 if err_str.contains("409") || err_str.contains("already exists") {
1076 CamelError::ProcessorError(format!("Network '{}' already exists", network_name))
1077 } else {
1078 CamelError::ProcessorError(format!("Failed to create network: {}", e))
1079 }
1080 })?;
1081
1082 let network_id = result.id.clone();
1083 let json_value = serde_json::to_value(&result).map_err(|e| {
1084 CamelError::ProcessorError(format!("Failed to serialize network response: {}", e))
1085 })?;
1086
1087 exchange.input.body = Body::Json(json_value);
1088 exchange
1089 .input
1090 .set_header(HEADER_NETWORK, serde_json::Value::String(network_id));
1091 exchange.input.set_header(
1092 HEADER_ACTION_RESULT,
1093 serde_json::Value::String("success".to_string()),
1094 );
1095 Ok(())
1096}
1097
1098async fn handle_network_connect(
1099 docker: Docker,
1100 config: ContainerConfig,
1101 exchange: &mut Exchange,
1102) -> Result<(), CamelError> {
1103 let network = exchange
1104 .input
1105 .header(HEADER_NETWORK)
1106 .and_then(|v| v.as_str().map(|s| s.to_string()))
1107 .or(config.network.clone())
1108 .ok_or_else(|| {
1109 CamelError::ProcessorError(
1110 "CamelContainerNetwork header or network param is required for network-connect"
1111 .to_string(),
1112 )
1113 })?;
1114
1115 let container = exchange
1116 .input
1117 .header(HEADER_CONTAINER_ID)
1118 .and_then(|v| v.as_str().map(|s| s.to_string()))
1119 .or(config.container_id.clone())
1120 .ok_or_else(|| {
1121 CamelError::ProcessorError(
1122 "CamelContainerId header or container param is required for network-connect"
1123 .to_string(),
1124 )
1125 })?;
1126
1127 docker
1128 .connect_network(
1129 &network,
1130 NetworkConnectRequest {
1131 container,
1132 ..Default::default()
1133 },
1134 )
1135 .await
1136 .map_err(|e| {
1137 let err_str = e.to_string().to_lowercase();
1138 if err_str.contains("404") || err_str.contains("not found") {
1139 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1140 } else {
1141 CamelError::ProcessorError(format!("Failed to connect to network: {}", e))
1142 }
1143 })?;
1144
1145 exchange.input.set_header(
1146 HEADER_ACTION_RESULT,
1147 serde_json::Value::String("success".to_string()),
1148 );
1149 Ok(())
1150}
1151
1152async fn handle_network_disconnect(
1153 docker: Docker,
1154 config: ContainerConfig,
1155 exchange: &mut Exchange,
1156) -> Result<(), CamelError> {
1157 let network = exchange
1158 .input
1159 .header(HEADER_NETWORK)
1160 .and_then(|v| v.as_str().map(|s| s.to_string()))
1161 .or(config.network.clone())
1162 .ok_or_else(|| {
1163 CamelError::ProcessorError(
1164 "CamelContainerNetwork header or network param is required for network-disconnect"
1165 .to_string(),
1166 )
1167 })?;
1168
1169 let container = exchange
1170 .input
1171 .header(HEADER_CONTAINER_ID)
1172 .and_then(|v| v.as_str().map(|s| s.to_string()))
1173 .or(config.container_id.clone())
1174 .ok_or_else(|| {
1175 CamelError::ProcessorError(
1176 "CamelContainerId header or container param is required for network-disconnect"
1177 .to_string(),
1178 )
1179 })?;
1180
1181 docker
1182 .disconnect_network(
1183 &network,
1184 NetworkDisconnectRequest {
1185 container,
1186 force: Some(config.force),
1187 },
1188 )
1189 .await
1190 .map_err(|e| {
1191 let err_str = e.to_string().to_lowercase();
1192 if err_str.contains("404") || err_str.contains("not found") {
1193 CamelError::ProcessorError(format!("Network '{}' or container not found", network))
1194 } else {
1195 CamelError::ProcessorError(format!("Failed to disconnect from network: {}", e))
1196 }
1197 })?;
1198
1199 exchange.input.set_header(
1200 HEADER_ACTION_RESULT,
1201 serde_json::Value::String("success".to_string()),
1202 );
1203 Ok(())
1204}
1205
1206async fn handle_network_remove(
1207 docker: Docker,
1208 config: ContainerConfig,
1209 exchange: &mut Exchange,
1210) -> Result<(), CamelError> {
1211 let network = exchange
1212 .input
1213 .header(HEADER_NETWORK)
1214 .and_then(|v| v.as_str().map(|s| s.to_string()))
1215 .or(config.network.clone())
1216 .ok_or_else(|| {
1217 CamelError::ProcessorError(
1218 "CamelContainerNetwork header or network param is required for network-remove"
1219 .to_string(),
1220 )
1221 })?;
1222
1223 docker.remove_network(&network).await.map_err(|e| {
1224 let err_str = e.to_string().to_lowercase();
1225 if err_str.contains("404") || err_str.contains("not found") {
1226 CamelError::ProcessorError(format!("Network '{}' not found", network))
1227 } else if err_str.contains("409") || err_str.contains("in use") {
1228 CamelError::ProcessorError(format!(
1229 "Network '{}' is in use and cannot be removed",
1230 network
1231 ))
1232 } else {
1233 CamelError::ProcessorError(format!("Failed to remove network: {}", e))
1234 }
1235 })?;
1236
1237 exchange.input.set_header(
1238 HEADER_ACTION_RESULT,
1239 serde_json::Value::String("success".to_string()),
1240 );
1241 Ok(())
1242}
1243
1244async fn handle_network_list(
1245 docker: Docker,
1246 _config: ContainerConfig,
1247 exchange: &mut Exchange,
1248) -> Result<(), CamelError> {
1249 let networks = docker
1250 .list_networks(None::<ListNetworksOptions>)
1251 .await
1252 .map_err(|e| CamelError::ProcessorError(format!("Failed to list networks: {}", e)))?;
1253
1254 let json_value = serde_json::to_value(&networks)
1255 .map_err(|e| CamelError::ProcessorError(format!("Failed to serialize networks: {}", e)))?;
1256
1257 exchange.input.body = Body::Json(json_value);
1258 exchange.input.set_header(
1259 HEADER_ACTION_RESULT,
1260 serde_json::Value::String("success".to_string()),
1261 );
1262 Ok(())
1263}
1264
1265#[derive(Clone)]
1270pub struct ContainerProducer {
1271 config: ContainerConfig,
1272 docker: Docker,
1273}
1274
1275impl Service<Exchange> for ContainerProducer {
1276 type Response = Exchange;
1277 type Error = CamelError;
1278 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1279
1280 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1281 Poll::Ready(Ok(()))
1282 }
1283
1284 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
1285 let config = self.config.clone();
1286 let docker = self.docker.clone();
1287 Box::pin(async move {
1288 let operation_name = exchange
1289 .input
1290 .header(HEADER_ACTION)
1291 .and_then(|v| v.as_str().map(|s| s.to_string()))
1292 .unwrap_or_else(|| config.operation.clone());
1293
1294 let operation = parse_producer_operation(&operation_name)?;
1295
1296 match operation {
1297 ProducerOperation::List => {
1298 handle_list(docker, config, &mut exchange).await?;
1299 }
1300 ProducerOperation::Run => {
1301 handle_run(docker, config, &mut exchange).await?;
1302 }
1303 ProducerOperation::Start => {
1304 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1305 .await?;
1306 }
1307 ProducerOperation::Stop => {
1308 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1309 .await?;
1310 }
1311 ProducerOperation::Remove => {
1312 handle_lifecycle(docker, config, &mut exchange, operation, &operation_name)
1313 .await?;
1314 }
1315 ProducerOperation::Exec => {
1316 handle_exec(docker, config, &mut exchange).await?;
1317 }
1318 ProducerOperation::NetworkCreate => {
1319 handle_network_create(docker, config, &mut exchange).await?;
1320 }
1321 ProducerOperation::NetworkConnect => {
1322 handle_network_connect(docker, config, &mut exchange).await?;
1323 }
1324 ProducerOperation::NetworkDisconnect => {
1325 handle_network_disconnect(docker, config, &mut exchange).await?;
1326 }
1327 ProducerOperation::NetworkRemove => {
1328 handle_network_remove(docker, config, &mut exchange).await?;
1329 }
1330 ProducerOperation::NetworkList => {
1331 handle_network_list(docker, config, &mut exchange).await?;
1332 }
1333 }
1334
1335 Ok(exchange)
1336 })
1337 }
1338}
1339
1340pub struct ContainerConsumer {
1345 config: ContainerConfig,
1346}
1347
1348#[async_trait]
1349impl Consumer for ContainerConsumer {
1350 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1351 match self.config.operation.as_str() {
1352 "events" => self.start_events_consumer(context).await,
1353 "logs" => self.start_logs_consumer(context).await,
1354 _ => Err(CamelError::EndpointCreationFailed(format!(
1355 "Consumer only supports 'events' or 'logs' operations, got '{}'",
1356 self.config.operation
1357 ))),
1358 }
1359 }
1360
1361 async fn stop(&mut self) -> Result<(), CamelError> {
1362 Ok(())
1363 }
1364
1365 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
1366 camel_component_api::ConcurrencyModel::Concurrent { max: None }
1367 }
1368}
1369
1370impl ContainerConsumer {
1371 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1372 use futures::StreamExt;
1373
1374 loop {
1375 if context.is_cancelled() {
1376 tracing::info!("Container events consumer shutting down");
1377 return Ok(());
1378 }
1379
1380 let docker = match self.config.connect_docker().await {
1381 Ok(d) => d,
1382 Err(e) => {
1383 tracing::error!(
1384 "Consumer failed to connect to docker: {}. Retrying in 5s...",
1385 e
1386 );
1387 tokio::select! {
1388 _ = context.cancelled() => {
1389 tracing::info!("Container events consumer shutting down");
1390 return Ok(());
1391 }
1392 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1393 }
1394 continue;
1395 }
1396 };
1397
1398 let mut event_stream = docker.events(None::<EventsOptions>);
1399
1400 loop {
1401 tokio::select! {
1402 _ = context.cancelled() => {
1403 tracing::info!("Container events consumer shutting down");
1404 return Ok(());
1405 }
1406
1407 msg = event_stream.next() => {
1408 match msg {
1409 Some(Ok(event)) => {
1410 let formatted = format_docker_event(&event);
1411 let message = Message::new(Body::Text(formatted));
1412 let exchange = Exchange::new(message);
1413
1414 if let Err(e) = context.send(exchange).await {
1415 tracing::error!("Failed to send exchange: {:?}", e);
1416 break;
1417 }
1418 }
1419 Some(Err(e)) => {
1420 tracing::error!("Docker event stream error: {}. Reconnecting...", e);
1421 break;
1422 }
1423 None => {
1424 tracing::info!("Docker event stream ended. Reconnecting...");
1425 break;
1426 }
1427 }
1428 }
1429 }
1430 }
1431
1432 tokio::select! {
1433 _ = context.cancelled() => {
1434 tracing::info!("Container events consumer shutting down");
1435 return Ok(());
1436 }
1437 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1438 }
1439 }
1440 }
1441
1442 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1443 use futures::StreamExt;
1444
1445 let container_id = self.config.container_id.clone().ok_or_else(|| {
1446 CamelError::EndpointCreationFailed(
1447 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
1448 .to_string(),
1449 )
1450 })?;
1451
1452 loop {
1453 if context.is_cancelled() {
1454 tracing::info!("Container logs consumer shutting down");
1455 return Ok(());
1456 }
1457
1458 let docker = match self.config.connect_docker().await {
1459 Ok(d) => d,
1460 Err(e) => {
1461 tracing::error!(
1462 "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
1463 e
1464 );
1465 tokio::select! {
1466 _ = context.cancelled() => {
1467 tracing::info!("Container logs consumer shutting down");
1468 return Ok(());
1469 }
1470 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
1471 }
1472 continue;
1473 }
1474 };
1475
1476 let tail = self
1477 .config
1478 .tail
1479 .clone()
1480 .unwrap_or_else(|| "all".to_string());
1481
1482 let options = LogsOptions {
1483 follow: self.config.follow,
1484 stdout: true,
1485 stderr: true,
1486 timestamps: self.config.timestamps,
1487 tail,
1488 ..Default::default()
1489 };
1490
1491 let mut log_stream = docker.logs(&container_id, Some(options));
1492 let container_id_header = container_id.clone();
1493
1494 loop {
1495 tokio::select! {
1496 _ = context.cancelled() => {
1497 tracing::info!("Container logs consumer shutting down");
1498 return Ok(());
1499 }
1500
1501 msg = log_stream.next() => {
1502 match msg {
1503 Some(Ok(log_output)) => {
1504 let (stream_type, content) = match log_output {
1505 bollard::container::LogOutput::StdOut { message } => {
1506 ("stdout", String::from_utf8_lossy(&message).into_owned())
1507 }
1508 bollard::container::LogOutput::StdErr { message } => {
1509 ("stderr", String::from_utf8_lossy(&message).into_owned())
1510 }
1511 bollard::container::LogOutput::Console { message } => {
1512 ("console", String::from_utf8_lossy(&message).into_owned())
1513 }
1514 bollard::container::LogOutput::StdIn { message } => {
1515 ("stdin", String::from_utf8_lossy(&message).into_owned())
1516 }
1517 };
1518
1519 let content = content.trim_end();
1520 if content.is_empty() {
1521 continue;
1522 }
1523
1524 let mut message = Message::new(Body::Text(content.to_string()));
1525 message.set_header(
1526 HEADER_CONTAINER_ID,
1527 serde_json::Value::String(container_id_header.clone()),
1528 );
1529 message.set_header(
1530 HEADER_LOG_STREAM,
1531 serde_json::Value::String(stream_type.to_string()),
1532 );
1533
1534 if self.config.timestamps
1535 && let Some(ts) = extract_timestamp(content) {
1536 message.set_header(
1537 HEADER_LOG_TIMESTAMP,
1538 serde_json::Value::String(ts),
1539 );
1540 }
1541
1542 let exchange = Exchange::new(message);
1543
1544 if let Err(e) = context.send(exchange).await {
1545 tracing::error!("Failed to send log exchange: {:?}", e);
1546 break;
1547 }
1548 }
1549 Some(Err(e)) => {
1550 tracing::error!("Docker log stream error: {}. Reconnecting...", e);
1551 break;
1552 }
1553 None => {
1554 if self.config.follow {
1555 tracing::info!("Docker log stream ended. Reconnecting...");
1556 break;
1557 } else {
1558 tracing::info!("Container logs consumer finished (follow=false)");
1559 return Ok(());
1560 }
1561 }
1562 }
1563 }
1564 }
1565 }
1566
1567 tokio::select! {
1568 _ = context.cancelled() => {
1569 tracing::info!("Container logs consumer shutting down");
1570 return Ok(());
1571 }
1572 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1573 }
1574 }
1575 }
1576}
1577
1578fn extract_timestamp(log_line: &str) -> Option<String> {
1579 let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
1580 if parts.len() > 1 && parts[0].contains('T') {
1581 Some(parts[0].to_string())
1582 } else {
1583 None
1584 }
1585}
1586
1587pub struct ContainerComponent {
1595 config: Option<ContainerGlobalConfig>,
1596}
1597
1598impl ContainerComponent {
1599 pub fn new() -> Self {
1601 Self { config: None }
1602 }
1603
1604 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1606 Self {
1607 config: Some(config),
1608 }
1609 }
1610
1611 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1613 Self { config }
1614 }
1615}
1616
1617impl Default for ContainerComponent {
1618 fn default() -> Self {
1619 Self::new()
1620 }
1621}
1622
1623impl Component for ContainerComponent {
1624 fn scheme(&self) -> &str {
1625 "container"
1626 }
1627
1628 fn create_endpoint(
1629 &self,
1630 uri: &str,
1631 ctx: &dyn camel_component_api::ComponentContext,
1632 ) -> Result<Box<dyn Endpoint>, CamelError> {
1633 let mut config = ContainerConfig::from_uri(uri)?;
1634 if let Some(ref global) = self.config {
1636 config.apply_global_defaults(global);
1637 }
1638 let health_check = ContainerHealthCheck::new(&config);
1639 ctx.register_current_route_health_check(Arc::new(health_check));
1640 Ok(Box::new(ContainerEndpoint {
1641 uri: uri.to_string(),
1642 config,
1643 }))
1644 }
1645}
1646
1647pub struct ContainerEndpoint {
1654 uri: String,
1655 config: ContainerConfig,
1656}
1657
1658impl ContainerEndpoint {
1659 pub fn docker_host(&self) -> Option<&str> {
1662 self.config.host.as_deref()
1663 }
1664}
1665
1666impl Endpoint for ContainerEndpoint {
1667 fn uri(&self) -> &str {
1668 &self.uri
1669 }
1670
1671 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1672 Ok(Box::new(ContainerConsumer {
1673 config: self.config.clone(),
1674 }))
1675 }
1676
1677 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1678 let docker = self.config.connect_docker_client()?;
1679 Ok(BoxProcessor::new(ContainerProducer {
1680 config: self.config.clone(),
1681 docker,
1682 }))
1683 }
1684}
1685
1686#[cfg(test)]
1687mod tests {
1688 use super::*;
1689 use camel_component_api::NoOpComponentContext;
1690
1691 #[test]
1692 fn test_container_config() {
1693 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1694 assert_eq!(config.operation, "run");
1695 assert_eq!(config.image.as_deref(), Some("alpine"));
1696 assert!(config.host.is_none());
1698 }
1699
1700 #[test]
1701 fn test_global_config_applied_to_endpoint() {
1702 let global =
1705 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1706 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1707 assert!(
1708 config.host.is_none(),
1709 "URI without ?host= should leave host as None"
1710 );
1711 config.apply_global_defaults(&global);
1712 assert_eq!(
1713 config.host.as_deref(),
1714 Some("unix:///custom/docker.sock"),
1715 "global docker_host must be applied when URI did not set host"
1716 );
1717 }
1718
1719 #[test]
1720 fn test_uri_param_wins_over_global_config() {
1721 let global =
1723 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1724 let mut config =
1725 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1726 .unwrap();
1727 assert_eq!(
1728 config.host.as_deref(),
1729 Some("unix:///override.sock"),
1730 "URI-set host should be parsed correctly"
1731 );
1732 config.apply_global_defaults(&global);
1733 assert_eq!(
1734 config.host.as_deref(),
1735 Some("unix:///override.sock"),
1736 "global config must NOT override a host already set by URI"
1737 );
1738 }
1739
1740 #[test]
1741 fn test_container_config_parses_name() {
1742 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1743 assert_eq!(config.name.as_deref(), Some("my-container"));
1744 }
1745
1746 #[test]
1747 fn test_parse_producer_operation_known() {
1748 assert_eq!(
1749 parse_producer_operation("list").unwrap(),
1750 ProducerOperation::List
1751 );
1752 assert_eq!(
1753 parse_producer_operation("run").unwrap(),
1754 ProducerOperation::Run
1755 );
1756 assert_eq!(
1757 parse_producer_operation("start").unwrap(),
1758 ProducerOperation::Start
1759 );
1760 assert_eq!(
1761 parse_producer_operation("stop").unwrap(),
1762 ProducerOperation::Stop
1763 );
1764 assert_eq!(
1765 parse_producer_operation("remove").unwrap(),
1766 ProducerOperation::Remove
1767 );
1768 }
1769
1770 #[test]
1771 fn test_parse_producer_operation_unknown() {
1772 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1773 match err {
1774 CamelError::ProcessorError(msg) => {
1775 assert!(
1776 msg.contains("Unknown container operation"),
1777 "Unexpected error message: {}",
1778 msg
1779 );
1780 }
1781 _ => panic!("Expected ProcessorError for unknown operation"),
1782 }
1783 }
1784
1785 #[test]
1786 fn test_parse_producer_operation_new_variants() {
1787 assert_eq!(
1788 parse_producer_operation("exec").unwrap(),
1789 ProducerOperation::Exec
1790 );
1791 assert_eq!(
1792 parse_producer_operation("network-create").unwrap(),
1793 ProducerOperation::NetworkCreate
1794 );
1795 assert_eq!(
1796 parse_producer_operation("network-connect").unwrap(),
1797 ProducerOperation::NetworkConnect
1798 );
1799 assert_eq!(
1800 parse_producer_operation("network-disconnect").unwrap(),
1801 ProducerOperation::NetworkDisconnect
1802 );
1803 assert_eq!(
1804 parse_producer_operation("network-remove").unwrap(),
1805 ProducerOperation::NetworkRemove
1806 );
1807 assert_eq!(
1808 parse_producer_operation("network-list").unwrap(),
1809 ProducerOperation::NetworkList
1810 );
1811 }
1812
1813 #[test]
1814 fn test_resolve_container_name_header_overrides_config() {
1815 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1816 let mut exchange = Exchange::new(Message::new(""));
1817 exchange.input.set_header(
1818 HEADER_CONTAINER_NAME,
1819 serde_json::Value::String("header-name".to_string()),
1820 );
1821
1822 let resolved = resolve_container_name(&exchange, &config);
1823 assert_eq!(resolved.as_deref(), Some("header-name"));
1824 }
1825
1826 #[test]
1827 fn test_container_config_rejects_tcp_host() {
1828 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1829 let err = config.connect_docker_client().unwrap_err();
1830 match err {
1831 CamelError::ProcessorError(msg) => {
1832 assert!(
1833 msg.to_lowercase().contains("tcp"),
1834 "Expected TCP scheme error, got: {}",
1835 msg
1836 );
1837 }
1838 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1839 }
1840 }
1841
1842 #[tokio::test]
1843 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1844 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1845 let remove_called_clone = remove_called.clone();
1846
1847 let result = run_container_with_cleanup(
1848 || async { Ok("container-123".to_string()) },
1849 |_id| async move {
1850 Err(CamelError::ProcessorError(
1851 "Failed to start container".to_string(),
1852 ))
1853 },
1854 move |_id| {
1855 let remove_called_inner = remove_called_clone.clone();
1856 async move {
1857 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1858 Ok(())
1859 }
1860 },
1861 )
1862 .await;
1863
1864 assert!(result.is_err(), "Expected start failure to bubble up");
1865 assert!(
1866 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1867 "Expected cleanup to remove container"
1868 );
1869 }
1870
1871 #[test]
1872 fn test_container_component_creates_endpoint() {
1873 let component = ContainerComponent::new();
1874 assert_eq!(component.scheme(), "container");
1875 let ctx = NoOpComponentContext;
1876 let endpoint = component
1877 .create_endpoint("container:run?image=alpine", &ctx)
1878 .unwrap();
1879 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1880 }
1881
1882 #[test]
1883 fn test_container_config_parses_ports() {
1884 let config =
1885 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1886 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1887 }
1888
1889 #[test]
1890 fn test_container_config_parses_env() {
1891 let config =
1892 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1893 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1894 }
1895
1896 #[test]
1897 fn test_container_config_parses_logs_options() {
1898 let config = ContainerConfig::from_uri(
1899 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1900 )
1901 .unwrap();
1902 assert_eq!(config.operation, "logs");
1903 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1904 assert!(config.follow);
1905 assert!(config.timestamps);
1906 assert_eq!(config.tail.as_deref(), Some("100"));
1907 }
1908
1909 #[test]
1910 fn test_container_config_logs_defaults() {
1911 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1912 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1916
1917 #[test]
1918 fn test_parse_ports_single() {
1919 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1920 let (exposed, bindings) = config.parse_ports().unwrap();
1921
1922 assert!(exposed.contains(&"80/tcp".to_string()));
1923 assert!(bindings.contains_key("80/tcp"));
1924
1925 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1926 assert_eq!(binding.len(), 1);
1927 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1928 }
1929
1930 #[test]
1931 fn test_parse_ports_multiple() {
1932 let config =
1933 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1934 let (exposed, bindings) = config.parse_ports().unwrap();
1935
1936 assert!(exposed.contains(&"80/tcp".to_string()));
1937 assert!(exposed.contains(&"443/tcp".to_string()));
1938 assert_eq!(bindings.len(), 2);
1939 }
1940
1941 #[test]
1942 fn test_parse_ports_with_protocol() {
1943 let config =
1944 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1945 .unwrap();
1946 let (exposed, _bindings) = config.parse_ports().unwrap();
1947
1948 assert!(exposed.contains(&"80/tcp".to_string()));
1949 assert!(exposed.contains(&"53/udp".to_string()));
1950 }
1951
1952 #[test]
1953 fn test_parse_ports_none() {
1954 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1955 let (exposed, bindings) = config.parse_ports().unwrap();
1956 assert!(exposed.is_empty());
1957 assert!(bindings.is_empty());
1958 }
1959
1960 #[test]
1961 fn test_parse_env_single() {
1962 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1963 let env = config.parse_env().unwrap();
1964
1965 assert_eq!(env.len(), 1);
1966 assert_eq!(env[0], "FOO=bar");
1967 }
1968
1969 #[test]
1970 fn test_parse_env_multiple() {
1971 let config =
1972 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1973 .unwrap();
1974 let env = config.parse_env().unwrap();
1975
1976 assert_eq!(env.len(), 3);
1977 assert!(env.contains(&"FOO=bar".to_string()));
1978 assert!(env.contains(&"BAZ=qux".to_string()));
1979 assert!(env.contains(&"NUM=123".to_string()));
1980 }
1981
1982 #[test]
1983 fn test_parse_env_none() {
1984 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1985 assert!(config.parse_env().is_none());
1986 }
1987
1988 use camel_component_api::Message;
1989 use std::sync::Arc;
1990
1991 #[tokio::test]
1992 async fn test_container_producer_resolves_operation_from_header() {
1993 let docker = match Docker::connect_with_local_defaults() {
1995 Ok(d) => d,
1996 Err(_) => {
1997 eprintln!("Skipping test: Could not connect to Docker daemon");
1998 return;
1999 }
2000 };
2001
2002 if docker.ping().await.is_err() {
2003 eprintln!("Skipping test: Docker daemon not responding to ping");
2004 return;
2005 }
2006
2007 let component = ContainerComponent::new();
2008 let ctx = NoOpComponentContext;
2009 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2010
2011 let ctx = ProducerContext::new();
2012 let mut producer = endpoint.create_producer(&ctx).unwrap();
2013
2014 let mut exchange = Exchange::new(Message::new(""));
2015 exchange
2016 .input
2017 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2018
2019 use tower::ServiceExt;
2020 let result = producer
2021 .ready()
2022 .await
2023 .unwrap()
2024 .call(exchange)
2025 .await
2026 .unwrap();
2027
2028 assert_eq!(
2030 result
2031 .input
2032 .header(HEADER_ACTION_RESULT)
2033 .map(|v| v.as_str().unwrap()),
2034 Some("success")
2035 );
2036 }
2037
2038 #[tokio::test]
2039 async fn test_container_producer_connection_error_on_invalid_host() {
2040 let component = ContainerComponent::new();
2042 let ctx = NoOpComponentContext;
2043 let endpoint = component
2044 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
2045 .unwrap();
2046
2047 let ctx = ProducerContext::new();
2048 let result = endpoint.create_producer(&ctx);
2049
2050 assert!(
2052 result.is_err(),
2053 "Expected error when connecting to invalid host"
2054 );
2055 let err = result.unwrap_err();
2056 match &err {
2057 CamelError::ProcessorError(msg) => {
2058 assert!(
2059 msg.to_lowercase().contains("connection")
2060 || msg.to_lowercase().contains("connect")
2061 || msg.to_lowercase().contains("socket")
2062 || msg.contains("docker"),
2063 "Error message should indicate connection failure, got: {}",
2064 msg
2065 );
2066 }
2067 _ => panic!("Expected ProcessorError, got: {:?}", err),
2068 }
2069 }
2070
2071 #[tokio::test]
2073 async fn test_container_producer_lifecycle_operations_missing_id() {
2074 let docker = match Docker::connect_with_local_defaults() {
2076 Ok(d) => d,
2077 Err(_) => {
2078 eprintln!("Skipping test: Could not connect to Docker daemon");
2079 return;
2080 }
2081 };
2082
2083 if docker.ping().await.is_err() {
2084 eprintln!("Skipping test: Docker daemon not responding to ping");
2085 return;
2086 }
2087
2088 let component = ContainerComponent::new();
2089 let ctx = NoOpComponentContext;
2090 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
2091 let ctx = ProducerContext::new();
2092 let mut producer = endpoint.create_producer(&ctx).unwrap();
2093
2094 for operation in ["start", "stop", "remove"] {
2096 let mut exchange = Exchange::new(Message::new(""));
2097 exchange.input.set_header(
2098 HEADER_ACTION,
2099 serde_json::Value::String(operation.to_string()),
2100 );
2101 use tower::ServiceExt;
2104 let result = producer.ready().await.unwrap().call(exchange).await;
2105
2106 assert!(
2107 result.is_err(),
2108 "Expected error for {} operation without CamelContainerId",
2109 operation
2110 );
2111 let err = result.unwrap_err();
2112 match &err {
2113 CamelError::ProcessorError(msg) => {
2114 assert!(
2115 msg.contains(HEADER_CONTAINER_ID),
2116 "Error message should mention {}, got: {}",
2117 HEADER_CONTAINER_ID,
2118 msg
2119 );
2120 }
2121 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
2122 }
2123 }
2124 }
2125
2126 #[tokio::test]
2128 async fn test_container_producer_stop_nonexistent() {
2129 let docker = match Docker::connect_with_local_defaults() {
2131 Ok(d) => d,
2132 Err(_) => {
2133 eprintln!("Skipping test: Could not connect to Docker daemon");
2134 return;
2135 }
2136 };
2137
2138 if docker.ping().await.is_err() {
2139 eprintln!("Skipping test: Docker daemon not responding to ping");
2140 return;
2141 }
2142
2143 let component = ContainerComponent::new();
2144 let ctx = NoOpComponentContext;
2145 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
2146 let ctx = ProducerContext::new();
2147 let mut producer = endpoint.create_producer(&ctx).unwrap();
2148
2149 let mut exchange = Exchange::new(Message::new(""));
2150 exchange
2151 .input
2152 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
2153 exchange.input.set_header(
2154 HEADER_CONTAINER_ID,
2155 serde_json::Value::String("nonexistent-container-123".into()),
2156 );
2157
2158 use tower::ServiceExt;
2159 let result = producer.ready().await.unwrap().call(exchange).await;
2160
2161 assert!(
2162 result.is_err(),
2163 "Expected error when stopping nonexistent container"
2164 );
2165 let err = result.unwrap_err();
2166 match &err {
2167 CamelError::ProcessorError(msg) => {
2168 assert!(
2170 msg.to_lowercase().contains("no such container")
2171 || msg.to_lowercase().contains("not found")
2172 || msg.contains("404"),
2173 "Error message should indicate container not found, got: {}",
2174 msg
2175 );
2176 }
2177 _ => panic!("Expected ProcessorError, got: {:?}", err),
2178 }
2179 }
2180
2181 #[tokio::test]
2183 async fn test_container_producer_run_missing_image() {
2184 let docker = match Docker::connect_with_local_defaults() {
2186 Ok(d) => d,
2187 Err(_) => {
2188 eprintln!("Skipping test: Could not connect to Docker daemon");
2189 return;
2190 }
2191 };
2192
2193 if docker.ping().await.is_err() {
2194 eprintln!("Skipping test: Docker daemon not responding to ping");
2195 return;
2196 }
2197
2198 let component = ContainerComponent::new();
2200 let ctx = NoOpComponentContext;
2201 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2202 let ctx = ProducerContext::new();
2203 let mut producer = endpoint.create_producer(&ctx).unwrap();
2204
2205 let mut exchange = Exchange::new(Message::new(""));
2206 exchange
2207 .input
2208 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2209 use tower::ServiceExt;
2212 let result = producer.ready().await.unwrap().call(exchange).await;
2213
2214 assert!(
2215 result.is_err(),
2216 "Expected error for run operation without image"
2217 );
2218 let err = result.unwrap_err();
2219 match &err {
2220 CamelError::ProcessorError(msg) => {
2221 assert!(
2222 msg.to_lowercase().contains("image"),
2223 "Error message should mention 'image', got: {}",
2224 msg
2225 );
2226 }
2227 _ => panic!("Expected ProcessorError, got: {:?}", err),
2228 }
2229 }
2230
2231 #[tokio::test]
2233 async fn test_container_producer_run_image_from_header() {
2234 let docker = match Docker::connect_with_local_defaults() {
2236 Ok(d) => d,
2237 Err(_) => {
2238 eprintln!("Skipping test: Could not connect to Docker daemon");
2239 return;
2240 }
2241 };
2242
2243 if docker.ping().await.is_err() {
2244 eprintln!("Skipping test: Docker daemon not responding to ping");
2245 return;
2246 }
2247
2248 let component = ContainerComponent::new();
2250 let ctx = NoOpComponentContext;
2251 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2252 let ctx = ProducerContext::new();
2253 let mut producer = endpoint.create_producer(&ctx).unwrap();
2254
2255 let mut exchange = Exchange::new(Message::new(""));
2256 exchange
2257 .input
2258 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
2259 exchange.input.set_header(
2261 HEADER_IMAGE,
2262 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
2263 );
2264
2265 use tower::ServiceExt;
2266 let result = producer.ready().await.unwrap().call(exchange).await;
2267
2268 assert!(
2270 result.is_err(),
2271 "Expected error when running container with nonexistent image"
2272 );
2273 let err = result.unwrap_err();
2274 match &err {
2275 CamelError::ProcessorError(msg) => {
2276 assert!(
2278 msg.to_lowercase().contains("no such image")
2279 || msg.to_lowercase().contains("not found")
2280 || msg.to_lowercase().contains("image")
2281 || msg.to_lowercase().contains("pull")
2282 || msg.contains("404"),
2283 "Error message should indicate image issue, got: {}",
2284 msg
2285 );
2286 }
2287 _ => panic!("Expected ProcessorError, got: {:?}", err),
2288 }
2289 }
2290
2291 #[tokio::test]
2294 async fn test_container_producer_run_alpine_container() {
2295 let docker = match Docker::connect_with_local_defaults() {
2296 Ok(d) => d,
2297 Err(_) => {
2298 eprintln!("Skipping test: Could not connect to Docker daemon");
2299 return;
2300 }
2301 };
2302
2303 if docker.ping().await.is_err() {
2304 eprintln!("Skipping test: Docker daemon not responding to ping");
2305 return;
2306 }
2307
2308 let images = docker.list_images(None::<ListImagesOptions>).await.unwrap();
2310 let has_alpine = images
2311 .iter()
2312 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
2313
2314 if !has_alpine {
2315 eprintln!("Pulling alpine:latest image...");
2316 let mut stream = docker.create_image(
2317 Some(CreateImageOptions {
2318 from_image: Some("alpine:latest".to_string()),
2319 ..Default::default()
2320 }),
2321 None,
2322 None,
2323 );
2324
2325 use futures::StreamExt;
2326 while let Some(_item) = stream.next().await {
2327 }
2329 eprintln!("Image pulled successfully");
2330 }
2331
2332 let component = ContainerComponent::new();
2334 let ctx = NoOpComponentContext;
2335 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2336 let ctx = ProducerContext::new();
2337 let mut producer = endpoint.create_producer(&ctx).unwrap();
2338
2339 let timestamp = std::time::SystemTime::now()
2341 .duration_since(std::time::UNIX_EPOCH)
2342 .unwrap()
2343 .as_millis();
2344 let container_name = format!("test-rust-camel-{}", timestamp);
2345 let mut exchange = Exchange::new(Message::new(""));
2346 exchange.input.set_header(
2347 HEADER_IMAGE,
2348 serde_json::Value::String("alpine:latest".into()),
2349 );
2350 exchange.input.set_header(
2351 HEADER_CONTAINER_NAME,
2352 serde_json::Value::String(container_name.clone()),
2353 );
2354
2355 use tower::ServiceExt;
2356 let result = producer
2357 .ready()
2358 .await
2359 .unwrap()
2360 .call(exchange)
2361 .await
2362 .expect("Container run should succeed");
2363
2364 let container_id = result
2366 .input
2367 .header(HEADER_CONTAINER_ID)
2368 .and_then(|v| v.as_str().map(|s| s.to_string()))
2369 .expect("Expected container ID header");
2370 assert!(!container_id.is_empty(), "Container ID should not be empty");
2371
2372 assert_eq!(
2374 result
2375 .input
2376 .header(HEADER_ACTION_RESULT)
2377 .and_then(|v| v.as_str()),
2378 Some("success")
2379 );
2380
2381 let inspect = docker
2383 .inspect_container(&container_id, None)
2384 .await
2385 .expect("Container should exist");
2386 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
2387
2388 docker
2390 .remove_container(
2391 &container_id,
2392 Some(RemoveContainerOptions {
2393 force: true,
2394 ..Default::default()
2395 }),
2396 )
2397 .await
2398 .ok();
2399
2400 eprintln!("✅ Container {} created and cleaned up", container_id);
2401 }
2402
2403 #[tokio::test]
2405 async fn test_container_consumer_unsupported_operation() {
2406 use tokio::sync::mpsc;
2407
2408 let component = ContainerComponent::new();
2409 let ctx = NoOpComponentContext;
2410 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
2411 let mut consumer = endpoint.create_consumer().unwrap();
2412
2413 let (tx, _rx) = mpsc::channel(16);
2415 let cancel_token = tokio_util::sync::CancellationToken::new();
2416 let context = ConsumerContext::new(tx, cancel_token);
2417
2418 let result = consumer.start(context).await;
2419
2420 assert!(
2422 result.is_err(),
2423 "Expected error for unsupported consumer operation"
2424 );
2425 let err = result.unwrap_err();
2426 match &err {
2427 CamelError::EndpointCreationFailed(msg) => {
2428 assert!(
2429 msg.contains("Consumer only supports 'events' or 'logs'"),
2430 "Error message should mention events or logs support, got: {}",
2431 msg
2432 );
2433 }
2434 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
2435 }
2436 }
2437
2438 #[test]
2439 fn test_container_consumer_concurrency_model_is_concurrent() {
2440 let consumer = ContainerConsumer {
2441 config: ContainerConfig::from_uri("container:events").unwrap(),
2442 };
2443
2444 assert_eq!(
2445 consumer.concurrency_model(),
2446 camel_component_api::ConcurrencyModel::Concurrent { max: None }
2447 );
2448 }
2449
2450 #[tokio::test]
2454 async fn test_container_consumer_cancellation() {
2455 use std::sync::atomic::{AtomicBool, Ordering};
2456 use tokio::sync::mpsc;
2457
2458 let docker = match Docker::connect_with_local_defaults() {
2460 Ok(d) => d,
2461 Err(_) => {
2462 eprintln!("Skipping test: Could not connect to Docker daemon");
2463 return;
2464 }
2465 };
2466
2467 if docker.ping().await.is_err() {
2468 eprintln!("Skipping test: Docker daemon not responding to ping");
2469 return;
2470 }
2471
2472 let component = ContainerComponent::new();
2473 let ctx = NoOpComponentContext;
2474 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
2475 let mut consumer = endpoint.create_consumer().unwrap();
2476
2477 let (tx, _rx) = mpsc::channel(16);
2479 let cancel_token = tokio_util::sync::CancellationToken::new();
2480 let context = ConsumerContext::new(tx, cancel_token.clone());
2481
2482 let completed = Arc::new(AtomicBool::new(false));
2484 let completed_clone = completed.clone();
2485
2486 let handle = tokio::spawn(async move {
2488 let result = consumer.start(context).await;
2489 completed_clone.store(true, Ordering::SeqCst);
2491 result
2492 });
2493
2494 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
2496
2497 assert!(
2499 !completed.load(Ordering::SeqCst),
2500 "Consumer should still be running before cancellation"
2501 );
2502
2503 cancel_token.cancel();
2505
2506 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
2508
2509 assert!(
2511 result.is_ok(),
2512 "Consumer should gracefully shut down after cancellation"
2513 );
2514
2515 assert!(
2517 completed.load(Ordering::SeqCst),
2518 "Consumer should have completed after cancellation"
2519 );
2520 }
2521
2522 #[tokio::test]
2526 async fn test_container_producer_list_containers() {
2527 let docker = match Docker::connect_with_local_defaults() {
2530 Ok(d) => d,
2531 Err(_) => {
2532 eprintln!("Skipping test: Could not connect to Docker daemon");
2533 return;
2534 }
2535 };
2536
2537 if docker.ping().await.is_err() {
2538 eprintln!("Skipping test: Docker daemon not responding to ping");
2539 return;
2540 }
2541
2542 let component = ContainerComponent::new();
2544 let ctx = NoOpComponentContext;
2545 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
2546
2547 let ctx = ProducerContext::new();
2548 let mut producer = endpoint.create_producer(&ctx).unwrap();
2549
2550 let mut exchange = Exchange::new(Message::new(""));
2552 exchange
2553 .input
2554 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
2555
2556 use tower::ServiceExt;
2558 let result = producer
2559 .ready()
2560 .await
2561 .unwrap()
2562 .call(exchange)
2563 .await
2564 .expect("Producer should succeed when Docker is available");
2565
2566 match &result.input.body {
2569 camel_component_api::Body::Json(json_value) => {
2570 assert!(
2571 json_value.is_array(),
2572 "Expected input body to be a JSON array, got: {:?}",
2573 json_value
2574 );
2575 }
2576 other => panic!("Expected Body::Json with array, got: {:?}", other),
2577 }
2578 }
2579
2580 #[test]
2581 fn test_container_config_parses_volumes() {
2582 let config = ContainerConfig::from_uri(
2583 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2584 )
2585 .unwrap();
2586 assert_eq!(
2587 config.volumes.as_deref(),
2588 Some("./html:/usr/share/nginx/html:ro")
2589 );
2590 }
2591
2592 #[test]
2593 fn test_container_config_parses_exec_params() {
2594 let config = ContainerConfig::from_uri(
2595 "container:exec?containerId=my-app&cmd=ls /app&user=root&workdir=/tmp&detach=true",
2596 )
2597 .unwrap();
2598 assert_eq!(config.operation, "exec");
2599 assert_eq!(config.container_id.as_deref(), Some("my-app"));
2600 assert_eq!(config.cmd.as_deref(), Some("ls /app"));
2601 assert_eq!(config.user.as_deref(), Some("root"));
2602 assert_eq!(config.workdir.as_deref(), Some("/tmp"));
2603 assert!(config.detach);
2604 }
2605
2606 #[test]
2607 fn test_container_config_parses_network_create_params() {
2608 let config =
2609 ContainerConfig::from_uri("container:network-create?name=my-net&driver=bridge")
2610 .unwrap();
2611 assert_eq!(config.operation, "network-create");
2612 assert_eq!(config.name.as_deref(), Some("my-net"));
2613 assert_eq!(config.driver.as_deref(), Some("bridge"));
2614 }
2615
2616 #[test]
2617 fn test_container_config_defaults_new_fields() {
2618 let config = ContainerConfig::from_uri("container:list").unwrap();
2619 assert!(config.volumes.is_none());
2620 assert!(config.user.is_none());
2621 assert!(config.workdir.is_none());
2622 assert!(!config.detach);
2623 assert!(config.driver.is_none());
2624 assert!(!config.force);
2625 }
2626
2627 #[test]
2628 fn test_parse_volumes_bind_mount() {
2629 let config = ContainerConfig::from_uri(
2630 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro",
2631 )
2632 .unwrap();
2633 let (binds, anon) = config.parse_volumes().unwrap();
2634 assert_eq!(binds, vec!["./html:/usr/share/nginx/html:ro"]);
2635 assert!(anon.is_empty());
2636 }
2637
2638 #[test]
2639 fn test_parse_volumes_named_volume() {
2640 let config =
2641 ContainerConfig::from_uri("container:run?image=postgres&volumes=data:/var/lib/data")
2642 .unwrap();
2643 let (binds, anon) = config.parse_volumes().unwrap();
2644 assert_eq!(binds, vec!["data:/var/lib/data"]);
2645 assert!(anon.is_empty());
2646 }
2647
2648 #[test]
2649 fn test_parse_volumes_anonymous() {
2650 let config =
2651 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data").unwrap();
2652 let (binds, anon) = config.parse_volumes().unwrap();
2653 assert!(binds.is_empty());
2654 assert!(anon.contains(&"/tmp/app-data".to_string()));
2655 }
2656
2657 #[test]
2658 fn test_parse_volumes_anonymous_with_mode() {
2659 let config =
2660 ContainerConfig::from_uri("container:run?image=alpine&volumes=/tmp/app-data:ro")
2661 .unwrap();
2662 let (binds, anon) = config.parse_volumes().unwrap();
2663 assert!(binds.is_empty());
2664 assert!(anon.contains(&"/tmp/app-data".to_string()));
2665 }
2666
2667 #[test]
2668 fn test_parse_volumes_multiple() {
2669 let config = ContainerConfig::from_uri(
2670 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,data:/var/log/app",
2671 )
2672 .unwrap();
2673 let (binds, anon) = config.parse_volumes().unwrap();
2674 assert_eq!(binds.len(), 2);
2675 assert!(binds.contains(&"./html:/usr/share/nginx/html:ro".to_string()));
2676 assert!(binds.contains(&"data:/var/log/app".to_string()));
2677 assert!(anon.is_empty());
2678 }
2679
2680 #[test]
2681 fn test_parse_volumes_mixed() {
2682 let config = ContainerConfig::from_uri(
2683 "container:run?image=nginx&volumes=./html:/usr/share/nginx/html:ro,/tmp/cache",
2684 )
2685 .unwrap();
2686 let (binds, anon) = config.parse_volumes().unwrap();
2687 assert_eq!(binds.len(), 1);
2688 assert!(anon.contains(&"/tmp/cache".to_string()));
2689 }
2690
2691 #[test]
2692 fn test_parse_volumes_none() {
2693 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
2694 assert!(config.parse_volumes().is_none());
2695 }
2696
2697 #[test]
2698 fn test_parse_volumes_empty_entry_skipped() {
2699 let config = ContainerConfig::from_uri("container:run?image=nginx&volumes=,,").unwrap();
2700 assert!(config.parse_volumes().is_none());
2701 }
2702
2703 #[test]
2704 fn test_parse_volumes_rw_mode() {
2705 let config =
2706 ContainerConfig::from_uri("container:run?image=nginx&volumes=./data:/app/data:rw")
2707 .unwrap();
2708 let (binds, _) = config.parse_volumes().unwrap();
2709 assert_eq!(binds, vec!["./data:/app/data:rw"]);
2710 }
2711
2712 #[test]
2713 fn test_container_config_from_uri_parses_false_flags() {
2714 let config = ContainerConfig::from_uri(
2715 "container:logs?containerId=a&follow=false×tamps=FALSE&autoPull=false&autoRemove=False&detach=TRUE&force=true",
2716 )
2717 .unwrap();
2718 assert!(!config.follow);
2719 assert!(!config.timestamps);
2720 assert!(!config.auto_pull);
2721 assert!(!config.auto_remove);
2722 assert!(config.detach);
2723 assert!(config.force);
2724 }
2725
2726 #[test]
2727 fn test_docker_socket_path_validation() {
2728 let unix_cfg =
2729 ContainerConfig::from_uri("container:list?host=unix:///tmp/docker.sock").unwrap();
2730 assert_eq!(
2731 unix_cfg.docker_socket_path().unwrap(),
2732 "unix:///tmp/docker.sock"
2733 );
2734
2735 let npipe_cfg =
2736 ContainerConfig::from_uri("container:list?host=npipe:////./pipe/docker_engine")
2737 .unwrap();
2738 assert_eq!(
2739 npipe_cfg.docker_socket_path().unwrap(),
2740 "npipe:////./pipe/docker_engine"
2741 );
2742
2743 let plain_cfg =
2744 ContainerConfig::from_uri("container:list?host=/var/run/docker.sock").unwrap();
2745 assert_eq!(
2746 plain_cfg.docker_socket_path().unwrap(),
2747 "/var/run/docker.sock"
2748 );
2749
2750 let bad_cfg =
2751 ContainerConfig::from_uri("container:list?host=http://localhost:2375").unwrap();
2752 assert!(bad_cfg.docker_socket_path().is_err());
2753 }
2754
2755 #[test]
2756 fn test_parse_ports_invalid_and_whitespace_entries() {
2757 let cfg = ContainerConfig::from_uri("container:run?ports= , ,8080").unwrap();
2759 let err = cfg.parse_ports().unwrap_err();
2760 assert!(
2761 err.to_string().contains("malformed port mapping"),
2762 "expected malformed port error, got: {}",
2763 err
2764 );
2765
2766 let cfg =
2767 ContainerConfig::from_uri("container:run?ports= 8080:80 , 5353:53/udp ").unwrap();
2768 let (exposed, bindings) = cfg.parse_ports().unwrap();
2769 assert!(exposed.contains(&"80/tcp".to_string()));
2770 assert!(exposed.contains(&"53/udp".to_string()));
2771 assert_eq!(bindings.len(), 2);
2772 }
2773
2774 #[test]
2775 fn test_parse_ports_fail_fast_on_malformed() {
2776 let cfg = ContainerConfig::from_uri("container:run?ports=8080:80,badentry").unwrap();
2778 let err = cfg.parse_ports().unwrap_err();
2779 assert!(
2780 err.to_string()
2781 .contains("malformed port mapping 'badentry'"),
2782 "expected fail-fast on 'badentry', got: {}",
2783 err
2784 );
2785 }
2786
2787 #[test]
2788 fn test_parse_env_trims_and_filters_empty_items() {
2789 let cfg = ContainerConfig::from_uri("container:run?env= FOO=bar , ,BAZ=qux, ").unwrap();
2790 let env = cfg.parse_env().unwrap();
2791 assert_eq!(env, vec!["FOO=bar".to_string(), "BAZ=qux".to_string()]);
2792
2793 let cfg = ContainerConfig::from_uri("container:run?env= , , ").unwrap();
2794 assert!(cfg.parse_env().is_none());
2795 }
2796
2797 #[test]
2798 fn test_parse_volume_str_rejects_invalid_mode_and_accepts_mixed() {
2799 assert!(parse_volume_str("/host:/ctr:badmode").is_none());
2800 let (binds, anon) = parse_volume_str("a:/b:rw,/tmp/cache,/tmp/logs:ro").unwrap();
2801 assert!(binds.contains(&"a:/b:rw".to_string()));
2802 assert!(anon.contains(&"/tmp/cache".to_string()));
2803 assert!(anon.contains(&"/tmp/logs".to_string()));
2804 }
2805
2806 #[test]
2807 fn test_format_docker_event_variants_and_timestamp_extraction() {
2808 let mut attrs = std::collections::HashMap::new();
2809 attrs.insert("name".to_string(), "demo".to_string());
2810 attrs.insert("image".to_string(), "alpine:latest".to_string());
2811 attrs.insert("exitCode".to_string(), "137".to_string());
2812 let actor = bollard::models::EventActor {
2813 id: None,
2814 attributes: Some(attrs),
2815 };
2816
2817 let create_event = bollard::models::EventMessage {
2818 action: Some("create".to_string()),
2819 actor: Some(actor.clone()),
2820 ..Default::default()
2821 };
2822 assert_eq!(
2823 format_docker_event(&create_event),
2824 "[CREATE] Container demo (alpine:latest)"
2825 );
2826
2827 let die_event = bollard::models::EventMessage {
2828 action: Some("die".to_string()),
2829 actor: Some(actor),
2830 ..Default::default()
2831 };
2832 assert_eq!(
2833 format_docker_event(&die_event),
2834 "[DIE] Container demo (exit: 137)"
2835 );
2836
2837 let other_event = bollard::models::EventMessage {
2838 action: Some("oom".to_string()),
2839 actor: None,
2840 ..Default::default()
2841 };
2842 assert_eq!(format_docker_event(&other_event), "[OOM] Container unknown");
2843
2844 assert_eq!(
2845 extract_timestamp("2024-01-01T00:00:00Z hello"),
2846 Some("2024-01-01T00:00:00Z".to_string())
2847 );
2848 assert_eq!(extract_timestamp("hello world"), None);
2849 }
2850
2851 #[tokio::test]
2852 async fn test_run_container_with_cleanup_error_paths() {
2853 let create_fail = run_container_with_cleanup(
2854 || async { Err(CamelError::ProcessorError("create-fail".to_string())) },
2855 |_id| async move { Ok(()) },
2856 |_id| async move { Ok(()) },
2857 )
2858 .await;
2859 assert!(
2860 matches!(create_fail, Err(CamelError::ProcessorError(msg)) if msg == "create-fail")
2861 );
2862
2863 let cleanup_fail = run_container_with_cleanup(
2864 || async { Ok("cid-1".to_string()) },
2865 |_id| async move { Err(CamelError::ProcessorError("start-fail".to_string())) },
2866 |_id| async move { Err(CamelError::ProcessorError("remove-fail".to_string())) },
2867 )
2868 .await;
2869 match cleanup_fail {
2870 Err(CamelError::ProcessorError(msg)) => {
2871 assert!(msg.contains("Failed to start container"));
2872 assert!(msg.contains("Cleanup failed"));
2873 }
2874 other => panic!("unexpected result: {:?}", other),
2875 }
2876 }
2877
2878 #[tokio::test]
2879 async fn test_container_producer_network_lifecycle() {
2880 let docker = match Docker::connect_with_local_defaults() {
2881 Ok(d) => d,
2882 Err(_) => {
2883 eprintln!("Skipping test: Could not connect to Docker daemon");
2884 return;
2885 }
2886 };
2887 if docker.ping().await.is_err() {
2888 eprintln!("Skipping test: Docker daemon not responding to ping");
2889 return;
2890 }
2891
2892 let network_name = format!("camel-test-{}", std::process::id());
2893
2894 let component = ContainerComponent::new();
2895 let component_ctx = NoOpComponentContext;
2896
2897 let endpoint = component
2899 .create_endpoint(
2900 &format!("container:network-create?name={}", network_name),
2901 &component_ctx,
2902 )
2903 .unwrap();
2904 let ctx = ProducerContext::new();
2905 let mut producer = endpoint.create_producer(&ctx).unwrap();
2906
2907 let mut exchange = Exchange::new(Message::new(""));
2908 exchange.input.set_header(
2909 HEADER_ACTION,
2910 serde_json::Value::String("network-create".into()),
2911 );
2912
2913 use tower::ServiceExt;
2914 let result = producer
2915 .ready()
2916 .await
2917 .unwrap()
2918 .call(exchange)
2919 .await
2920 .expect("Network create should succeed");
2921
2922 let network_id = result
2923 .input
2924 .header(HEADER_NETWORK)
2925 .and_then(|v| v.as_str().map(|s| s.to_string()))
2926 .expect("Should have network ID");
2927
2928 assert!(!network_id.is_empty());
2929
2930 let endpoint = component
2932 .create_endpoint("container:network-list", &component_ctx)
2933 .unwrap();
2934 let mut producer = endpoint.create_producer(&ctx).unwrap();
2935
2936 let mut exchange = Exchange::new(Message::new(""));
2937 exchange.input.set_header(
2938 HEADER_ACTION,
2939 serde_json::Value::String("network-list".into()),
2940 );
2941
2942 let list_result = producer
2943 .ready()
2944 .await
2945 .unwrap()
2946 .call(exchange)
2947 .await
2948 .expect("Network list should succeed");
2949
2950 match &list_result.input.body {
2951 Body::Json(json_value) => {
2952 assert!(json_value.is_array(), "Expected JSON array");
2953 }
2954 other => panic!("Expected Body::Json, got: {:?}", other),
2955 }
2956
2957 let endpoint = component
2959 .create_endpoint(
2960 &format!("container:network-remove?network={}", network_name),
2961 &component_ctx,
2962 )
2963 .unwrap();
2964 let mut producer = endpoint.create_producer(&ctx).unwrap();
2965
2966 let mut exchange = Exchange::new(Message::new(""));
2967 exchange.input.set_header(
2968 HEADER_ACTION,
2969 serde_json::Value::String("network-remove".into()),
2970 );
2971
2972 let remove_result = producer
2973 .ready()
2974 .await
2975 .unwrap()
2976 .call(exchange)
2977 .await
2978 .expect("Network remove should succeed");
2979
2980 let action_result = remove_result
2981 .input
2982 .header(HEADER_ACTION_RESULT)
2983 .and_then(|v| v.as_str());
2984 assert_eq!(action_result, Some("success"));
2985 }
2986
2987 #[tokio::test]
2988 async fn test_logs_consumer_requires_container_id() {
2989 use tokio::sync::mpsc;
2990
2991 let mut consumer = ContainerConsumer {
2992 config: ContainerConfig::from_uri("container:logs").unwrap(),
2993 };
2994 let (tx, _rx) = mpsc::channel(4);
2995 let context = ConsumerContext::new(tx, tokio_util::sync::CancellationToken::new());
2996
2997 let err = consumer.start(context).await.unwrap_err();
2998 match err {
2999 CamelError::EndpointCreationFailed(msg) => {
3000 assert!(msg.contains("containerId is required for logs consumer"));
3001 }
3002 other => panic!("unexpected error: {:?}", other),
3003 }
3004 }
3005
3006 #[tokio::test]
3007 async fn test_events_consumer_stops_immediately_when_cancelled() {
3008 use tokio::sync::mpsc;
3009
3010 let mut consumer = ContainerConsumer {
3011 config: ContainerConfig::from_uri("container:events").unwrap(),
3012 };
3013
3014 let (tx, _rx) = mpsc::channel(4);
3015 let cancel = tokio_util::sync::CancellationToken::new();
3016 cancel.cancel();
3017 let context = ConsumerContext::new(tx, cancel);
3018
3019 let result = consumer.start(context).await;
3020 assert!(result.is_ok());
3021 }
3022
3023 #[test]
3024 fn test_global_config_constructors_and_endpoint_docker_host() {
3025 let global = ContainerGlobalConfig::new().with_docker_host("unix:///tmp/docker.sock");
3026 let mut cfg = ContainerConfig::from_uri("container:list").unwrap();
3027 cfg.apply_global_defaults(&global);
3028
3029 let endpoint = ContainerEndpoint {
3030 uri: "container:list".to_string(),
3031 config: cfg,
3032 };
3033
3034 assert_eq!(endpoint.docker_host(), Some("unix:///tmp/docker.sock"));
3035
3036 let component = ContainerComponent::with_config(global);
3037 assert_eq!(component.scheme(), "container");
3038 }
3039}