1use std::collections::{HashMap, HashSet};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use bollard::Docker;
14use bollard::service::{HostConfig, PortBinding};
15use camel_api::{Body, BoxProcessor, CamelError, Exchange, Message};
16use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
17use camel_endpoint::parse_uri;
18use tower::Service;
19
20static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
23 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
24
25fn track_container(id: String) {
27 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
28 tracker.insert(id);
29 }
30}
31
32fn untrack_container(id: &str) {
34 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
35 tracker.remove(id);
36 }
37}
38
39pub async fn cleanup_tracked_containers() {
41 let ids: Vec<String> = {
42 match CONTAINER_TRACKER.lock() {
43 Ok(tracker) => tracker.iter().cloned().collect(),
44 Err(_) => return,
45 }
46 };
47
48 if ids.is_empty() {
49 return;
50 }
51
52 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
53
54 let docker = match Docker::connect_with_local_defaults() {
55 Ok(d) => d,
56 Err(e) => {
57 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
58 return;
59 }
60 };
61
62 for id in ids {
63 match docker
64 .remove_container(
65 &id,
66 Some(bollard::container::RemoveContainerOptions {
67 force: true,
68 ..Default::default()
69 }),
70 )
71 .await
72 {
73 Ok(_) => {
74 tracing::debug!("Cleaned up container {}", id);
75 untrack_container(&id);
76 }
77 Err(e) => {
78 tracing::warn!("Failed to cleanup container {}: {}", id, e);
79 }
80 }
81 }
82}
83
/// Timeout value, in seconds, passed to `Docker::connect_with_socket` when a
/// client is created.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;

/// Header read by the producer to override the operation taken from the URI.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header read by the `run` operation to override the image from the URI.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header carrying a container ID: set by `run` and on log messages, and
/// required as input by the `start`, `stop` and `remove` operations.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header set on log messages naming the originating stream
/// (`stdout`, `stderr`, `console` or `stdin`).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header set on log messages with the leading timestamp token, when
/// timestamps are enabled and one could be extracted.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header read by the `run` operation to override the container name from the URI.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header set by the producer to `"success"` after an operation completes.
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
109
/// Component-wide settings shared by every endpoint that does not override them.
#[derive(Debug, Clone, PartialEq)]
pub struct ContainerGlobalConfig {
    /// Docker host used when an endpoint URI carries no `host` parameter.
    pub docker_host: String,
}

impl Default for ContainerGlobalConfig {
    /// Points at the conventional Unix Docker socket.
    fn default() -> Self {
        let docker_host = String::from("unix:///var/run/docker.sock");
        Self { docker_host }
    }
}

