1pub mod bundle;
7
8pub use bundle::ContainerBundle;
9
10use std::collections::{HashMap, HashSet};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14use std::task::{Context, Poll};
15
16use async_trait::async_trait;
17use bollard::Docker;
18use bollard::service::{HostConfig, PortBinding};
19use camel_component_api::parse_uri;
20use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, Message};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tower::Service;
23
/// Process-wide set of container IDs recorded by `track_container`.
///
/// Entries are dropped again by `untrack_container`, and
/// `cleanup_tracked_containers` force-removes whatever is still listed.
/// The set is created lazily on first access.
static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
28
29fn track_container(id: String) {
31 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
32 tracker.insert(id);
33 }
34}
35
36fn untrack_container(id: &str) {
38 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
39 tracker.remove(id);
40 }
41}
42
43pub async fn cleanup_tracked_containers() {
45 let ids: Vec<String> = {
46 match CONTAINER_TRACKER.lock() {
47 Ok(tracker) => tracker.iter().cloned().collect(),
48 Err(_) => return,
49 }
50 };
51
52 if ids.is_empty() {
53 return;
54 }
55
56 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
57
58 let docker = match Docker::connect_with_local_defaults() {
59 Ok(d) => d,
60 Err(e) => {
61 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
62 return;
63 }
64 };
65
66 for id in ids {
67 match docker
68 .remove_container(
69 &id,
70 Some(bollard::container::RemoveContainerOptions {
71 force: true,
72 ..Default::default()
73 }),
74 )
75 .await
76 {
77 Ok(_) => {
78 tracing::debug!("Cleaned up container {}", id);
79 untrack_container(&id);
80 }
81 Err(e) => {
82 tracing::warn!("Failed to cleanup container {}: {}", id, e);
83 }
84 }
85 }
86}
87
/// Timeout value handed to `Docker::connect_with_socket` when a client is built.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