impl ContainerGlobalConfig {
    /// Creates a configuration holding the default values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Builder-style setter that replaces the Docker host and returns the config.
    pub fn with_docker_host(self, v: impl Into<String>) -> Self {
        let mut updated = self;
        updated.docker_host = v.into();
        updated
    }
}
141
/// Per-endpoint settings parsed from a `container:<operation>?...` URI.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// Operation name, taken from the URI path (e.g. `run`, `list`, `logs`).
    pub operation: String,
    /// `image` parameter: image used by `run` when no image header is present.
    pub image: Option<String>,
    /// `name` parameter: container name used when no name header is present.
    pub name: Option<String>,
    /// `host` parameter: Docker host; filled from the global config when absent.
    pub host: Option<String>,
    /// `cmd` parameter: command line, split on whitespace before use.
    pub cmd: Option<String>,
    /// `ports` parameter: comma-separated `host:container[/protocol]` mappings.
    pub ports: Option<String>,
    /// `env` parameter: comma-separated environment entries.
    pub env: Option<String>,
    /// `network` parameter: passed through as the container's network mode.
    pub network: Option<String>,
    /// `containerId` parameter: required by the logs consumer.
    pub container_id: Option<String>,
    /// `follow` parameter (defaults to `true`): keep streaming logs.
    pub follow: bool,
    /// `timestamps` parameter (defaults to `false`): request timestamped logs.
    pub timestamps: bool,
    /// `tail` parameter: the logs consumer falls back to `"all"` when unset.
    pub tail: Option<String>,
    /// `autoPull` parameter (defaults to `true`): pull a missing image on `run`.
    pub auto_pull: bool,
    /// `autoRemove` parameter (defaults to `true`): passed to the host config.
    pub auto_remove: bool,
}
181
182impl ContainerConfig {
183 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
191 let parts = parse_uri(uri)?;
192 if parts.scheme != "container" {
193 return Err(CamelError::InvalidUri(format!(
194 "expected scheme 'container', got '{}'",
195 parts.scheme
196 )));
197 }
198
199 let image = parts.params.get("image").cloned();
200 let name = parts.params.get("name").cloned();
201 let cmd = parts.params.get("cmd").cloned();
202 let ports = parts.params.get("ports").cloned();
203 let env = parts.params.get("env").cloned();
204 let network = parts.params.get("network").cloned();
205 let container_id = parts.params.get("containerId").cloned();
206 let follow = parts
207 .params
208 .get("follow")
209 .map(|v| v.eq_ignore_ascii_case("true"))
210 .unwrap_or(true);
211 let timestamps = parts
212 .params
213 .get("timestamps")
214 .map(|v| v.eq_ignore_ascii_case("true"))
215 .unwrap_or(false);
216 let tail = parts.params.get("tail").cloned();
217 let auto_pull = parts
218 .params
219 .get("autoPull")
220 .map(|v| v.eq_ignore_ascii_case("true"))
221 .unwrap_or(true);
222 let auto_remove = parts
223 .params
224 .get("autoRemove")
225 .map(|v| v.eq_ignore_ascii_case("true"))
226 .unwrap_or(true);
227 let host = parts.params.get("host").cloned();
229
230 Ok(Self {
231 operation: parts.path,
232 image,
233 name,
234 host,
235 cmd,
236 ports,
237 env,
238 network,
239 container_id,
240 follow,
241 timestamps,
242 tail,
243 auto_pull,
244 auto_remove,
245 })
246 }
247
248 fn apply_global_defaults(&mut self, global: &ContainerGlobalConfig) {
251 if self.host.is_none() {
252 self.host = Some(global.docker_host.clone());
253 }
254 }
255
256 fn docker_socket_path(&self) -> Result<&str, CamelError> {
257 let host = self.host.as_deref().unwrap_or(if cfg!(windows) {
258 "npipe:////./pipe/docker_engine"
259 } else {
260 "unix:///var/run/docker.sock"
261 });
262
263 if host.starts_with("unix://") || host.starts_with("npipe://") {
264 return Ok(host);
265 }
266
267 if host.contains("://") {
268 return Err(CamelError::ProcessorError(format!(
269 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
270 host
271 )));
272 }
273
274 Ok(host)
275 }
276
277 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
278 let socket_path = self.docker_socket_path()?;
279 Docker::connect_with_socket(
280 socket_path,
281 DOCKER_CONNECT_TIMEOUT_SECS,
282 bollard::API_DEFAULT_VERSION,
283 )
284 .map_err(|e| {
285 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
286 })
287 }
288
289 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
297 let docker = self.connect_docker_client()?;
298 docker
299 .ping()
300 .await
301 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
302 Ok(docker)
303 }
304
305 #[allow(clippy::type_complexity)]
306 fn parse_ports(
307 &self,
308 ) -> Option<(
309 HashMap<String, HashMap<(), ()>>,
310 HashMap<String, Option<Vec<PortBinding>>>,
311 )> {
312 let ports_str = self.ports.as_ref()?;
313
314 let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
315 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
316
317 for mapping in ports_str.split(',') {
318 let mapping = mapping.trim();
319 if mapping.is_empty() {
320 continue;
321 }
322
323 let (host_port, container_spec) = mapping.split_once(':')?;
324
325 let (container_port, protocol) = if container_spec.contains('/') {
326 let parts: Vec<&str> = container_spec.split('/').collect();
327 (parts[0], parts[1])
328 } else {
329 (container_spec, "tcp")
330 };
331
332 let container_key = format!("{}/{}", container_port, protocol);
333
334 exposed_ports.insert(container_key.clone(), HashMap::new());
335
336 port_bindings.insert(
337 container_key,
338 Some(vec![PortBinding {
339 host_ip: None,
340 host_port: Some(host_port.to_string()),
341 }]),
342 );
343 }
344
345 if exposed_ports.is_empty() {
346 None
347 } else {
348 Some((exposed_ports, port_bindings))
349 }
350 }
351
352 fn parse_env(&self) -> Option<Vec<String>> {
353 let env_str = self.env.as_ref()?;
354
355 let env_vars: Vec<String> = env_str
356 .split(',')
357 .map(|s| s.trim().to_string())
358 .filter(|s| !s.is_empty())
359 .collect();
360
361 if env_vars.is_empty() {
362 None
363 } else {
364 Some(env_vars)
365 }
366 }
367}
368
369#[derive(Debug, Clone, Copy, PartialEq, Eq)]
370enum ProducerOperation {
371 List,
372 Run,
373 Start,
374 Stop,
375 Remove,
376}
377
378fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
379 match operation {
380 "list" => Ok(ProducerOperation::List),
381 "run" => Ok(ProducerOperation::Run),
382 "start" => Ok(ProducerOperation::Start),
383 "stop" => Ok(ProducerOperation::Stop),
384 "remove" => Ok(ProducerOperation::Remove),
385 _ => Err(CamelError::ProcessorError(format!(
386 "Unknown container operation: {}",
387 operation
388 ))),
389 }
390}
391
392fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
393 exchange
394 .input
395 .header(HEADER_CONTAINER_NAME)
396 .and_then(|v| v.as_str().map(|s| s.to_string()))
397 .or_else(|| config.name.clone())
398}
399
400async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
401 let images = docker
402 .list_images::<&str>(None)
403 .await
404 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
405
406 Ok(images.iter().any(|img| {
407 img.repo_tags
408 .iter()
409 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
410 }))
411}
412
413async fn pull_image_with_progress(
414 docker: &Docker,
415 image: &str,
416 timeout_secs: u64,
417) -> Result<(), CamelError> {
418 use futures::StreamExt;
419
420 tracing::info!("Pulling image: {}", image);
421
422 let mut stream = docker.create_image(
423 Some(bollard::image::CreateImageOptions {
424 from_image: image,
425 ..Default::default()
426 }),
427 None,
428 None,
429 );
430
431 let start = std::time::Instant::now();
432 let mut last_progress = std::time::Instant::now();
433
434 while let Some(item) = stream.next().await {
435 if start.elapsed().as_secs() > timeout_secs {
436 return Err(CamelError::ProcessorError(format!(
437 "Image pull timeout after {}s. Try manually: docker pull {}",
438 timeout_secs, image
439 )));
440 }
441
442 match item {
443 Ok(update) => {
444 if last_progress.elapsed().as_secs() >= 2 {
446 if let Some(status) = update.status {
447 tracing::debug!("Pull progress: {}", status);
448 }
449 last_progress = std::time::Instant::now();
450 }
451 }
452 Err(e) => {
453 let err_str = e.to_string().to_lowercase();
454 if err_str.contains("unauthorized") || err_str.contains("401") {
455 return Err(CamelError::ProcessorError(format!(
456 "Authentication required for image '{}'. Configure Docker credentials: docker login",
457 image
458 )));
459 }
460 if err_str.contains("not found") || err_str.contains("404") {
461 return Err(CamelError::ProcessorError(format!(
462 "Image '{}' not found in registry. Check the image name and tag",
463 image
464 )));
465 }
466 return Err(CamelError::ProcessorError(format!(
467 "Failed to pull image '{}': {}",
468 image, e
469 )));
470 }
471 }
472 }
473
474 tracing::info!("Successfully pulled image: {}", image);
475 Ok(())
476}
477
478async fn ensure_image_available(
479 docker: &Docker,
480 image: &str,
481 auto_pull: bool,
482 timeout_secs: u64,
483) -> Result<(), CamelError> {
484 if image_exists_locally(docker, image).await? {
485 tracing::debug!("Image '{}' already available locally", image);
486 return Ok(());
487 }
488
489 if !auto_pull {
490 return Err(CamelError::ProcessorError(format!(
491 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
492 image, image
493 )));
494 }
495
496 pull_image_with_progress(docker, image, timeout_secs).await
497}
498
499fn format_docker_event(event: &bollard::models::EventMessage) -> String {
500 let action = event.action.as_deref().unwrap_or("unknown");
501 let actor = event.actor.as_ref();
502
503 let container_name = actor
504 .and_then(|a| a.attributes.as_ref())
505 .and_then(|attrs| attrs.get("name"))
506 .map(|s| s.as_str())
507 .unwrap_or("unknown");
508
509 let image = actor
510 .and_then(|a| a.attributes.as_ref())
511 .and_then(|attrs| attrs.get("image"))
512 .map(|s| s.as_str())
513 .unwrap_or("");
514
515 let exit_code = actor
516 .and_then(|a| a.attributes.as_ref())
517 .and_then(|attrs| attrs.get("exitCode"))
518 .map(|s| s.as_str());
519
520 match action {
521 "create" => {
522 if image.is_empty() {
523 format!("[CREATE] Container {}", container_name)
524 } else {
525 format!("[CREATE] Container {} ({})", container_name, image)
526 }
527 }
528 "start" => format!("[START] Container {}", container_name),
529 "die" => {
530 if let Some(code) = exit_code {
531 format!("[DIE] Container {} (exit: {})", container_name, code)
532 } else {
533 format!("[DIE] Container {}", container_name)
534 }
535 }
536 "destroy" => format!("[DESTROY] Container {}", container_name),
537 "stop" => format!("[STOP] Container {}", container_name),
538 "pause" => format!("[PAUSE] Container {}", container_name),
539 "unpause" => format!("[UNPAUSE] Container {}", container_name),
540 "restart" => format!("[RESTART] Container {}", container_name),
541 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
542 }
543}
544
545async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
546 create: CreateFn,
547 start: StartFn,
548 remove: RemoveFn,
549) -> Result<String, CamelError>
550where
551 CreateFn: FnOnce() -> CreateFut,
552 CreateFut: Future<Output = Result<String, CamelError>>,
553 StartFn: FnOnce(String) -> StartFut,
554 StartFut: Future<Output = Result<(), CamelError>>,
555 RemoveFn: FnOnce(String) -> RemoveFut,
556 RemoveFut: Future<Output = Result<(), CamelError>>,
557{
558 let container_id = create().await?;
559 if let Err(start_err) = start(container_id.clone()).await {
560 if let Err(remove_err) = remove(container_id.clone()).await {
561 return Err(CamelError::ProcessorError(format!(
562 "Failed to start container: {}. Cleanup failed: {}",
563 start_err, remove_err
564 )));
565 }
566 return Err(start_err);
567 }
568
569 Ok(container_id)
570}
571
/// Producer that executes container operations (`list`, `run`, `start`,
/// `stop`, `remove`) against a Docker daemon for each incoming exchange.
#[derive(Clone)]
pub struct ContainerProducer {
    // Endpoint settings supplying defaults for operation, image, name, etc.
    config: ContainerConfig,
    // Docker client shared by all calls; cloned into each returned future.
    docker: Docker,
}