/// Header read by the producer to override the operation taken from the URI.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header read by the `run` operation to override the image taken from the URI.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header carrying a container ID: set after `run`, read by `start`/`stop`/`remove`,
/// and set on every message emitted by the logs consumer.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header set by the logs consumer to name the stream a line came from
/// (`stdout`, `stderr`, `console` or `stdin`).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header set by the logs consumer when timestamps are enabled and a
/// timestamp could be extracted from the line.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header read when resolving the container name; takes precedence over the URI `name`.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header set by the producer to `"success"` after an operation completes.
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
113
/// Component-wide settings that act as defaults for every container endpoint.
///
/// Deserializable; because of `#[serde(default)]`, missing fields fall back to
/// the values produced by the `Default` implementation.
#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
#[serde(default)]
pub struct ContainerGlobalConfig {
    /// Docker host applied to an endpoint whose URI does not set `host`.
    pub docker_host: String,
}
127
128impl Default for ContainerGlobalConfig {
129 fn default() -> Self {
130 Self {
131 docker_host: "unix:///var/run/docker.sock".to_string(),
132 }
133 }
134}
135
136impl ContainerGlobalConfig {
137 pub fn new() -> Self {
138 Self::default()
139 }
140
141 pub fn with_docker_host(mut self, v: impl Into<String>) -> Self {
142 self.docker_host = v.into();
143 self
144 }
145}
146
/// Per-endpoint settings parsed from a `container:` URI by `ContainerConfig::from_uri`.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// URI path. The producer uses it as the operation unless the action
    /// header overrides it; the consumer accepts `events` or `logs`.
    pub operation: String,
    /// `image` parameter: image used by `run` when the image header is absent.
    pub image: Option<String>,
    /// `name` parameter: container name used when the name header is absent.
    pub name: Option<String>,
    /// `host` parameter: Docker host; filled from the global config when unset.
    pub host: Option<String>,
    /// `cmd` parameter: command for `run`, split on whitespace.
    pub cmd: Option<String>,
    /// `ports` parameter: comma-separated `host:container[/protocol]` mappings.
    pub ports: Option<String>,
    /// `env` parameter: comma-separated environment entries.
    pub env: Option<String>,
    /// `network` parameter: passed to Docker as the network mode.
    pub network: Option<String>,
    /// `containerId` parameter: required by the logs consumer.
    pub container_id: Option<String>,
    /// `follow` parameter: whether the logs consumer follows the stream (default `true`).
    pub follow: bool,
    /// `timestamps` parameter: whether log lines carry timestamps (default `false`).
    pub timestamps: bool,
    /// `tail` parameter: passed to the logs request; `"all"` is used when unset.
    pub tail: Option<String>,
    /// `autoPull` parameter: pull a missing image before `run` (default `true`).
    pub auto_pull: bool,
    /// `autoRemove` parameter: passed to Docker as the container's auto-remove flag (default `true`).
    pub auto_remove: bool,
}
186
187impl ContainerConfig {
188 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
196 let parts = parse_uri(uri)?;
197 if parts.scheme != "container" {
198 return Err(CamelError::InvalidUri(format!(
199 "expected scheme 'container', got '{}'",
200 parts.scheme
201 )));
202 }
203
204 let image = parts.params.get("image").cloned();
205 let name = parts.params.get("name").cloned();
206 let cmd = parts.params.get("cmd").cloned();
207 let ports = parts.params.get("ports").cloned();
208 let env = parts.params.get("env").cloned();
209 let network = parts.params.get("network").cloned();
210 let container_id = parts.params.get("containerId").cloned();
211 let follow = parts
212 .params
213 .get("follow")
214 .map(|v| v.eq_ignore_ascii_case("true"))
215 .unwrap_or(true);
216 let timestamps = parts
217 .params
218 .get("timestamps")
219 .map(|v| v.eq_ignore_ascii_case("true"))
220 .unwrap_or(false);
221 let tail = parts.params.get("tail").cloned();
222 let auto_pull = parts
223 .params
224 .get("autoPull")
225 .map(|v| v.eq_ignore_ascii_case("true"))
226 .unwrap_or(true);
227 let auto_remove = parts
228 .params
229 .get("autoRemove")
230 .map(|v| v.eq_ignore_ascii_case("true"))
231 .unwrap_or(true);
232 let host = parts.params.get("host").cloned();
234
235 Ok(Self {
236 operation: parts.path,
237 image,
238 name,
239 host,
240 cmd,
241 ports,
242 env,
243 network,
244 container_id,
245 follow,
246 timestamps,
247 tail,
248 auto_pull,
249 auto_remove,
250 })
251 }
252
253 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
256 if self.host.is_none() {
257 self.host = Some(global.docker_host.clone());
258 }
259 }
260
261 fn docker_socket_path(&self) -> Result<&str, CamelError> {
262 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
263 "npipe:////./pipe/docker_engine"
264 } else {
265 "unix:///var/run/docker.sock"
266 });
267
268 if host.starts_with("unix://") || host.starts_with("npipe://") {
269 return Ok(host);
270 }
271
272 if host.contains("://") {
273 return Err(CamelError::ProcessorError(format!(
274 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
275 host
276 )));
277 }
278
279 Ok(host)
280 }
281
282 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
283 let socket_path = self.docker_socket_path()?;
284 Docker::connect_with_socket(
285 socket_path,
286 DOCKER_CONNECT_TIMEOUT_SECS,
287 bollard::API_DEFAULT_VERSION,
288 )
289 .map_err(|e| {
290 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
291 })
292 }
293
294 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
302 let docker = self.connect_docker_client()?;
303 docker
304 .ping()
305 .await
306 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
307 Ok(docker)
308 }
309
310 #[allow(clippy::type_complexity)]
311 fn parse_ports(
312 &self,
313 ) -> Option<(
314 HashMap<String, HashMap<(), ()>>,
315 HashMap<String, Option<Vec<PortBinding>>>,
316 )> {
317 let ports_str = self.ports.as_ref()?;
318
319 let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
320 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
321
322 for mapping in ports_str.split(',') {
323 let mapping = mapping.trim();
324 if mapping.is_empty() {
325 continue;
326 }
327
328 let (host_port, container_spec) = mapping.split_once(':')?;
329
330 let (container_port, protocol) = if container_spec.contains('/') {
331 let parts: Vec<&str> = container_spec.split('/').collect();
332 (parts[0], parts[1])
333 } else {
334 (container_spec, "tcp")
335 };
336
337 let container_key = format!("{}/{}", container_port, protocol);
338
339 exposed_ports.insert(container_key.clone(), HashMap::new());
340
341 port_bindings.insert(
342 container_key,
343 Some(vec![PortBinding {
344 host_ip: None,
345 host_port: Some(host_port.to_string()),
346 }]),
347 );
348 }
349
350 if exposed_ports.is_empty() {
351 None
352 } else {
353 Some((exposed_ports, port_bindings))
354 }
355 }
356
357 fn parse_env(&self) -> Option<Vec<String>> {
358 let env_str = self.env.as_ref()?;
359
360 let env_vars: Vec<String> = env_str
361 .split(',')
362 .map(|s| s.trim().to_string())
363 .filter(|s| !s.is_empty())
364 .collect();
365
366 if env_vars.is_empty() {
367 None
368 } else {
369 Some(env_vars)
370 }
371 }
372}
373
/// Operations the producer can perform, as parsed by `parse_producer_operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// `list`: list containers.
    List,
    /// `run`: create and start a container.
    Run,
    /// `start`: start an existing container.
    Start,
    /// `stop`: stop a container.
    Stop,
    /// `remove`: remove a container.
    Remove,
}
382
383fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
384 match operation {
385 "list" => Ok(ProducerOperation::List),
386 "run" => Ok(ProducerOperation::Run),
387 "start" => Ok(ProducerOperation::Start),
388 "stop" => Ok(ProducerOperation::Stop),
389 "remove" => Ok(ProducerOperation::Remove),
390 _ => Err(CamelError::ProcessorError(format!(
391 "Unknown container operation: {}",
392 operation
393 ))),
394 }
395}
396
397fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
398 exchange
399 .input
400 .header(HEADER_CONTAINER_NAME)
401 .and_then(|v| v.as_str().map(|s| s.to_string()))
402 .or_else(|| config.name.clone())
403}
404
405async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
406 let images = docker
407 .list_images::<&str>(None)
408 .await
409 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
410
411 Ok(images.iter().any(|img| {
412 img.repo_tags
413 .iter()
414 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
415 }))
416}
417
418async fn pull_image_with_progress(
419 docker: &Docker,
420 image: &str,
421 timeout_secs: u64,
422) -> Result<(), CamelError> {
423 use futures::StreamExt;
424
425 tracing::info!("Pulling image: {}", image);
426
427 let mut stream = docker.create_image(
428 Some(bollard::image::CreateImageOptions {
429 from_image: image,
430 ..Default::default()
431 }),
432 None,
433 None,
434 );
435
436 let start = std::time::Instant::now();
437 let mut last_progress = std::time::Instant::now();
438
439 while let Some(item) = stream.next().await {
440 if start.elapsed().as_secs() > timeout_secs {
441 return Err(CamelError::ProcessorError(format!(
442 "Image pull timeout after {}s. Try manually: docker pull {}",
443 timeout_secs, image
444 )));
445 }
446
447 match item {
448 Ok(update) => {
449 if last_progress.elapsed().as_secs() >= 2 {
451 if let Some(status) = update.status {
452 tracing::debug!("Pull progress: {}", status);
453 }
454 last_progress = std::time::Instant::now();
455 }
456 }
457 Err(e) => {
458 let err_str = e.to_string().to_lowercase();
459 if err_str.contains("unauthorized") || err_str.contains("401") {
460 return Err(CamelError::ProcessorError(format!(
461 "Authentication required for image '{}'. Configure Docker credentials: docker login",
462 image
463 )));
464 }
465 if err_str.contains("not found") || err_str.contains("404") {
466 return Err(CamelError::ProcessorError(format!(
467 "Image '{}' not found in registry. Check the image name and tag",
468 image
469 )));
470 }
471 return Err(CamelError::ProcessorError(format!(
472 "Failed to pull image '{}': {}",
473 image, e
474 )));
475 }
476 }
477 }
478
479 tracing::info!("Successfully pulled image: {}", image);
480 Ok(())
481}
482
483async fn ensure_image_available(
484 docker: &Docker,
485 image: &str,
486 auto_pull: bool,
487 timeout_secs: u64,
488) -> Result<(), CamelError> {
489 if image_exists_locally(docker, image).await? {
490 tracing::debug!("Image '{}' already available locally", image);
491 return Ok(());
492 }
493
494 if !auto_pull {
495 return Err(CamelError::ProcessorError(format!(
496 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
497 image, image
498 )));
499 }
500
501 pull_image_with_progress(docker, image, timeout_secs).await
502}
503
504fn format_docker_event(event: &bollard::models::EventMessage) -> String {
505 let action = event.action.as_deref().unwrap_or("unknown");
506 let actor = event.actor.as_ref();
507
508 let container_name = actor
509 .and_then(|a| a.attributes.as_ref())
510 .and_then(|attrs| attrs.get("name"))
511 .map(|s| s.as_str())
512 .unwrap_or("unknown");
513
514 let image = actor
515 .and_then(|a| a.attributes.as_ref())
516 .and_then(|attrs| attrs.get("image"))
517 .map(|s| s.as_str())
518 .unwrap_or("");
519
520 let exit_code = actor
521 .and_then(|a| a.attributes.as_ref())
522 .and_then(|attrs| attrs.get("exitCode"))
523 .map(|s| s.as_str());
524
525 match action {
526 "create" => {
527 if image.is_empty() {
528 format!("[CREATE] Container {}", container_name)
529 } else {
530 format!("[CREATE] Container {} ({})", container_name, image)
531 }
532 }
533 "start" => format!("[START] Container {}", container_name),
534 "die" => {
535 if let Some(code) = exit_code {
536 format!("[DIE] Container {} (exit: {})", container_name, code)
537 } else {
538 format!("[DIE] Container {}", container_name)
539 }
540 }
541 "destroy" => format!("[DESTROY] Container {}", container_name),
542 "stop" => format!("[STOP] Container {}", container_name),
543 "pause" => format!("[PAUSE] Container {}", container_name),
544 "unpause" => format!("[UNPAUSE] Container {}", container_name),
545 "restart" => format!("[RESTART] Container {}", container_name),
546 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
547 }
548}
549
550async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
551 create: CreateFn,
552 start: StartFn,
553 remove: RemoveFn,
554) -> Result<String, CamelError>
555where
556 CreateFn: FnOnce() -> CreateFut,
557 CreateFut: Future<Output = Result<String, CamelError>>,
558 StartFn: FnOnce(String) -> StartFut,
559 StartFut: Future<Output = Result<(), CamelError>>,
560 RemoveFn: FnOnce(String) -> RemoveFut,
561 RemoveFut: Future<Output = Result<(), CamelError>>,
562{
563 let container_id = create().await?;
564 if let Err(start_err) = start(container_id.clone()).await {
565 if let Err(remove_err) = remove(container_id.clone()).await {
566 return Err(CamelError::ProcessorError(format!(
567 "Failed to start container: {}. Cleanup failed: {}",
568 start_err, remove_err
569 )));
570 }
571 return Err(start_err);
572 }
573
574 Ok(container_id)
575}
576
/// Producer that executes container operations for each exchange it receives.
#[derive(Clone)]
pub struct ContainerProducer {
    /// Endpoint settings supplying defaults when exchange headers are absent.
    config: ContainerConfig,
    /// Docker client, cloned into the future built for every call.
    docker: Docker,
}
586
587impl Service<Exchange> for ContainerProducer {
588 type Response = Exchange;
589 type Error = CamelError;
590 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
591
592 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
593 Poll::Ready(Ok(()))
594 }
595
596 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
597 let config = self.config.clone();
598 let docker = self.docker.clone();
599 Box::pin(async move {
600 let operation_name = exchange
602 .input
603 .header(HEADER_ACTION)
604 .and_then(|v| v.as_str().map(|s| s.to_string()))
605 .unwrap_or_else(|| config.operation.clone());
606
607 let operation = parse_producer_operation(&operation_name)?;
608
609 match operation {
611 ProducerOperation::List => {
612 let containers = docker.list_containers::<String>(None).await.map_err(|e| {
613 CamelError::ProcessorError(format!("Failed to list containers: {}", e))
614 })?;
615
616 let json_value = serde_json::to_value(&containers).map_err(|e| {
617 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
618 })?;
619
620 exchange.input.body = Body::Json(json_value);
621 exchange.input.set_header(
622 HEADER_ACTION_RESULT,
623 serde_json::Value::String("success".to_string()),
624 );
625 }
626 ProducerOperation::Run => {
627 let image = exchange
629 .input
630 .header(HEADER_IMAGE)
631 .and_then(|v| v.as_str().map(|s| s.to_string()))
632 .or(config.image.clone())
633 .ok_or_else(|| {
634 CamelError::ProcessorError(
635 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
636 )
637 })?;
638
639 let pull_timeout = 300; ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
642 .await
643 .map_err(|e| {
644 CamelError::ProcessorError(format!(
645 "Image '{}' not available: {}",
646 image, e
647 ))
648 })?;
649
650 let container_name = resolve_container_name(&exchange, &config);
651 let container_name_ref = container_name.as_deref().unwrap_or("");
652 let cmd_parts: Option<Vec<String>> = config
653 .cmd
654 .as_ref()
655 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
656 let auto_remove = config.auto_remove;
657 let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
658 let env_vars = config.parse_env();
659 let network_mode = config.network.clone();
660
661 let docker_create = docker.clone();
662 let docker_start = docker.clone();
663 let docker_remove = docker.clone();
664
665 let container_id = run_container_with_cleanup(
666 move || async move {
667 let create_options = bollard::container::CreateContainerOptions {
668 name: container_name_ref,
669 ..Default::default()
670 };
671 let container_config = bollard::container::Config::<String> {
672 image: Some(image.clone()),
673 cmd: cmd_parts,
674 env: env_vars,
675 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
676 host_config: Some(HostConfig {
677 auto_remove: Some(auto_remove),
678 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
679 network_mode,
680 ..Default::default()
681 }),
682 ..Default::default()
683 };
684
685 let create_response = docker_create
686 .create_container(Some(create_options), container_config)
687 .await
688 .map_err(|e| {
689 let err_str = e.to_string().to_lowercase();
690 if err_str.contains("409") || err_str.contains("conflict") {
691 CamelError::ProcessorError(format!(
692 "Container name '{}' already exists. Use a unique name or remove the existing container first",
693 container_name_ref
694 ))
695 } else {
696 CamelError::ProcessorError(format!(
697 "Failed to create container: {}",
698 e
699 ))
700 }
701 })?;
702
703 Ok(create_response.id)
704 },
705 move |container_id| async move {
706 docker_start
707 .start_container::<String>(&container_id, None)
708 .await
709 .map_err(|e| {
710 CamelError::ProcessorError(format!(
711 "Failed to start container: {}",
712 e
713 ))
714 })
715 },
716 move |container_id| async move {
717 docker_remove
718 .remove_container(&container_id, None)
719 .await
720 .map_err(|e| {
721 CamelError::ProcessorError(format!(
722 "Failed to remove container after start failure: {}",
723 e
724 ))
725 })
726 },
727 )
728 .await?;
729
730 track_container(container_id.clone());
731
732 exchange
733 .input
734 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
735 exchange.input.set_header(
736 HEADER_ACTION_RESULT,
737 serde_json::Value::String("success".to_string()),
738 );
739 }
740 ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
741 let container_id = exchange
743 .input
744 .header(HEADER_CONTAINER_ID)
745 .and_then(|v| v.as_str().map(|s| s.to_string()))
746 .ok_or_else(|| {
747 CamelError::ProcessorError(format!(
748 "{} header is required for {} operation",
749 HEADER_CONTAINER_ID, operation_name
750 ))
751 })?;
752
753 match operation {
754 ProducerOperation::Start => {
755 docker
756 .start_container::<String>(&container_id, None)
757 .await
758 .map_err(|e| {
759 CamelError::ProcessorError(format!(
760 "Failed to start container: {}",
761 e
762 ))
763 })?;
764 }
765 ProducerOperation::Stop => {
766 docker
767 .stop_container(&container_id, None)
768 .await
769 .map_err(|e| {
770 CamelError::ProcessorError(format!(
771 "Failed to stop container: {}",
772 e
773 ))
774 })?;
775 }
776 ProducerOperation::Remove => {
777 docker
778 .remove_container(&container_id, None)
779 .await
780 .map_err(|e| {
781 CamelError::ProcessorError(format!(
782 "Failed to remove container: {}",
783 e
784 ))
785 })?;
786 untrack_container(&container_id);
787 }
788 _ => {}
789 }
790
791 exchange.input.set_header(
793 HEADER_ACTION_RESULT,
794 serde_json::Value::String("success".to_string()),
795 );
796 }
797 }
798
799 Ok(exchange)
800 })
801 }
802}
803
/// Consumer that turns Docker events or container log lines into exchanges.
pub struct ContainerConsumer {
    /// Endpoint settings; `operation` selects between `events` and `logs`.
    config: ContainerConfig,
}
811
812#[async_trait]
813impl Consumer for ContainerConsumer {
814 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
815 match self.config.operation.as_str() {
816 "events" => self.start_events_consumer(context).await,
817 "logs" => self.start_logs_consumer(context).await,
818 _ => Err(CamelError::EndpointCreationFailed(format!(
819 "Consumer only supports 'events' or 'logs' operations, got '{}'",
820 self.config.operation
821 ))),
822 }
823 }
824
825 async fn stop(&mut self) -> Result<(), CamelError> {
826 Ok(())
827 }
828
829 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
830 camel_component_api::ConcurrencyModel::Concurrent { max: None }
831 }
832}
833
834impl ContainerConsumer {
    /// Streams Docker events into the route until the context is cancelled.
    ///
    /// Each event is formatted with `format_docker_event` and sent as a text
    /// message. Connection failures are retried after 5 seconds. A stream
    /// error, a stream end or a failed send triggers a reconnect after
    /// 1 second. Every wait is raced against cancellation, and cancellation
    /// returns `Ok(())`.
    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        // Outer loop: one iteration per (re)connection attempt.
        loop {
            if context.is_cancelled() {
                tracing::info!("Container events consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Back off before retrying, but stop immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container events consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            let mut event_stream = docker.events::<String>(None);

            // Inner loop: forward events until the stream breaks or a send fails.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container events consumer shutting down");
                        return Ok(());
                    }

                    msg = event_stream.next() => {
                        match msg {
                            Some(Ok(event)) => {
                                let formatted = format_docker_event(&event);
                                let message = Message::new(Body::Text(formatted));
                                let exchange = Exchange::new(message);

                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                tracing::info!("Docker event stream ended. Reconnecting...");
                                break;
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting, still honouring cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container events consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }
905
    /// Streams the logs of the configured container into the route.
    ///
    /// Each non-empty log chunk becomes a text message carrying the
    /// `HEADER_CONTAINER_ID` and `HEADER_LOG_STREAM` headers, plus
    /// `HEADER_LOG_TIMESTAMP` when timestamps are enabled and one could be
    /// extracted. Connection failures are retried after 5 seconds. A stream
    /// error or a failed send reconnects after 1 second. When the stream
    /// ends, the consumer reconnects if `follow` is set and returns otherwise.
    /// Cancellation returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::EndpointCreationFailed` when `containerId` is not
    /// configured.
    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        let container_id = self.config.container_id.clone().ok_or_else(|| {
            CamelError::EndpointCreationFailed(
                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
                    .to_string(),
            )
        })?;

        // Outer loop: one iteration per (re)connection attempt.
        loop {
            if context.is_cancelled() {
                tracing::info!("Container logs consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Back off before retrying, but stop immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container logs consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // NOTE(review): the same options are rebuilt on every reconnect, so
            // with the default tail of "all" already-delivered lines are
            // presumably replayed after a reconnect — confirm.
            let tail = self
                .config
                .tail
                .clone()
                .unwrap_or_else(|| "all".to_string());

            let options = bollard::container::LogsOptions::<String> {
                follow: self.config.follow,
                stdout: true,
                stderr: true,
                timestamps: self.config.timestamps,
                tail,
                ..Default::default()
            };

            let mut log_stream = docker.logs(&container_id, Some(options));
            let container_id_header = container_id.clone();

            // Inner loop: forward log output until the stream breaks or a send fails.
            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container logs consumer shutting down");
                        return Ok(());
                    }

                    msg = log_stream.next() => {
                        match msg {
                            Some(Ok(log_output)) => {
                                // Label the chunk with its stream and decode it leniently.
                                let (stream_type, content) = match log_output {
                                    bollard::container::LogOutput::StdOut { message } => {
                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdErr { message } => {
                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::Console { message } => {
                                        ("console", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdIn { message } => {
                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
                                    }
                                };

                                // Chunks that are empty after trimming are not forwarded.
                                let content = content.trim_end();
                                if content.is_empty() {
                                    continue;
                                }

                                let mut message = Message::new(Body::Text(content.to_string()));
                                message.set_header(
                                    HEADER_CONTAINER_ID,
                                    serde_json::Value::String(container_id_header.clone()),
                                );
                                message.set_header(
                                    HEADER_LOG_STREAM,
                                    serde_json::Value::String(stream_type.to_string()),
                                );

                                // The body keeps the timestamp prefix; the header only copies it.
                                if self.config.timestamps
                                    && let Some(ts) = extract_timestamp(content) {
                                        message.set_header(
                                            HEADER_LOG_TIMESTAMP,
                                            serde_json::Value::String(ts),
                                        );
                                    }

                                let exchange = Exchange::new(message);

                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send log exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                if self.config.follow {
                                    tracing::info!("Docker log stream ended. Reconnecting...");
                                    break;
                                } else {
                                    tracing::info!("Container logs consumer finished (follow=false)");
                                    return Ok(());
                                }
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting, still honouring cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container logs consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }
1040}
1041
/// Returns the leading token of `log_line` when it looks like a timestamp.
///
/// The token is everything before the first space; it qualifies when it
/// contains a `T`. Lines without a space yield `None`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (head, _rest) = log_line.split_once(' ')?;
    head.contains('T').then(|| head.to_owned())
}
1050
/// Component for the `container` scheme; creates `ContainerEndpoint`s.
pub struct ContainerComponent {
    /// Optional global settings applied as defaults to every created endpoint.
    config: Option<ContainerGlobalConfig>,
}
1061
1062impl ContainerComponent {
1063 pub fn new() -> Self {
1065 Self { config: None }
1066 }
1067
1068 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1070 Self {
1071 config: Some(config),
1072 }
1073 }
1074
1075 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1077 Self { config }
1078 }
1079}
1080
1081impl Default for ContainerComponent {
1082 fn default() -> Self {
1083 Self::new()
1084 }
1085}
1086
1087impl Component for ContainerComponent {
1088 fn scheme(&self) -> &str {
1089 "container"
1090 }
1091
1092 fn create_endpoint(
1093 &self,
1094 uri: &str,
1095 _ctx: &dyn camel_component_api::ComponentContext,
1096 ) -> Result<Box<dyn Endpoint>, CamelError> {
1097 let mut config = ContainerConfig::from_uri(uri)?;
1098 if let Some(ref global) = self.config {
1100 config.apply_global_defaults(global);
1101 }
1102 Ok(Box::new(ContainerEndpoint {
1103 uri: uri.to_string(),
1104 config,
1105 }))
1106 }
1107}
1108
/// Endpoint created for one `container:` URI.
pub struct ContainerEndpoint {
    /// The URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Settings parsed from the URI, with global defaults already applied.
    config: ContainerConfig,
}
1117
1118impl ContainerEndpoint {
1119 pub fn docker_host(&self) -> Option<&str> {
1122 self.config.host.as_deref()
1123 }
1124}
1125
1126impl Endpoint for ContainerEndpoint {
1127 fn uri(&self) -> &str {
1128 &self.uri
1129 }
1130
1131 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1132 Ok(Box::new(ContainerConsumer {
1133 config: self.config.clone(),
1134 }))
1135 }
1136
1137 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1138 let docker = self.config.connect_docker_client()?;
1139 Ok(BoxProcessor::new(ContainerProducer {
1140 config: self.config.clone(),
1141 docker,
1142 }))
1143 }
1144}
1145
1146#[cfg(test)]
1147mod tests {
1148 use super::*;
1149 use camel_component_api::NoOpComponentContext;
1150
1151 #[test]
1152 fn test_container_config() {
1153 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1154 assert_eq!(config.operation, "run");
1155 assert_eq!(config.image.as_deref(), Some("alpine"));
1156 assert!(config.host.is_none());
1158 }
1159
1160 #[test]
1161 fn test_global_config_applied_to_endpoint() {
1162 let global =
1165 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1166 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1167 assert!(
1168 config.host.is_none(),
1169 "URI without ?host= should leave host as None"
1170 );
1171 config.apply_global_defaults(&global);
1172 assert_eq!(
1173 config.host.as_deref(),
1174 Some("unix:///custom/docker.sock"),
1175 "global docker_host must be applied when URI did not set host"
1176 );
1177 }
1178
1179 #[test]
1180 fn test_uri_param_wins_over_global_config() {
1181 let global =
1183 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1184 let mut config =
1185 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1186 .unwrap();
1187 assert_eq!(
1188 config.host.as_deref(),
1189 Some("unix:///override.sock"),
1190 "URI-set host should be parsed correctly"
1191 );
1192 config.apply_global_defaults(&global);
1193 assert_eq!(
1194 config.host.as_deref(),
1195 Some("unix:///override.sock"),
1196 "global config must NOT override a host already set by URI"
1197 );
1198 }
1199
1200 #[test]
1201 fn test_container_config_parses_name() {
1202 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1203 assert_eq!(config.name.as_deref(), Some("my-container"));
1204 }
1205
1206 #[test]
1207 fn test_parse_producer_operation_known() {
1208 assert_eq!(
1209 parse_producer_operation("list").unwrap(),
1210 ProducerOperation::List
1211 );
1212 assert_eq!(
1213 parse_producer_operation("run").unwrap(),
1214 ProducerOperation::Run
1215 );
1216 assert_eq!(
1217 parse_producer_operation("start").unwrap(),
1218 ProducerOperation::Start
1219 );
1220 assert_eq!(
1221 parse_producer_operation("stop").unwrap(),
1222 ProducerOperation::Stop
1223 );
1224 assert_eq!(
1225 parse_producer_operation("remove").unwrap(),
1226 ProducerOperation::Remove
1227 );
1228 }
1229
1230 #[test]
1231 fn test_parse_producer_operation_unknown() {
1232 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1233 match err {
1234 CamelError::ProcessorError(msg) => {
1235 assert!(
1236 msg.contains("Unknown container operation"),
1237 "Unexpected error message: {}",
1238 msg
1239 );
1240 }
1241 _ => panic!("Expected ProcessorError for unknown operation"),
1242 }
1243 }
1244
1245 #[test]
1246 fn test_resolve_container_name_header_overrides_config() {
1247 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1248 let mut exchange = Exchange::new(Message::new(""));
1249 exchange.input.set_header(
1250 HEADER_CONTAINER_NAME,
1251 serde_json::Value::String("header-name".to_string()),
1252 );
1253
1254 let resolved = resolve_container_name(&exchange, &config);
1255 assert_eq!(resolved.as_deref(), Some("header-name"));
1256 }
1257
1258 #[test]
1259 fn test_container_config_rejects_tcp_host() {
1260 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1261 let err = config.connect_docker_client().unwrap_err();
1262 match err {
1263 CamelError::ProcessorError(msg) => {
1264 assert!(
1265 msg.to_lowercase().contains("tcp"),
1266 "Expected TCP scheme error, got: {}",
1267 msg
1268 );
1269 }
1270 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1271 }
1272 }
1273
1274 #[tokio::test]
1275 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1276 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1277 let remove_called_clone = remove_called.clone();
1278
1279 let result = run_container_with_cleanup(
1280 || async { Ok("container-123".to_string()) },
1281 |_id| async move {
1282 Err(CamelError::ProcessorError(
1283 "Failed to start container".to_string(),
1284 ))
1285 },
1286 move |_id| {
1287 let remove_called_inner = remove_called_clone.clone();
1288 async move {
1289 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1290 Ok(())
1291 }
1292 },
1293 )
1294 .await;
1295
1296 assert!(result.is_err(), "Expected start failure to bubble up");
1297 assert!(
1298 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1299 "Expected cleanup to remove container"
1300 );
1301 }
1302
/// The component advertises the `container` scheme and hands back an endpoint
/// that reports the URI it was created from.
#[test]
fn test_container_component_creates_endpoint() {
    let uri = "container:run?image=alpine";

    let component = ContainerComponent::new();
    assert_eq!(component.scheme(), "container");

    let component_ctx = NoOpComponentContext;
    let endpoint = component.create_endpoint(uri, &component_ctx).unwrap();
    assert_eq!(endpoint.uri(), uri);
}
1313
/// The raw `ports` query value is stored verbatim on the config.
#[test]
fn test_container_config_parses_ports() {
    let uri = "container:run?image=nginx&ports=8080:80,8443:443";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let ports = parsed.ports.as_deref();
    assert_eq!(ports, Some("8080:80,8443:443"));
}
1320
/// The raw `env` query value is stored verbatim on the config.
#[test]
fn test_container_config_parses_env() {
    let uri = "container:run?image=nginx&env=FOO=bar,BAZ=qux";
    let parsed = ContainerConfig::from_uri(uri).unwrap();

    let env = parsed.env.as_deref();
    assert_eq!(env, Some("FOO=bar,BAZ=qux"));
}
1327
/// All `logs` options (`containerId`, `follow`, `timestamps`, `tail`) are
/// picked up from the endpoint URI.
#[test]
fn test_container_config_parses_logs_options() {
    // The query separator before `timestamps` must be a literal `&`; a
    // mis-encoded `×tamps` would drop the option and leave `timestamps` unset.
    let config = ContainerConfig::from_uri(
        "container:logs?containerId=my-app&follow=true&timestamps=true&tail=100",
    )
    .unwrap();

    assert_eq!(config.operation, "logs");
    assert_eq!(config.container_id.as_deref(), Some("my-app"));
    assert!(config.follow);
    assert!(config.timestamps);
    assert_eq!(config.tail.as_deref(), Some("100"));
}
1340
/// With only `containerId` given, the `logs` options fall back to their
/// defaults: following enabled, timestamps disabled, no tail limit.
#[test]
fn test_container_config_logs_defaults() {
    let defaults = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();

    let follows = defaults.follow;
    assert!(follows);

    let shows_timestamps = defaults.timestamps;
    assert!(!shows_timestamps);

    let has_tail = defaults.tail.is_some();
    assert!(!has_tail);
}
1348
/// A single `host:container` mapping yields one exposed port and one binding,
/// both keyed by `<container-port>/tcp`.
#[test]
fn test_parse_ports_single() {
    let key = "80/tcp";
    let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();

    let (exposed, bindings) = config.parse_ports().unwrap();
    assert!(exposed.contains_key(key));
    assert!(bindings.contains_key(key));

    let host_bindings = bindings
        .get(key)
        .and_then(|slot| slot.as_ref())
        .unwrap();
    assert_eq!(host_bindings.len(), 1);

    let first = &host_bindings[0];
    assert_eq!(first.host_port, Some("8080".to_string()));
}
1361
/// Comma-separated mappings each produce their own exposed port and binding.
#[test]
fn test_parse_ports_multiple() {
    let uri = "container:run?image=nginx&ports=8080:80,8443:443";
    let config = ContainerConfig::from_uri(uri).unwrap();

    let (exposed, bindings) = config.parse_ports().unwrap();

    for key in ["80/tcp", "443/tcp"] {
        assert!(exposed.contains_key(key));
    }
    assert_eq!(bindings.len(), 2);
}
1372
/// An explicit `/tcp` or `/udp` suffix on the container port is preserved in
/// the exposed-port key.
#[test]
fn test_parse_ports_with_protocol() {
    let uri = "container:run?image=nginx&ports=8080:80/tcp,5353:53/udp";
    let config = ContainerConfig::from_uri(uri).unwrap();

    let (exposed, _bindings) = config.parse_ports().unwrap();

    for key in ["80/tcp", "53/udp"] {
        assert!(exposed.contains_key(key));
    }
}
1383
/// Without a `ports` option there is nothing to parse.
#[test]
fn test_parse_ports_none() {
    let uri = "container:run?image=nginx";
    let parsed_ports = ContainerConfig::from_uri(uri).unwrap().parse_ports();

    assert!(parsed_ports.is_none());
}
1389
/// A single `KEY=value` pair is returned as a one-element list.
#[test]
fn test_parse_env_single() {
    let uri = "container:run?image=nginx&env=FOO=bar";
    let config = ContainerConfig::from_uri(uri).unwrap();

    let vars = config.parse_env().unwrap();

    assert_eq!(vars.len(), 1);
    assert_eq!(vars[0], "FOO=bar");
}
1398
/// Comma-separated `KEY=value` pairs are split into individual entries.
#[test]
fn test_parse_env_multiple() {
    let uri = "container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123";
    let config = ContainerConfig::from_uri(uri).unwrap();

    let vars = config.parse_env().unwrap();
    assert_eq!(vars.len(), 3);

    for expected in ["FOO=bar", "BAZ=qux", "NUM=123"] {
        assert!(vars.iter().any(|entry| entry.as_str() == expected));
    }
}
1411
/// Without an `env` option there is nothing to parse.
#[test]
fn test_parse_env_none() {
    let uri = "container:run?image=nginx";
    let parsed_env = ContainerConfig::from_uri(uri).unwrap().parse_env();

    assert!(parsed_env.is_none());
}
1417
1418 use camel_component_api::Message;
1419 use std::sync::Arc;
1420
1421 #[tokio::test]
1422 async fn test_container_producer_resolves_operation_from_header() {
1423 let docker = match Docker::connect_with_local_defaults() {
1425 Ok(d) => d,
1426 Err(_) => {
1427 eprintln!("Skipping test: Could not connect to Docker daemon");
1428 return;
1429 }
1430 };
1431
1432 if docker.ping().await.is_err() {
1433 eprintln!("Skipping test: Docker daemon not responding to ping");
1434 return;
1435 }
1436
1437 let component = ContainerComponent::new();
1438 let ctx = NoOpComponentContext;
1439 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1440
1441 let ctx = ProducerContext::new();
1442 let mut producer = endpoint.create_producer(&ctx).unwrap();
1443
1444 let mut exchange = Exchange::new(Message::new(""));
1445 exchange
1446 .input
1447 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1448
1449 use tower::ServiceExt;
1450 let result = producer
1451 .ready()
1452 .await
1453 .unwrap()
1454 .call(exchange)
1455 .await
1456 .unwrap();
1457
1458 assert_eq!(
1460 result
1461 .input
1462 .header(HEADER_ACTION_RESULT)
1463 .map(|v| v.as_str().unwrap()),
1464 Some("success")
1465 );
1466 }
1467
1468 #[tokio::test]
1469 async fn test_container_producer_connection_error_on_invalid_host() {
1470 let component = ContainerComponent::new();
1472 let ctx = NoOpComponentContext;
1473 let endpoint = component
1474 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock", &ctx)
1475 .unwrap();
1476
1477 let ctx = ProducerContext::new();
1478 let result = endpoint.create_producer(&ctx);
1479
1480 assert!(
1482 result.is_err(),
1483 "Expected error when connecting to invalid host"
1484 );
1485 let err = result.unwrap_err();
1486 match &err {
1487 CamelError::ProcessorError(msg) => {
1488 assert!(
1489 msg.to_lowercase().contains("connection")
1490 || msg.to_lowercase().contains("connect")
1491 || msg.to_lowercase().contains("socket")
1492 || msg.contains("docker"),
1493 "Error message should indicate connection failure, got: {}",
1494 msg
1495 );
1496 }
1497 _ => panic!("Expected ProcessorError, got: {:?}", err),
1498 }
1499 }
1500
1501 #[tokio::test]
1503 async fn test_container_producer_lifecycle_operations_missing_id() {
1504 let docker = match Docker::connect_with_local_defaults() {
1506 Ok(d) => d,
1507 Err(_) => {
1508 eprintln!("Skipping test: Could not connect to Docker daemon");
1509 return;
1510 }
1511 };
1512
1513 if docker.ping().await.is_err() {
1514 eprintln!("Skipping test: Docker daemon not responding to ping");
1515 return;
1516 }
1517
1518 let component = ContainerComponent::new();
1519 let ctx = NoOpComponentContext;
1520 let endpoint = component.create_endpoint("container:start", &ctx).unwrap();
1521 let ctx = ProducerContext::new();
1522 let mut producer = endpoint.create_producer(&ctx).unwrap();
1523
1524 for operation in ["start", "stop", "remove"] {
1526 let mut exchange = Exchange::new(Message::new(""));
1527 exchange.input.set_header(
1528 HEADER_ACTION,
1529 serde_json::Value::String(operation.to_string()),
1530 );
1531 use tower::ServiceExt;
1534 let result = producer.ready().await.unwrap().call(exchange).await;
1535
1536 assert!(
1537 result.is_err(),
1538 "Expected error for {} operation without CamelContainerId",
1539 operation
1540 );
1541 let err = result.unwrap_err();
1542 match &err {
1543 CamelError::ProcessorError(msg) => {
1544 assert!(
1545 msg.contains(HEADER_CONTAINER_ID),
1546 "Error message should mention {}, got: {}",
1547 HEADER_CONTAINER_ID,
1548 msg
1549 );
1550 }
1551 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1552 }
1553 }
1554 }
1555
1556 #[tokio::test]
1558 async fn test_container_producer_stop_nonexistent() {
1559 let docker = match Docker::connect_with_local_defaults() {
1561 Ok(d) => d,
1562 Err(_) => {
1563 eprintln!("Skipping test: Could not connect to Docker daemon");
1564 return;
1565 }
1566 };
1567
1568 if docker.ping().await.is_err() {
1569 eprintln!("Skipping test: Docker daemon not responding to ping");
1570 return;
1571 }
1572
1573 let component = ContainerComponent::new();
1574 let ctx = NoOpComponentContext;
1575 let endpoint = component.create_endpoint("container:stop", &ctx).unwrap();
1576 let ctx = ProducerContext::new();
1577 let mut producer = endpoint.create_producer(&ctx).unwrap();
1578
1579 let mut exchange = Exchange::new(Message::new(""));
1580 exchange
1581 .input
1582 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1583 exchange.input.set_header(
1584 HEADER_CONTAINER_ID,
1585 serde_json::Value::String("nonexistent-container-123".into()),
1586 );
1587
1588 use tower::ServiceExt;
1589 let result = producer.ready().await.unwrap().call(exchange).await;
1590
1591 assert!(
1592 result.is_err(),
1593 "Expected error when stopping nonexistent container"
1594 );
1595 let err = result.unwrap_err();
1596 match &err {
1597 CamelError::ProcessorError(msg) => {
1598 assert!(
1600 msg.to_lowercase().contains("no such container")
1601 || msg.to_lowercase().contains("not found")
1602 || msg.contains("404"),
1603 "Error message should indicate container not found, got: {}",
1604 msg
1605 );
1606 }
1607 _ => panic!("Expected ProcessorError, got: {:?}", err),
1608 }
1609 }
1610
1611 #[tokio::test]
1613 async fn test_container_producer_run_missing_image() {
1614 let docker = match Docker::connect_with_local_defaults() {
1616 Ok(d) => d,
1617 Err(_) => {
1618 eprintln!("Skipping test: Could not connect to Docker daemon");
1619 return;
1620 }
1621 };
1622
1623 if docker.ping().await.is_err() {
1624 eprintln!("Skipping test: Docker daemon not responding to ping");
1625 return;
1626 }
1627
1628 let component = ContainerComponent::new();
1630 let ctx = NoOpComponentContext;
1631 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1632 let ctx = ProducerContext::new();
1633 let mut producer = endpoint.create_producer(&ctx).unwrap();
1634
1635 let mut exchange = Exchange::new(Message::new(""));
1636 exchange
1637 .input
1638 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1639 use tower::ServiceExt;
1642 let result = producer.ready().await.unwrap().call(exchange).await;
1643
1644 assert!(
1645 result.is_err(),
1646 "Expected error for run operation without image"
1647 );
1648 let err = result.unwrap_err();
1649 match &err {
1650 CamelError::ProcessorError(msg) => {
1651 assert!(
1652 msg.to_lowercase().contains("image"),
1653 "Error message should mention 'image', got: {}",
1654 msg
1655 );
1656 }
1657 _ => panic!("Expected ProcessorError, got: {:?}", err),
1658 }
1659 }
1660
1661 #[tokio::test]
1663 async fn test_container_producer_run_image_from_header() {
1664 let docker = match Docker::connect_with_local_defaults() {
1666 Ok(d) => d,
1667 Err(_) => {
1668 eprintln!("Skipping test: Could not connect to Docker daemon");
1669 return;
1670 }
1671 };
1672
1673 if docker.ping().await.is_err() {
1674 eprintln!("Skipping test: Docker daemon not responding to ping");
1675 return;
1676 }
1677
1678 let component = ContainerComponent::new();
1680 let ctx = NoOpComponentContext;
1681 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1682 let ctx = ProducerContext::new();
1683 let mut producer = endpoint.create_producer(&ctx).unwrap();
1684
1685 let mut exchange = Exchange::new(Message::new(""));
1686 exchange
1687 .input
1688 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1689 exchange.input.set_header(
1691 HEADER_IMAGE,
1692 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1693 );
1694
1695 use tower::ServiceExt;
1696 let result = producer.ready().await.unwrap().call(exchange).await;
1697
1698 assert!(
1700 result.is_err(),
1701 "Expected error when running container with nonexistent image"
1702 );
1703 let err = result.unwrap_err();
1704 match &err {
1705 CamelError::ProcessorError(msg) => {
1706 assert!(
1708 msg.to_lowercase().contains("no such image")
1709 || msg.to_lowercase().contains("not found")
1710 || msg.to_lowercase().contains("image")
1711 || msg.to_lowercase().contains("pull")
1712 || msg.contains("404"),
1713 "Error message should indicate image issue, got: {}",
1714 msg
1715 );
1716 }
1717 _ => panic!("Expected ProcessorError, got: {:?}", err),
1718 }
1719 }
1720
1721 #[tokio::test]
1724 async fn test_container_producer_run_alpine_container() {
1725 let docker = match Docker::connect_with_local_defaults() {
1726 Ok(d) => d,
1727 Err(_) => {
1728 eprintln!("Skipping test: Could not connect to Docker daemon");
1729 return;
1730 }
1731 };
1732
1733 if docker.ping().await.is_err() {
1734 eprintln!("Skipping test: Docker daemon not responding to ping");
1735 return;
1736 }
1737
1738 let images = docker.list_images::<&str>(None).await.unwrap();
1740 let has_alpine = images
1741 .iter()
1742 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1743
1744 if !has_alpine {
1745 eprintln!("Pulling alpine:latest image...");
1746 let mut stream = docker.create_image(
1747 Some(bollard::image::CreateImageOptions {
1748 from_image: "alpine:latest",
1749 ..Default::default()
1750 }),
1751 None,
1752 None,
1753 );
1754
1755 use futures::StreamExt;
1756 while let Some(_item) = stream.next().await {
1757 }
1759 eprintln!("Image pulled successfully");
1760 }
1761
1762 let component = ContainerComponent::new();
1764 let ctx = NoOpComponentContext;
1765 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1766 let ctx = ProducerContext::new();
1767 let mut producer = endpoint.create_producer(&ctx).unwrap();
1768
1769 let timestamp = std::time::SystemTime::now()
1771 .duration_since(std::time::UNIX_EPOCH)
1772 .unwrap()
1773 .as_millis();
1774 let container_name = format!("test-rust-camel-{}", timestamp);
1775 let mut exchange = Exchange::new(Message::new(""));
1776 exchange.input.set_header(
1777 HEADER_IMAGE,
1778 serde_json::Value::String("alpine:latest".into()),
1779 );
1780 exchange.input.set_header(
1781 HEADER_CONTAINER_NAME,
1782 serde_json::Value::String(container_name.clone()),
1783 );
1784
1785 use tower::ServiceExt;
1786 let result = producer
1787 .ready()
1788 .await
1789 .unwrap()
1790 .call(exchange)
1791 .await
1792 .expect("Container run should succeed");
1793
1794 let container_id = result
1796 .input
1797 .header(HEADER_CONTAINER_ID)
1798 .and_then(|v| v.as_str().map(|s| s.to_string()))
1799 .expect("Expected container ID header");
1800 assert!(!container_id.is_empty(), "Container ID should not be empty");
1801
1802 assert_eq!(
1804 result
1805 .input
1806 .header(HEADER_ACTION_RESULT)
1807 .and_then(|v| v.as_str()),
1808 Some("success")
1809 );
1810
1811 let inspect = docker
1813 .inspect_container(&container_id, None)
1814 .await
1815 .expect("Container should exist");
1816 assert_eq!(inspect.id.as_deref(), Some(container_id.as_str()));
1817
1818 docker
1820 .remove_container(
1821 &container_id,
1822 Some(bollard::container::RemoveContainerOptions {
1823 force: true,
1824 ..Default::default()
1825 }),
1826 )
1827 .await
1828 .ok();
1829
1830 eprintln!("✅ Container {} created and cleaned up", container_id);
1831 }
1832
1833 #[tokio::test]
1835 async fn test_container_consumer_unsupported_operation() {
1836 use tokio::sync::mpsc;
1837
1838 let component = ContainerComponent::new();
1839 let ctx = NoOpComponentContext;
1840 let endpoint = component.create_endpoint("container:run", &ctx).unwrap();
1841 let mut consumer = endpoint.create_consumer().unwrap();
1842
1843 let (tx, _rx) = mpsc::channel(16);
1845 let cancel_token = tokio_util::sync::CancellationToken::new();
1846 let context = ConsumerContext::new(tx, cancel_token);
1847
1848 let result = consumer.start(context).await;
1849
1850 assert!(
1852 result.is_err(),
1853 "Expected error for unsupported consumer operation"
1854 );
1855 let err = result.unwrap_err();
1856 match &err {
1857 CamelError::EndpointCreationFailed(msg) => {
1858 assert!(
1859 msg.contains("Consumer only supports 'events' or 'logs'"),
1860 "Error message should mention events or logs support, got: {}",
1861 msg
1862 );
1863 }
1864 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1865 }
1866 }
1867
/// The container consumer reports an unbounded concurrent processing model.
#[test]
fn test_container_consumer_concurrency_model_is_concurrent() {
    let config = ContainerConfig::from_uri("container:events").unwrap();
    let consumer = ContainerConsumer { config };

    let expected = camel_component_api::ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
1879
1880 #[tokio::test]
1884 async fn test_container_consumer_cancellation() {
1885 use std::sync::atomic::{AtomicBool, Ordering};
1886 use tokio::sync::mpsc;
1887
1888 let docker = match Docker::connect_with_local_defaults() {
1890 Ok(d) => d,
1891 Err(_) => {
1892 eprintln!("Skipping test: Could not connect to Docker daemon");
1893 return;
1894 }
1895 };
1896
1897 if docker.ping().await.is_err() {
1898 eprintln!("Skipping test: Docker daemon not responding to ping");
1899 return;
1900 }
1901
1902 let component = ContainerComponent::new();
1903 let ctx = NoOpComponentContext;
1904 let endpoint = component.create_endpoint("container:events", &ctx).unwrap();
1905 let mut consumer = endpoint.create_consumer().unwrap();
1906
1907 let (tx, _rx) = mpsc::channel(16);
1909 let cancel_token = tokio_util::sync::CancellationToken::new();
1910 let context = ConsumerContext::new(tx, cancel_token.clone());
1911
1912 let completed = Arc::new(AtomicBool::new(false));
1914 let completed_clone = completed.clone();
1915
1916 let handle = tokio::spawn(async move {
1918 let result = consumer.start(context).await;
1919 completed_clone.store(true, Ordering::SeqCst);
1921 result
1922 });
1923
1924 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1926
1927 assert!(
1929 !completed.load(Ordering::SeqCst),
1930 "Consumer should still be running before cancellation"
1931 );
1932
1933 cancel_token.cancel();
1935
1936 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1938
1939 assert!(
1941 result.is_ok(),
1942 "Consumer should gracefully shut down after cancellation"
1943 );
1944
1945 assert!(
1947 completed.load(Ordering::SeqCst),
1948 "Consumer should have completed after cancellation"
1949 );
1950 }
1951
1952 #[tokio::test]
1956 async fn test_container_producer_list_containers() {
1957 let docker = match Docker::connect_with_local_defaults() {
1960 Ok(d) => d,
1961 Err(_) => {
1962 eprintln!("Skipping test: Could not connect to Docker daemon");
1963 return;
1964 }
1965 };
1966
1967 if docker.ping().await.is_err() {
1968 eprintln!("Skipping test: Docker daemon not responding to ping");
1969 return;
1970 }
1971
1972 let component = ContainerComponent::new();
1974 let ctx = NoOpComponentContext;
1975 let endpoint = component.create_endpoint("container:list", &ctx).unwrap();
1976
1977 let ctx = ProducerContext::new();
1978 let mut producer = endpoint.create_producer(&ctx).unwrap();
1979
1980 let mut exchange = Exchange::new(Message::new(""));
1982 exchange
1983 .input
1984 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1985
1986 use tower::ServiceExt;
1988 let result = producer
1989 .ready()
1990 .await
1991 .unwrap()
1992 .call(exchange)
1993 .await
1994 .expect("Producer should succeed when Docker is available");
1995
1996 match &result.input.body {
1999 camel_component_api::Body::Json(json_value) => {
2000 assert!(
2001 json_value.is_array(),
2002 "Expected input body to be a JSON array, got: {:?}",
2003 json_value
2004 );
2005 }
2006 other => panic!("Expected Body::Json with array, got: {:?}", other),
2007 }
2008 }
2009}