impl Service<Exchange> for ContainerProducer {
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    /// Always reports ready; no back-pressure is applied.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Runs the requested operation and returns the (mutated) exchange.
    ///
    /// The operation comes from the `CamelContainerAction` header when it is
    /// a string, otherwise from the endpoint URI. On success the
    /// `CamelContainerActionResult` header is set to `"success"`; failures are
    /// returned as `CamelError::ProcessorError`.
    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
        // Clone so the returned future owns everything it needs.
        let config = self.config.clone();
        let docker = self.docker.clone();
        Box::pin(async move {
            // Header takes precedence over the operation configured in the URI.
            let operation_name = exchange
                .input
                .header(HEADER_ACTION)
                .and_then(|v| v.as_str().map(|s| s.to_string()))
                .unwrap_or_else(|| config.operation.clone());

            let operation = parse_producer_operation(&operation_name)?;

            match operation {
                // list: replace the body with the container list as JSON.
                ProducerOperation::List => {
                    let containers = docker.list_containers::<String>(None).await.map_err(|e| {
                        CamelError::ProcessorError(format!("Failed to list containers: {}", e))
                    })?;

                    let json_value = serde_json::to_value(&containers).map_err(|e| {
                        CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
                    })?;

                    exchange.input.body = Body::Json(json_value);
                    exchange.input.set_header(
                        HEADER_ACTION_RESULT,
                        serde_json::Value::String("success".to_string()),
                    );
                }
                // run: ensure the image, create + start a container, track its ID.
                ProducerOperation::Run => {
                    // Image header wins over the URI's `image` parameter.
                    let image = exchange
                        .input
                        .header(HEADER_IMAGE)
                        .and_then(|v| v.as_str().map(|s| s.to_string()))
                        .or(config.image.clone())
                        .ok_or_else(|| {
                            CamelError::ProcessorError(
                                "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
                            )
                        })?;

                    // Pull timeout in seconds.
                    let pull_timeout = 300;
                    ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
                        .await
                        .map_err(|e| {
                            CamelError::ProcessorError(format!(
                                "Image '{}' not available: {}",
                                image, e
                            ))
                        })?;

                    let container_name = resolve_container_name(&exchange, &config);
                    // An empty name is passed when neither header nor URI set one.
                    let container_name_ref = container_name.as_deref().unwrap_or("");
                    let cmd_parts: Option<Vec<String>> = config
                        .cmd
                        .as_ref()
                        .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
                    let auto_remove = config.auto_remove;
                    let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
                    let env_vars = config.parse_env();
                    let network_mode = config.network.clone();

                    // One client handle per closure, since each closure is `move`.
                    let docker_create = docker.clone();
                    let docker_start = docker.clone();
                    let docker_remove = docker.clone();

                    let container_id = run_container_with_cleanup(
                        // Create step: builds the container and yields its ID.
                        move || async move {
                            let create_options = bollard::container::CreateContainerOptions {
                                name: container_name_ref,
                                ..Default::default()
                            };
                            let container_config = bollard::container::Config::<String> {
                                image: Some(image.clone()),
                                cmd: cmd_parts,
                                env: env_vars,
                                exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
                                host_config: Some(HostConfig {
                                    auto_remove: Some(auto_remove),
                                    port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
                                    network_mode,
                                    ..Default::default()
                                }),
                                ..Default::default()
                            };

                            let create_response = docker_create
                                .create_container(Some(create_options), container_config)
                                .await
                                .map_err(|e| {
                                    // Report name conflicts with a dedicated message.
                                    let err_str = e.to_string().to_lowercase();
                                    if err_str.contains("409") || err_str.contains("conflict") {
                                        CamelError::ProcessorError(format!(
                                            "Container name '{}' already exists. Use a unique name or remove the existing container first",
                                            container_name_ref
                                        ))
                                    } else {
                                        CamelError::ProcessorError(format!(
                                            "Failed to create container: {}",
                                            e
                                        ))
                                    }
                                })?;

                            Ok(create_response.id)
                        },
                        // Start step.
                        move |container_id| async move {
                            docker_start
                                .start_container::<String>(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to start container: {}",
                                        e
                                    ))
                                })
                        },
                        // Cleanup step, used only when the start step fails.
                        move |container_id| async move {
                            docker_remove
                                .remove_container(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to remove container after start failure: {}",
                                        e
                                    ))
                                })
                        },
                    )
                    .await?;

                    // Remember the container so it can be cleaned up later.
                    track_container(container_id.clone());

                    exchange
                        .input
                        .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
                    exchange.input.set_header(
                        HEADER_ACTION_RESULT,
                        serde_json::Value::String("success".to_string()),
                    );
                }
                // start / stop / remove: act on the container named by the ID header.
                ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
                    // Only the header is consulted; the URI's containerId is not used here.
                    let container_id = exchange
                        .input
                        .header(HEADER_CONTAINER_ID)
                        .and_then(|v| v.as_str().map(|s| s.to_string()))
                        .ok_or_else(|| {
                            CamelError::ProcessorError(format!(
                                "{} header is required for {} operation",
                                HEADER_CONTAINER_ID, operation_name
                            ))
                        })?;

                    match operation {
                        ProducerOperation::Start => {
                            docker
                                .start_container::<String>(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to start container: {}",
                                        e
                                    ))
                                })?;
                        }
                        ProducerOperation::Stop => {
                            docker
                                .stop_container(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to stop container: {}",
                                        e
                                    ))
                                })?;
                        }
                        ProducerOperation::Remove => {
                            docker
                                .remove_container(&container_id, None)
                                .await
                                .map_err(|e| {
                                    CamelError::ProcessorError(format!(
                                        "Failed to remove container: {}",
                                        e
                                    ))
                                })?;
                            // The container is gone; no later cleanup needed.
                            untrack_container(&container_id);
                        }
                        // List and Run are handled by the outer match.
                        _ => {}
                    }

                    exchange.input.set_header(
                        HEADER_ACTION_RESULT,
                        serde_json::Value::String("success".to_string()),
                    );
                }
            }

            Ok(exchange)
        })
    }
}
798
799pub struct ContainerConsumer {
804 config: ContainerConfig,
805}
806
807#[async_trait]
808impl Consumer for ContainerConsumer {
809 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
810 match self.config.operation.as_str() {
811 "events" => self.start_events_consumer(context).await,
812 "logs" => self.start_logs_consumer(context).await,
813 _ => Err(CamelError::EndpointCreationFailed(format!(
814 "Consumer only supports 'events' or 'logs' operations, got '{}'",
815 self.config.operation
816 ))),
817 }
818 }
819
820 async fn stop(&mut self) -> Result<(), CamelError> {
821 Ok(())
822 }
823
824 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
825 camel_component::ConcurrencyModel::Concurrent { max: None }
826 }
827}
828
impl ContainerConsumer {
    /// Streams Docker events as text exchanges until the context is cancelled.
    ///
    /// The outer loop (re)connects to the daemon, retrying after 5 seconds on
    /// a connection failure; the inner loop forwards events. Any stream error,
    /// stream end, or failed send leaves the inner loop and reconnects after a
    /// 1 second pause. Cancellation is checked at every wait point and always
    /// results in `Ok(())`.
    async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        loop {
            if context.is_cancelled() {
                tracing::info!("Container events consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Back off, but wake up immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container events consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // No filter options are passed to the event stream.
            let mut event_stream = docker.events::<String>(None);

            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container events consumer shutting down");
                        return Ok(());
                    }

                    msg = event_stream.next() => {
                        match msg {
                            Some(Ok(event)) => {
                                let formatted = format_docker_event(&event);
                                let message = Message::new(Body::Text(formatted));
                                let exchange = Exchange::new(message);

                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker event stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                tracing::info!("Docker event stream ended. Reconnecting...");
                                break;
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting, still responsive to cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container events consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }

    /// Streams the logs of the configured container as text exchanges.
    ///
    /// Requires `containerId` in the configuration, otherwise returns
    /// `CamelError::EndpointCreationFailed`. Each non-empty log chunk becomes
    /// a message with the container ID and stream-type headers, plus a
    /// timestamp header when timestamps are enabled and one can be extracted.
    /// With `follow=false` the consumer returns once the stream ends;
    /// otherwise it reconnects like the events consumer.
    async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        use futures::StreamExt;

        let container_id = self.config.container_id.clone().ok_or_else(|| {
            CamelError::EndpointCreationFailed(
                "containerId is required for logs consumer. Use container:logs?containerId=xxx"
                    .to_string(),
            )
        })?;

        loop {
            if context.is_cancelled() {
                tracing::info!("Container logs consumer shutting down");
                return Ok(());
            }

            let docker = match self.config.connect_docker().await {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(
                        "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
                        e
                    );
                    // Back off, but wake up immediately on cancellation.
                    tokio::select! {
                        _ = context.cancelled() => {
                            tracing::info!("Container logs consumer shutting down");
                            return Ok(());
                        }
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                    }
                    continue;
                }
            };

            // NOTE(review): the same `tail` value is re-sent on every
            // reconnect, so already-delivered lines may presumably be
            // replayed after a stream error — confirm against daemon behavior.
            let tail = self
                .config
                .tail
                .clone()
                .unwrap_or_else(|| "all".to_string());

            let options = bollard::container::LogsOptions::<String> {
                follow: self.config.follow,
                stdout: true,
                stderr: true,
                timestamps: self.config.timestamps,
                tail,
                ..Default::default()
            };

            let mut log_stream = docker.logs(&container_id, Some(options));
            let container_id_header = container_id.clone();

            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        tracing::info!("Container logs consumer shutting down");
                        return Ok(());
                    }

                    msg = log_stream.next() => {
                        match msg {
                            Some(Ok(log_output)) => {
                                // Label the chunk by source stream; bytes are decoded lossily.
                                let (stream_type, content) = match log_output {
                                    bollard::container::LogOutput::StdOut { message } => {
                                        ("stdout", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdErr { message } => {
                                        ("stderr", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::Console { message } => {
                                        ("console", String::from_utf8_lossy(&message).into_owned())
                                    }
                                    bollard::container::LogOutput::StdIn { message } => {
                                        ("stdin", String::from_utf8_lossy(&message).into_owned())
                                    }
                                };

                                // Drop trailing whitespace and skip chunks that end up empty.
                                let content = content.trim_end();
                                if content.is_empty() {
                                    continue;
                                }

                                let mut message = Message::new(Body::Text(content.to_string()));
                                message.set_header(
                                    HEADER_CONTAINER_ID,
                                    serde_json::Value::String(container_id_header.clone()),
                                );
                                message.set_header(
                                    HEADER_LOG_STREAM,
                                    serde_json::Value::String(stream_type.to_string()),
                                );

                                // The timestamp stays in the body; it is only copied to a header.
                                if self.config.timestamps
                                    && let Some(ts) = extract_timestamp(content) {
                                        message.set_header(
                                            HEADER_LOG_TIMESTAMP,
                                            serde_json::Value::String(ts),
                                        );
                                    }

                                let exchange = Exchange::new(message);

                                if let Err(e) = context.send(exchange).await {
                                    tracing::error!("Failed to send log exchange: {:?}", e);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("Docker log stream error: {}. Reconnecting...", e);
                                break;
                            }
                            None => {
                                // End of stream: reconnect when following, otherwise finish.
                                if self.config.follow {
                                    tracing::info!("Docker log stream ended. Reconnecting...");
                                    break;
                                } else {
                                    tracing::info!("Container logs consumer finished (follow=false)");
                                    return Ok(());
                                }
                            }
                        }
                    }
                }
            }

            // Short pause before reconnecting, still responsive to cancellation.
            tokio::select! {
                _ = context.cancelled() => {
                    tracing::info!("Container logs consumer shutting down");
                    return Ok(());
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            }
        }
    }
}
1036
/// Returns the first space-delimited token of `log_line` when it looks like a
/// timestamp, i.e. the line contains a space and the token contains a `T`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    let (candidate, _rest) = log_line.split_once(' ')?;
    candidate.contains('T').then(|| candidate.to_owned())
}
1045
1046pub struct ContainerComponent {
1054 config: Option<ContainerGlobalConfig>,
1055}
1056
1057impl ContainerComponent {
1058 pub fn new() -> Self {
1060 Self { config: None }
1061 }
1062
1063 pub fn with_config(config: ContainerGlobalConfig) -> Self {
1065 Self {
1066 config: Some(config),
1067 }
1068 }
1069
1070 pub fn with_optional_config(config: Option<ContainerGlobalConfig>) -> Self {
1072 Self { config }
1073 }
1074}
1075
1076impl Default for ContainerComponent {
1077 fn default() -> Self {
1078 Self::new()
1079 }
1080}
1081
1082impl Component for ContainerComponent {
1083 fn scheme(&self) -> &str {
1084 "container"
1085 }
1086
1087 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1088 let mut config = ContainerConfig::from_uri(uri)?;
1089 if let Some(ref global) = self.config {
1091 config.apply_global_defaults(global);
1092 }
1093 Ok(Box::new(ContainerEndpoint {
1094 uri: uri.to_string(),
1095 config,
1096 }))
1097 }
1098}
1099
1100pub struct ContainerEndpoint {
1105 uri: String,
1106 config: ContainerConfig,
1107}
1108
1109impl ContainerEndpoint {
1110 pub fn docker_host(&self) -> Option<&str> {
1113 self.config.host.as_deref()
1114 }
1115}
1116
1117impl Endpoint for ContainerEndpoint {
1118 fn uri(&self) -> &str {
1119 &self.uri
1120 }
1121
1122 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1123 Ok(Box::new(ContainerConsumer {
1124 config: self.config.clone(),
1125 }))
1126 }
1127
1128 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1129 let docker = self.config.connect_docker_client()?;
1130 Ok(BoxProcessor::new(ContainerProducer {
1131 config: self.config.clone(),
1132 docker,
1133 }))
1134 }
1135}
1136
1137#[cfg(test)]
1138mod tests {
1139 use super::*;
1140
1141 #[test]
1142 fn test_container_config() {
1143 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1144 assert_eq!(config.operation, "run");
1145 assert_eq!(config.image.as_deref(), Some("alpine"));
1146 assert!(config.host.is_none());
1148 }
1149
1150 #[test]
1151 fn test_global_config_applied_to_endpoint() {
1152 let global =
1155 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1156 let mut config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1157 assert!(
1158 config.host.is_none(),
1159 "URI without ?host= should leave host as None"
1160 );
1161 config.apply_global_defaults(&global);
1162 assert_eq!(
1163 config.host.as_deref(),
1164 Some("unix:///custom/docker.sock"),
1165 "global docker_host must be applied when URI did not set host"
1166 );
1167 }
1168
1169 #[test]
1170 fn test_uri_param_wins_over_global_config() {
1171 let global =
1173 ContainerGlobalConfig::default().with_docker_host("unix:///custom/docker.sock");
1174 let mut config =
1175 ContainerConfig::from_uri("container:run?image=alpine&host=unix:///override.sock")
1176 .unwrap();
1177 assert_eq!(
1178 config.host.as_deref(),
1179 Some("unix:///override.sock"),
1180 "URI-set host should be parsed correctly"
1181 );
1182 config.apply_global_defaults(&global);
1183 assert_eq!(
1184 config.host.as_deref(),
1185 Some("unix:///override.sock"),
1186 "global config must NOT override a host already set by URI"
1187 );
1188 }
1189
1190 #[test]
1191 fn test_container_config_parses_name() {
1192 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1193 assert_eq!(config.name.as_deref(), Some("my-container"));
1194 }
1195
1196 #[test]
1197 fn test_parse_producer_operation_known() {
1198 assert_eq!(
1199 parse_producer_operation("list").unwrap(),
1200 ProducerOperation::List
1201 );
1202 assert_eq!(
1203 parse_producer_operation("run").unwrap(),
1204 ProducerOperation::Run
1205 );
1206 assert_eq!(
1207 parse_producer_operation("start").unwrap(),
1208 ProducerOperation::Start
1209 );
1210 assert_eq!(
1211 parse_producer_operation("stop").unwrap(),
1212 ProducerOperation::Stop
1213 );
1214 assert_eq!(
1215 parse_producer_operation("remove").unwrap(),
1216 ProducerOperation::Remove
1217 );
1218 }
1219
1220 #[test]
1221 fn test_parse_producer_operation_unknown() {
1222 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1223 match err {
1224 CamelError::ProcessorError(msg) => {
1225 assert!(
1226 msg.contains("Unknown container operation"),
1227 "Unexpected error message: {}",
1228 msg
1229 );
1230 }
1231 _ => panic!("Expected ProcessorError for unknown operation"),
1232 }
1233 }
1234
1235 #[test]
1236 fn test_resolve_container_name_header_overrides_config() {
1237 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1238 let mut exchange = Exchange::new(Message::new(""));
1239 exchange.input.set_header(
1240 HEADER_CONTAINER_NAME,
1241 serde_json::Value::String("header-name".to_string()),
1242 );
1243
1244 let resolved = resolve_container_name(&exchange, &config);
1245 assert_eq!(resolved.as_deref(), Some("header-name"));
1246 }
1247
1248 #[test]
1249 fn test_container_config_rejects_tcp_host() {
1250 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1251 let err = config.connect_docker_client().unwrap_err();
1252 match err {
1253 CamelError::ProcessorError(msg) => {
1254 assert!(
1255 msg.to_lowercase().contains("tcp"),
1256 "Expected TCP scheme error, got: {}",
1257 msg
1258 );
1259 }
1260 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1261 }
1262 }
1263
1264 #[tokio::test]
1265 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1266 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1267 let remove_called_clone = remove_called.clone();
1268
1269 let result = run_container_with_cleanup(
1270 || async { Ok("container-123".to_string()) },
1271 |_id| async move {
1272 Err(CamelError::ProcessorError(
1273 "Failed to start container".to_string(),
1274 ))
1275 },
1276 move |_id| {
1277 let remove_called_inner = remove_called_clone.clone();
1278 async move {
1279 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1280 Ok(())
1281 }
1282 },
1283 )
1284 .await;
1285
1286 assert!(result.is_err(), "Expected start failure to bubble up");
1287 assert!(
1288 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1289 "Expected cleanup to remove container"
1290 );
1291 }
1292
1293 #[test]
1294 fn test_container_component_creates_endpoint() {
1295 let component = ContainerComponent::new();
1296 assert_eq!(component.scheme(), "container");
1297 let endpoint = component
1298 .create_endpoint("container:run?image=alpine")
1299 .unwrap();
1300 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1301 }
1302
1303 #[test]
1304 fn test_container_config_parses_ports() {
1305 let config =
1306 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1307 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1308 }
1309
1310 #[test]
1311 fn test_container_config_parses_env() {
1312 let config =
1313 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1314 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1315 }
1316
1317 #[test]
1318 fn test_container_config_parses_logs_options() {
1319 let config = ContainerConfig::from_uri(
1320 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1321 )
1322 .unwrap();
1323 assert_eq!(config.operation, "logs");
1324 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1325 assert!(config.follow);
1326 assert!(config.timestamps);
1327 assert_eq!(config.tail.as_deref(), Some("100"));
1328 }
1329
1330 #[test]
1331 fn test_container_config_logs_defaults() {
1332 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1333 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1337
1338 #[test]
1339 fn test_parse_ports_single() {
1340 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1341 let (exposed, bindings) = config.parse_ports().unwrap();
1342
1343 assert!(exposed.contains_key("80/tcp"));
1344 assert!(bindings.contains_key("80/tcp"));
1345
1346 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1347 assert_eq!(binding.len(), 1);
1348 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1349 }
1350
1351 #[test]
1352 fn test_parse_ports_multiple() {
1353 let config =
1354 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1355 let (exposed, bindings) = config.parse_ports().unwrap();
1356
1357 assert!(exposed.contains_key("80/tcp"));
1358 assert!(exposed.contains_key("443/tcp"));
1359 assert_eq!(bindings.len(), 2);
1360 }
1361
1362 #[test]
1363 fn test_parse_ports_with_protocol() {
1364 let config =
1365 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1366 .unwrap();
1367 let (exposed, _bindings) = config.parse_ports().unwrap();
1368
1369 assert!(exposed.contains_key("80/tcp"));
1370 assert!(exposed.contains_key("53/udp"));
1371 }
1372
1373 #[test]
1374 fn test_parse_ports_none() {
1375 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1376 assert!(config.parse_ports().is_none());
1377 }
1378
1379 #[test]
1380 fn test_parse_env_single() {
1381 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1382 let env = config.parse_env().unwrap();
1383
1384 assert_eq!(env.len(), 1);
1385 assert_eq!(env[0], "FOO=bar");
1386 }
1387
1388 #[test]
1389 fn test_parse_env_multiple() {
1390 let config =
1391 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1392 .unwrap();
1393 let env = config.parse_env().unwrap();
1394
1395 assert_eq!(env.len(), 3);
1396 assert!(env.contains(&"FOO=bar".to_string()));
1397 assert!(env.contains(&"BAZ=qux".to_string()));
1398 assert!(env.contains(&"NUM=123".to_string()));
1399 }
1400
1401 #[test]
1402 fn test_parse_env_none() {
1403 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1404 assert!(config.parse_env().is_none());
1405 }
1406
1407 use camel_api::Message;
1408 use std::sync::Arc;
1409
1410 #[tokio::test]
1411 async fn test_container_producer_resolves_operation_from_header() {
1412 let docker = match Docker::connect_with_local_defaults() {
1414 Ok(d) => d,
1415 Err(_) => {
1416 eprintln!("Skipping test: Could not connect to Docker daemon");
1417 return;
1418 }
1419 };
1420
1421 if docker.ping().await.is_err() {
1422 eprintln!("Skipping test: Docker daemon not responding to ping");
1423 return;
1424 }
1425
1426 let component = ContainerComponent::new();
1427 let endpoint = component.create_endpoint("container:run").unwrap();
1428
1429 let ctx = ProducerContext::new();
1430 let mut producer = endpoint.create_producer(&ctx).unwrap();
1431
1432 let mut exchange = Exchange::new(Message::new(""));
1433 exchange
1434 .input
1435 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1436
1437 use tower::ServiceExt;
1438 let result = producer
1439 .ready()
1440 .await
1441 .unwrap()
1442 .call(exchange)
1443 .await
1444 .unwrap();
1445
1446 assert_eq!(
1448 result
1449 .input
1450 .header(HEADER_ACTION_RESULT)
1451 .map(|v| v.as_str().unwrap()),
1452 Some("success")
1453 );
1454 }
1455
1456 #[tokio::test]
1457 async fn test_container_producer_connection_error_on_invalid_host() {
1458 let component = ContainerComponent::new();
1460 let endpoint = component
1461 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock")
1462 .unwrap();
1463
1464 let ctx = ProducerContext::new();
1465 let result = endpoint.create_producer(&ctx);
1466
1467 assert!(
1469 result.is_err(),
1470 "Expected error when connecting to invalid host"
1471 );
1472 let err = result.unwrap_err();
1473 match &err {
1474 CamelError::ProcessorError(msg) => {
1475 assert!(
1476 msg.to_lowercase().contains("connection")
1477 || msg.to_lowercase().contains("connect")
1478 || msg.to_lowercase().contains("socket")
1479 || msg.contains("docker"),
1480 "Error message should indicate connection failure, got: {}",
1481 msg
1482 );
1483 }
1484 _ => panic!("Expected ProcessorError, got: {:?}", err),
1485 }
1486 }
1487
1488 #[tokio::test]
1490 async fn test_container_producer_lifecycle_operations_missing_id() {
1491 let docker = match Docker::connect_with_local_defaults() {
1493 Ok(d) => d,
1494 Err(_) => {
1495 eprintln!("Skipping test: Could not connect to Docker daemon");
1496 return;
1497 }
1498 };
1499
1500 if docker.ping().await.is_err() {
1501 eprintln!("Skipping test: Docker daemon not responding to ping");
1502 return;
1503 }
1504
1505 let component = ContainerComponent::new();
1506 let endpoint = component.create_endpoint("container:start").unwrap();
1507 let ctx = ProducerContext::new();
1508 let mut producer = endpoint.create_producer(&ctx).unwrap();
1509
1510 for operation in ["start", "stop", "remove"] {
1512 let mut exchange = Exchange::new(Message::new(""));
1513 exchange.input.set_header(
1514 HEADER_ACTION,
1515 serde_json::Value::String(operation.to_string()),
1516 );
1517 use tower::ServiceExt;
1520 let result = producer.ready().await.unwrap().call(exchange).await;
1521
1522 assert!(
1523 result.is_err(),
1524 "Expected error for {} operation without CamelContainerId",
1525 operation
1526 );
1527 let err = result.unwrap_err();
1528 match &err {
1529 CamelError::ProcessorError(msg) => {
1530 assert!(
1531 msg.contains(HEADER_CONTAINER_ID),
1532 "Error message should mention {}, got: {}",
1533 HEADER_CONTAINER_ID,
1534 msg
1535 );
1536 }
1537 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1538 }
1539 }
1540 }
1541
1542 #[tokio::test]
1544 async fn test_container_producer_stop_nonexistent() {
1545 let docker = match Docker::connect_with_local_defaults() {
1547 Ok(d) => d,
1548 Err(_) => {
1549 eprintln!("Skipping test: Could not connect to Docker daemon");
1550 return;
1551 }
1552 };
1553
1554 if docker.ping().await.is_err() {
1555 eprintln!("Skipping test: Docker daemon not responding to ping");
1556 return;
1557 }
1558
1559 let component = ContainerComponent::new();
1560 let endpoint = component.create_endpoint("container:stop").unwrap();
1561 let ctx = ProducerContext::new();
1562 let mut producer = endpoint.create_producer(&ctx).unwrap();
1563
1564 let mut exchange = Exchange::new(Message::new(""));
1565 exchange
1566 .input
1567 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1568 exchange.input.set_header(
1569 HEADER_CONTAINER_ID,
1570 serde_json::Value::String("nonexistent-container-123".into()),
1571 );
1572
1573 use tower::ServiceExt;
1574 let result = producer.ready().await.unwrap().call(exchange).await;
1575
1576 assert!(
1577 result.is_err(),
1578 "Expected error when stopping nonexistent container"
1579 );
1580 let err = result.unwrap_err();
1581 match &err {
1582 CamelError::ProcessorError(msg) => {
1583 assert!(
1585 msg.to_lowercase().contains("no such container")
1586 || msg.to_lowercase().contains("not found")
1587 || msg.contains("404"),
1588 "Error message should indicate container not found, got: {}",
1589 msg
1590 );
1591 }
1592 _ => panic!("Expected ProcessorError, got: {:?}", err),
1593 }
1594 }
1595
1596 #[tokio::test]
1598 async fn test_container_producer_run_missing_image() {
1599 let docker = match Docker::connect_with_local_defaults() {
1601 Ok(d) => d,
1602 Err(_) => {
1603 eprintln!("Skipping test: Could not connect to Docker daemon");
1604 return;
1605 }
1606 };
1607
1608 if docker.ping().await.is_err() {
1609 eprintln!("Skipping test: Docker daemon not responding to ping");
1610 return;
1611 }
1612
1613 let component = ContainerComponent::new();
1615 let endpoint = component.create_endpoint("container:run").unwrap();
1616 let ctx = ProducerContext::new();
1617 let mut producer = endpoint.create_producer(&ctx).unwrap();
1618
1619 let mut exchange = Exchange::new(Message::new(""));
1620 exchange
1621 .input
1622 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1623 use tower::ServiceExt;
1626 let result = producer.ready().await.unwrap().call(exchange).await;
1627
1628 assert!(
1629 result.is_err(),
1630 "Expected error for run operation without image"
1631 );
1632 let err = result.unwrap_err();
1633 match &err {
1634 CamelError::ProcessorError(msg) => {
1635 assert!(
1636 msg.to_lowercase().contains("image"),
1637 "Error message should mention 'image', got: {}",
1638 msg
1639 );
1640 }
1641 _ => panic!("Expected ProcessorError, got: {:?}", err),
1642 }
1643 }
1644
1645 #[tokio::test]
1647 async fn test_container_producer_run_image_from_header() {
1648 let docker = match Docker::connect_with_local_defaults() {
1650 Ok(d) => d,
1651 Err(_) => {
1652 eprintln!("Skipping test: Could not connect to Docker daemon");
1653 return;
1654 }
1655 };
1656
1657 if docker.ping().await.is_err() {
1658 eprintln!("Skipping test: Docker daemon not responding to ping");
1659 return;
1660 }
1661
1662 let component = ContainerComponent::new();
1664 let endpoint = component.create_endpoint("container:run").unwrap();
1665 let ctx = ProducerContext::new();
1666 let mut producer = endpoint.create_producer(&ctx).unwrap();
1667
1668 let mut exchange = Exchange::new(Message::new(""));
1669 exchange
1670 .input
1671 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1672 exchange.input.set_header(
1674 HEADER_IMAGE,
1675 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1676 );
1677
1678 use tower::ServiceExt;
1679 let result = producer.ready().await.unwrap().call(exchange).await;
1680
1681 assert!(
1683 result.is_err(),
1684 "Expected error when running container with nonexistent image"
1685 );
1686 let err = result.unwrap_err();
1687 match &err {
1688 CamelError::ProcessorError(msg) => {
1689 assert!(
1691 msg.to_lowercase().contains("no such image")
1692 || msg.to_lowercase().contains("not found")
1693 || msg.to_lowercase().contains("image")
1694 || msg.to_lowercase().contains("pull")
1695 || msg.contains("404"),
1696 "Error message should indicate image issue, got: {}",
1697 msg
1698 );
1699 }
1700 _ => panic!("Expected ProcessorError, got: {:?}", err),
1701 }
1702 }
1703
1704 #[tokio::test]
1707 async fn test_container_producer_run_alpine_container() {
1708 let docker = match Docker::connect_with_local_defaults() {
1709 Ok(d) => d,
1710 Err(_) => {
1711 eprintln!("Skipping test: Could not connect to Docker daemon");
1712 return;
1713 }
1714 };
1715
1716 if docker.ping().await.is_err() {
1717 eprintln!("Skipping test: Docker daemon not responding to ping");
1718 return;
1719 }
1720
1721 let images = docker.list_images::<&str>(None).await.unwrap();
1723 let has_alpine = images
1724 .iter()
1725 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1726
1727 if !has_alpine {
1728 eprintln!("Pulling alpine:latest image...");
1729 let mut stream = docker.create_image(
1730 Some(bollard::image::CreateImageOptions {
1731 from_image: "alpine:latest",
1732 ..Default::default()
1733 }),
1734 None,
1735 None,
1736 );
1737
1738 use futures::StreamExt;
1739 while let Some(_item) = stream.next().await {
1740 }
1742 eprintln!("Image pulled successfully");
1743 }
1744
1745 let component = ContainerComponent::new();
1747 let endpoint = component.create_endpoint("container:run").unwrap();
1748 let ctx = ProducerContext::new();
1749 let mut producer = endpoint.create_producer(&ctx).unwrap();
1750
1751 let timestamp = std::time::SystemTime::now()
1753 .duration_since(std::time::UNIX_EPOCH)
1754 .unwrap()
1755 .as_millis();
1756 let container_name = format!("test-rust-camel-{}", timestamp);
1757 let mut exchange = Exchange::new(Message::new(""));
1758 exchange.input.set_header(
1759 HEADER_IMAGE,
1760 serde_json::Value::String("alpine:latest".into()),
1761 );
1762 exchange.input.set_header(
1763 HEADER_CONTAINER_NAME,
1764 serde_json::Value::String(container_name.clone()),
1765 );
1766
1767 use tower::ServiceExt;
1768 let result = producer
1769 .ready()
1770 .await
1771 .unwrap()
1772 .call(exchange)
1773 .await
1774 .expect("Container run should succeed");
1775
1776 let container_id = result
1778 .input
1779 .header(HEADER_CONTAINER_ID)
1780 .and_then(|v| v.as_str().map(|s| s.to_string()))
1781 .expect("Expected container ID header");
1782 assert!(!container_id.is_empty(), "Container ID should not be empty");
1783
1784 assert_eq!(
1786 result
1787 .input
1788 .header(HEADER_ACTION_RESULT)
1789 .and_then(|v| v.as_str()),
1790 Some("success")
1791 );
1792
1793 let inspect = docker
1795 .inspect_container(&container_id, None)
1796 .await
1797 .expect("Container should exist");
1798 assert_eq!(
1799 inspect.id.as_ref().map(|s| s.as_str()),
1800 Some(container_id.as_str())
1801 );
1802
1803 docker
1805 .remove_container(
1806 &container_id,
1807 Some(bollard::container::RemoveContainerOptions {
1808 force: true,
1809 ..Default::default()
1810 }),
1811 )
1812 .await
1813 .ok();
1814
1815 eprintln!("✅ Container {} created and cleaned up", container_id);
1816 }
1817
1818 #[tokio::test]
1820 async fn test_container_consumer_unsupported_operation() {
1821 use tokio::sync::mpsc;
1822
1823 let component = ContainerComponent::new();
1824 let endpoint = component.create_endpoint("container:run").unwrap();
1825 let mut consumer = endpoint.create_consumer().unwrap();
1826
1827 let (tx, _rx) = mpsc::channel(16);
1829 let cancel_token = tokio_util::sync::CancellationToken::new();
1830 let context = ConsumerContext::new(tx, cancel_token);
1831
1832 let result = consumer.start(context).await;
1833
1834 assert!(
1836 result.is_err(),
1837 "Expected error for unsupported consumer operation"
1838 );
1839 let err = result.unwrap_err();
1840 match &err {
1841 CamelError::EndpointCreationFailed(msg) => {
1842 assert!(
1843 msg.contains("Consumer only supports 'events' or 'logs'"),
1844 "Error message should mention events or logs support, got: {}",
1845 msg
1846 );
1847 }
1848 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1849 }
1850 }
1851
1852 #[test]
1853 fn test_container_consumer_concurrency_model_is_concurrent() {
1854 let consumer = ContainerConsumer {
1855 config: ContainerConfig::from_uri("container:events").unwrap(),
1856 };
1857
1858 assert_eq!(
1859 consumer.concurrency_model(),
1860 camel_component::ConcurrencyModel::Concurrent { max: None }
1861 );
1862 }
1863
1864 #[tokio::test]
1868 async fn test_container_consumer_cancellation() {
1869 use std::sync::atomic::{AtomicBool, Ordering};
1870 use tokio::sync::mpsc;
1871
1872 let docker = match Docker::connect_with_local_defaults() {
1874 Ok(d) => d,
1875 Err(_) => {
1876 eprintln!("Skipping test: Could not connect to Docker daemon");
1877 return;
1878 }
1879 };
1880
1881 if docker.ping().await.is_err() {
1882 eprintln!("Skipping test: Docker daemon not responding to ping");
1883 return;
1884 }
1885
1886 let component = ContainerComponent::new();
1887 let endpoint = component.create_endpoint("container:events").unwrap();
1888 let mut consumer = endpoint.create_consumer().unwrap();
1889
1890 let (tx, _rx) = mpsc::channel(16);
1892 let cancel_token = tokio_util::sync::CancellationToken::new();
1893 let context = ConsumerContext::new(tx, cancel_token.clone());
1894
1895 let completed = Arc::new(AtomicBool::new(false));
1897 let completed_clone = completed.clone();
1898
1899 let handle = tokio::spawn(async move {
1901 let result = consumer.start(context).await;
1902 completed_clone.store(true, Ordering::SeqCst);
1904 result
1905 });
1906
1907 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1909
1910 assert!(
1912 !completed.load(Ordering::SeqCst),
1913 "Consumer should still be running before cancellation"
1914 );
1915
1916 cancel_token.cancel();
1918
1919 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1921
1922 assert!(
1924 result.is_ok(),
1925 "Consumer should gracefully shut down after cancellation"
1926 );
1927
1928 assert!(
1930 completed.load(Ordering::SeqCst),
1931 "Consumer should have completed after cancellation"
1932 );
1933 }
1934
1935 #[tokio::test]
1939 async fn test_container_producer_list_containers() {
1940 let docker = match Docker::connect_with_local_defaults() {
1943 Ok(d) => d,
1944 Err(_) => {
1945 eprintln!("Skipping test: Could not connect to Docker daemon");
1946 return;
1947 }
1948 };
1949
1950 if docker.ping().await.is_err() {
1951 eprintln!("Skipping test: Docker daemon not responding to ping");
1952 return;
1953 }
1954
1955 let component = ContainerComponent::new();
1957 let endpoint = component.create_endpoint("container:list").unwrap();
1958
1959 let ctx = ProducerContext::new();
1960 let mut producer = endpoint.create_producer(&ctx).unwrap();
1961
1962 let mut exchange = Exchange::new(Message::new(""));
1964 exchange
1965 .input
1966 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1967
1968 use tower::ServiceExt;
1970 let result = producer
1971 .ready()
1972 .await
1973 .unwrap()
1974 .call(exchange)
1975 .await
1976 .expect("Producer should succeed when Docker is available");
1977
1978 match &result.input.body {
1981 camel_api::Body::Json(json_value) => {
1982 assert!(
1983 json_value.is_array(),
1984 "Expected input body to be a JSON array, got: {:?}",
1985 json_value
1986 );
1987 }
1988 other => panic!("Expected Body::Json with array, got: {:?}", other),
1989 }
1990 }
1991}