1use std::collections::{HashMap, HashSet};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use bollard::Docker;
14use bollard::service::{HostConfig, PortBinding};
15use camel_api::{Body, BoxProcessor, CamelError, Exchange, Message};
16use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
17use camel_endpoint::parse_uri;
18use tower::Service;
19
20static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
23 once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
24
25fn track_container(id: String) {
27 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
28 tracker.insert(id);
29 }
30}
31
32fn untrack_container(id: &str) {
34 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
35 tracker.remove(id);
36 }
37}
38
39pub async fn cleanup_tracked_containers() {
41 let ids: Vec<String> = {
42 match CONTAINER_TRACKER.lock() {
43 Ok(tracker) => tracker.iter().cloned().collect(),
44 Err(_) => return,
45 }
46 };
47
48 if ids.is_empty() {
49 return;
50 }
51
52 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
53
54 let docker = match Docker::connect_with_unix_defaults() {
55 Ok(d) => d,
56 Err(e) => {
57 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
58 return;
59 }
60 };
61
62 for id in ids {
63 match docker
64 .remove_container(
65 &id,
66 Some(bollard::container::RemoveContainerOptions {
67 force: true,
68 ..Default::default()
69 }),
70 )
71 .await
72 {
73 Ok(_) => {
74 tracing::debug!("Cleaned up container {}", id);
75 untrack_container(&id);
76 }
77 Err(e) => {
78 tracing::warn!("Failed to cleanup container {}: {}", id, e);
79 }
80 }
81 }
82}
83
84const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
88
89pub const HEADER_ACTION: &str = "CamelContainerAction";
91
92pub const HEADER_IMAGE: &str = "CamelContainerImage";
94
95pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";
97
98pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";
100
101pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";
103
104pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";
106
107pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
109
110#[derive(Debug, Clone)]
115pub struct ContainerConfig {
116 pub operation: String,
118 pub image: Option<String>,
120 pub name: Option<String>,
122 pub host: Option<String>,
124 pub cmd: Option<String>,
126 pub ports: Option<String>,
128 pub env: Option<String>,
130 pub network: Option<String>,
132 pub container_id: Option<String>,
134 pub follow: bool,
136 pub timestamps: bool,
138 pub tail: Option<String>,
140 pub auto_pull: bool,
142 pub auto_remove: bool,
144}
145
146impl ContainerConfig {
147 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
155 let parts = parse_uri(uri)?;
156 if parts.scheme != "container" {
157 return Err(CamelError::InvalidUri(format!(
158 "expected scheme 'container', got '{}'",
159 parts.scheme
160 )));
161 }
162
163 let image = parts.params.get("image").cloned();
164 let name = parts.params.get("name").cloned();
165 let cmd = parts.params.get("cmd").cloned();
166 let ports = parts.params.get("ports").cloned();
167 let env = parts.params.get("env").cloned();
168 let network = parts.params.get("network").cloned();
169 let container_id = parts.params.get("containerId").cloned();
170 let follow = parts
171 .params
172 .get("follow")
173 .map(|v| v.eq_ignore_ascii_case("true"))
174 .unwrap_or(true);
175 let timestamps = parts
176 .params
177 .get("timestamps")
178 .map(|v| v.eq_ignore_ascii_case("true"))
179 .unwrap_or(false);
180 let tail = parts.params.get("tail").cloned();
181 let auto_pull = parts
182 .params
183 .get("autoPull")
184 .map(|v| v.eq_ignore_ascii_case("true"))
185 .unwrap_or(true);
186 let auto_remove = parts
187 .params
188 .get("autoRemove")
189 .map(|v| v.eq_ignore_ascii_case("true"))
190 .unwrap_or(true);
191 let host = parts
192 .params
193 .get("host")
194 .cloned()
195 .or_else(|| Some("unix:///var/run/docker.sock".to_string()));
196
197 Ok(Self {
198 operation: parts.path,
199 image,
200 name,
201 host,
202 cmd,
203 ports,
204 env,
205 network,
206 container_id,
207 follow,
208 timestamps,
209 tail,
210 auto_pull,
211 auto_remove,
212 })
213 }
214
215 fn docker_socket_path(&self) -> Result<&str, CamelError> {
216 let host = self
217 .host
218 .as_deref()
219 .unwrap_or("unix:///var/run/docker.sock");
220
221 if host.starts_with("unix://") {
222 return Ok(host.trim_start_matches("unix://"));
223 }
224
225 if host.contains("://") {
226 return Err(CamelError::ProcessorError(format!(
227 "Unsupported Docker host scheme: {} (only unix:// is supported)",
228 host
229 )));
230 }
231
232 Ok(host)
233 }
234
235 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
236 let socket_path = self.docker_socket_path()?;
237 Docker::connect_with_unix(
238 socket_path,
239 DOCKER_CONNECT_TIMEOUT_SECS,
240 bollard::API_DEFAULT_VERSION,
241 )
242 .map_err(|e| {
243 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
244 })
245 }
246
247 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
255 let docker = self.connect_docker_client()?;
256 docker
257 .ping()
258 .await
259 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
260 Ok(docker)
261 }
262
263 #[allow(clippy::type_complexity)]
264 fn parse_ports(
265 &self,
266 ) -> Option<(
267 HashMap<String, HashMap<(), ()>>,
268 HashMap<String, Option<Vec<PortBinding>>>,
269 )> {
270 let ports_str = self.ports.as_ref()?;
271
272 let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
273 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
274
275 for mapping in ports_str.split(',') {
276 let mapping = mapping.trim();
277 if mapping.is_empty() {
278 continue;
279 }
280
281 let (host_port, container_spec) = mapping.split_once(':')?;
282
283 let (container_port, protocol) = if container_spec.contains('/') {
284 let parts: Vec<&str> = container_spec.split('/').collect();
285 (parts[0], parts[1])
286 } else {
287 (container_spec, "tcp")
288 };
289
290 let container_key = format!("{}/{}", container_port, protocol);
291
292 exposed_ports.insert(container_key.clone(), HashMap::new());
293
294 port_bindings.insert(
295 container_key,
296 Some(vec![PortBinding {
297 host_ip: None,
298 host_port: Some(host_port.to_string()),
299 }]),
300 );
301 }
302
303 if exposed_ports.is_empty() {
304 None
305 } else {
306 Some((exposed_ports, port_bindings))
307 }
308 }
309
310 fn parse_env(&self) -> Option<Vec<String>> {
311 let env_str = self.env.as_ref()?;
312
313 let env_vars: Vec<String> = env_str
314 .split(',')
315 .map(|s| s.trim().to_string())
316 .filter(|s| !s.is_empty())
317 .collect();
318
319 if env_vars.is_empty() {
320 None
321 } else {
322 Some(env_vars)
323 }
324 }
325}
326
327#[derive(Debug, Clone, Copy, PartialEq, Eq)]
328enum ProducerOperation {
329 List,
330 Run,
331 Start,
332 Stop,
333 Remove,
334}
335
336fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
337 match operation {
338 "list" => Ok(ProducerOperation::List),
339 "run" => Ok(ProducerOperation::Run),
340 "start" => Ok(ProducerOperation::Start),
341 "stop" => Ok(ProducerOperation::Stop),
342 "remove" => Ok(ProducerOperation::Remove),
343 _ => Err(CamelError::ProcessorError(format!(
344 "Unknown container operation: {}",
345 operation
346 ))),
347 }
348}
349
350fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
351 exchange
352 .input
353 .header(HEADER_CONTAINER_NAME)
354 .and_then(|v| v.as_str().map(|s| s.to_string()))
355 .or_else(|| config.name.clone())
356}
357
358async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
359 let images = docker
360 .list_images::<&str>(None)
361 .await
362 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
363
364 Ok(images.iter().any(|img| {
365 img.repo_tags
366 .iter()
367 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
368 }))
369}
370
371async fn pull_image_with_progress(
372 docker: &Docker,
373 image: &str,
374 timeout_secs: u64,
375) -> Result<(), CamelError> {
376 use futures::StreamExt;
377
378 tracing::info!("Pulling image: {}", image);
379
380 let mut stream = docker.create_image(
381 Some(bollard::image::CreateImageOptions {
382 from_image: image,
383 ..Default::default()
384 }),
385 None,
386 None,
387 );
388
389 let start = std::time::Instant::now();
390 let mut last_progress = std::time::Instant::now();
391
392 while let Some(item) = stream.next().await {
393 if start.elapsed().as_secs() > timeout_secs {
394 return Err(CamelError::ProcessorError(format!(
395 "Image pull timeout after {}s. Try manually: docker pull {}",
396 timeout_secs, image
397 )));
398 }
399
400 match item {
401 Ok(update) => {
402 if last_progress.elapsed().as_secs() >= 2 {
404 if let Some(status) = update.status {
405 tracing::debug!("Pull progress: {}", status);
406 }
407 last_progress = std::time::Instant::now();
408 }
409 }
410 Err(e) => {
411 let err_str = e.to_string().to_lowercase();
412 if err_str.contains("unauthorized") || err_str.contains("401") {
413 return Err(CamelError::ProcessorError(format!(
414 "Authentication required for image '{}'. Configure Docker credentials: docker login",
415 image
416 )));
417 }
418 if err_str.contains("not found") || err_str.contains("404") {
419 return Err(CamelError::ProcessorError(format!(
420 "Image '{}' not found in registry. Check the image name and tag",
421 image
422 )));
423 }
424 return Err(CamelError::ProcessorError(format!(
425 "Failed to pull image '{}': {}",
426 image, e
427 )));
428 }
429 }
430 }
431
432 tracing::info!("Successfully pulled image: {}", image);
433 Ok(())
434}
435
436async fn ensure_image_available(
437 docker: &Docker,
438 image: &str,
439 auto_pull: bool,
440 timeout_secs: u64,
441) -> Result<(), CamelError> {
442 if image_exists_locally(docker, image).await? {
443 tracing::debug!("Image '{}' already available locally", image);
444 return Ok(());
445 }
446
447 if !auto_pull {
448 return Err(CamelError::ProcessorError(format!(
449 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
450 image, image
451 )));
452 }
453
454 pull_image_with_progress(docker, image, timeout_secs).await
455}
456
457fn format_docker_event(event: &bollard::models::EventMessage) -> String {
458 let action = event.action.as_deref().unwrap_or("unknown");
459 let actor = event.actor.as_ref();
460
461 let container_name = actor
462 .and_then(|a| a.attributes.as_ref())
463 .and_then(|attrs| attrs.get("name"))
464 .map(|s| s.as_str())
465 .unwrap_or("unknown");
466
467 let image = actor
468 .and_then(|a| a.attributes.as_ref())
469 .and_then(|attrs| attrs.get("image"))
470 .map(|s| s.as_str())
471 .unwrap_or("");
472
473 let exit_code = actor
474 .and_then(|a| a.attributes.as_ref())
475 .and_then(|attrs| attrs.get("exitCode"))
476 .map(|s| s.as_str());
477
478 match action {
479 "create" => {
480 if image.is_empty() {
481 format!("[CREATE] Container {}", container_name)
482 } else {
483 format!("[CREATE] Container {} ({})", container_name, image)
484 }
485 }
486 "start" => format!("[START] Container {}", container_name),
487 "die" => {
488 if let Some(code) = exit_code {
489 format!("[DIE] Container {} (exit: {})", container_name, code)
490 } else {
491 format!("[DIE] Container {}", container_name)
492 }
493 }
494 "destroy" => format!("[DESTROY] Container {}", container_name),
495 "stop" => format!("[STOP] Container {}", container_name),
496 "pause" => format!("[PAUSE] Container {}", container_name),
497 "unpause" => format!("[UNPAUSE] Container {}", container_name),
498 "restart" => format!("[RESTART] Container {}", container_name),
499 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
500 }
501}
502
503async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
504 create: CreateFn,
505 start: StartFn,
506 remove: RemoveFn,
507) -> Result<String, CamelError>
508where
509 CreateFn: FnOnce() -> CreateFut,
510 CreateFut: Future<Output = Result<String, CamelError>>,
511 StartFn: FnOnce(String) -> StartFut,
512 StartFut: Future<Output = Result<(), CamelError>>,
513 RemoveFn: FnOnce(String) -> RemoveFut,
514 RemoveFut: Future<Output = Result<(), CamelError>>,
515{
516 let container_id = create().await?;
517 if let Err(start_err) = start(container_id.clone()).await {
518 if let Err(remove_err) = remove(container_id.clone()).await {
519 return Err(CamelError::ProcessorError(format!(
520 "Failed to start container: {}. Cleanup failed: {}",
521 start_err, remove_err
522 )));
523 }
524 return Err(start_err);
525 }
526
527 Ok(container_id)
528}
529
530#[derive(Clone)]
535pub struct ContainerProducer {
536 config: ContainerConfig,
537 docker: Docker,
538}
539
540impl Service<Exchange> for ContainerProducer {
541 type Response = Exchange;
542 type Error = CamelError;
543 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
544
545 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
546 Poll::Ready(Ok(()))
547 }
548
549 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
550 let config = self.config.clone();
551 let docker = self.docker.clone();
552 Box::pin(async move {
553 let operation_name = exchange
555 .input
556 .header(HEADER_ACTION)
557 .and_then(|v| v.as_str().map(|s| s.to_string()))
558 .unwrap_or_else(|| config.operation.clone());
559
560 let operation = parse_producer_operation(&operation_name)?;
561
562 match operation {
564 ProducerOperation::List => {
565 let containers = docker.list_containers::<String>(None).await.map_err(|e| {
566 CamelError::ProcessorError(format!("Failed to list containers: {}", e))
567 })?;
568
569 let json_value = serde_json::to_value(&containers).map_err(|e| {
570 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
571 })?;
572
573 exchange.input.body = Body::Json(json_value);
574 exchange.input.set_header(
575 HEADER_ACTION_RESULT,
576 serde_json::Value::String("success".to_string()),
577 );
578 }
579 ProducerOperation::Run => {
580 let image = exchange
582 .input
583 .header(HEADER_IMAGE)
584 .and_then(|v| v.as_str().map(|s| s.to_string()))
585 .or(config.image.clone())
586 .ok_or_else(|| {
587 CamelError::ProcessorError(
588 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
589 )
590 })?;
591
592 let pull_timeout = 300; ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
595 .await
596 .map_err(|e| {
597 CamelError::ProcessorError(format!(
598 "Image '{}' not available: {}",
599 image, e
600 ))
601 })?;
602
603 let container_name = resolve_container_name(&exchange, &config);
604 let container_name_ref = container_name.as_deref().unwrap_or("");
605 let cmd_parts: Option<Vec<String>> = config
606 .cmd
607 .as_ref()
608 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
609 let auto_remove = config.auto_remove;
610 let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
611 let env_vars = config.parse_env();
612 let network_mode = config.network.clone();
613
614 let docker_create = docker.clone();
615 let docker_start = docker.clone();
616 let docker_remove = docker.clone();
617
618 let container_id = run_container_with_cleanup(
619 move || async move {
620 let create_options = bollard::container::CreateContainerOptions {
621 name: container_name_ref,
622 ..Default::default()
623 };
624 let container_config = bollard::container::Config::<String> {
625 image: Some(image.clone()),
626 cmd: cmd_parts,
627 env: env_vars,
628 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
629 host_config: Some(HostConfig {
630 auto_remove: Some(auto_remove),
631 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
632 network_mode,
633 ..Default::default()
634 }),
635 ..Default::default()
636 };
637
638 let create_response = docker_create
639 .create_container(Some(create_options), container_config)
640 .await
641 .map_err(|e| {
642 let err_str = e.to_string().to_lowercase();
643 if err_str.contains("409") || err_str.contains("conflict") {
644 CamelError::ProcessorError(format!(
645 "Container name '{}' already exists. Use a unique name or remove the existing container first",
646 container_name_ref
647 ))
648 } else {
649 CamelError::ProcessorError(format!(
650 "Failed to create container: {}",
651 e
652 ))
653 }
654 })?;
655
656 Ok(create_response.id)
657 },
658 move |container_id| async move {
659 docker_start
660 .start_container::<String>(&container_id, None)
661 .await
662 .map_err(|e| {
663 CamelError::ProcessorError(format!(
664 "Failed to start container: {}",
665 e
666 ))
667 })
668 },
669 move |container_id| async move {
670 docker_remove
671 .remove_container(&container_id, None)
672 .await
673 .map_err(|e| {
674 CamelError::ProcessorError(format!(
675 "Failed to remove container after start failure: {}",
676 e
677 ))
678 })
679 },
680 )
681 .await?;
682
683 track_container(container_id.clone());
684
685 exchange
686 .input
687 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
688 exchange.input.set_header(
689 HEADER_ACTION_RESULT,
690 serde_json::Value::String("success".to_string()),
691 );
692 }
693 ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
694 let container_id = exchange
696 .input
697 .header(HEADER_CONTAINER_ID)
698 .and_then(|v| v.as_str().map(|s| s.to_string()))
699 .ok_or_else(|| {
700 CamelError::ProcessorError(format!(
701 "{} header is required for {} operation",
702 HEADER_CONTAINER_ID, operation_name
703 ))
704 })?;
705
706 match operation {
707 ProducerOperation::Start => {
708 docker
709 .start_container::<String>(&container_id, None)
710 .await
711 .map_err(|e| {
712 CamelError::ProcessorError(format!(
713 "Failed to start container: {}",
714 e
715 ))
716 })?;
717 }
718 ProducerOperation::Stop => {
719 docker
720 .stop_container(&container_id, None)
721 .await
722 .map_err(|e| {
723 CamelError::ProcessorError(format!(
724 "Failed to stop container: {}",
725 e
726 ))
727 })?;
728 }
729 ProducerOperation::Remove => {
730 docker
731 .remove_container(&container_id, None)
732 .await
733 .map_err(|e| {
734 CamelError::ProcessorError(format!(
735 "Failed to remove container: {}",
736 e
737 ))
738 })?;
739 untrack_container(&container_id);
740 }
741 _ => {}
742 }
743
744 exchange.input.set_header(
746 HEADER_ACTION_RESULT,
747 serde_json::Value::String("success".to_string()),
748 );
749 }
750 }
751
752 Ok(exchange)
753 })
754 }
755}
756
757pub struct ContainerConsumer {
762 config: ContainerConfig,
763}
764
765#[async_trait]
766impl Consumer for ContainerConsumer {
767 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
768 match self.config.operation.as_str() {
769 "events" => self.start_events_consumer(context).await,
770 "logs" => self.start_logs_consumer(context).await,
771 _ => Err(CamelError::EndpointCreationFailed(format!(
772 "Consumer only supports 'events' or 'logs' operations, got '{}'",
773 self.config.operation
774 ))),
775 }
776 }
777
778 async fn stop(&mut self) -> Result<(), CamelError> {
779 Ok(())
780 }
781
782 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
783 camel_component::ConcurrencyModel::Concurrent { max: None }
784 }
785}
786
787impl ContainerConsumer {
788 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
789 use futures::StreamExt;
790
791 loop {
792 if context.is_cancelled() {
793 tracing::info!("Container events consumer shutting down");
794 return Ok(());
795 }
796
797 let docker = match self.config.connect_docker().await {
798 Ok(d) => d,
799 Err(e) => {
800 tracing::error!(
801 "Consumer failed to connect to docker: {}. Retrying in 5s...",
802 e
803 );
804 tokio::select! {
805 _ = context.cancelled() => {
806 tracing::info!("Container events consumer shutting down");
807 return Ok(());
808 }
809 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
810 }
811 continue;
812 }
813 };
814
815 let mut event_stream = docker.events::<String>(None);
816
817 loop {
818 tokio::select! {
819 _ = context.cancelled() => {
820 tracing::info!("Container events consumer shutting down");
821 return Ok(());
822 }
823
824 msg = event_stream.next() => {
825 match msg {
826 Some(Ok(event)) => {
827 let formatted = format_docker_event(&event);
828 let message = Message::new(Body::Text(formatted));
829 let exchange = Exchange::new(message);
830
831 if let Err(e) = context.send(exchange).await {
832 tracing::error!("Failed to send exchange: {:?}", e);
833 break;
834 }
835 }
836 Some(Err(e)) => {
837 tracing::error!("Docker event stream error: {}. Reconnecting...", e);
838 break;
839 }
840 None => {
841 tracing::info!("Docker event stream ended. Reconnecting...");
842 break;
843 }
844 }
845 }
846 }
847 }
848
849 tokio::select! {
850 _ = context.cancelled() => {
851 tracing::info!("Container events consumer shutting down");
852 return Ok(());
853 }
854 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
855 }
856 }
857 }
858
859 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
860 use futures::StreamExt;
861
862 let container_id = self.config.container_id.clone().ok_or_else(|| {
863 CamelError::EndpointCreationFailed(
864 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
865 .to_string(),
866 )
867 })?;
868
869 loop {
870 if context.is_cancelled() {
871 tracing::info!("Container logs consumer shutting down");
872 return Ok(());
873 }
874
875 let docker = match self.config.connect_docker().await {
876 Ok(d) => d,
877 Err(e) => {
878 tracing::error!(
879 "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
880 e
881 );
882 tokio::select! {
883 _ = context.cancelled() => {
884 tracing::info!("Container logs consumer shutting down");
885 return Ok(());
886 }
887 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
888 }
889 continue;
890 }
891 };
892
893 let tail = self
894 .config
895 .tail
896 .clone()
897 .unwrap_or_else(|| "all".to_string());
898
899 let options = bollard::container::LogsOptions::<String> {
900 follow: self.config.follow,
901 stdout: true,
902 stderr: true,
903 timestamps: self.config.timestamps,
904 tail,
905 ..Default::default()
906 };
907
908 let mut log_stream = docker.logs(&container_id, Some(options));
909 let container_id_header = container_id.clone();
910
911 loop {
912 tokio::select! {
913 _ = context.cancelled() => {
914 tracing::info!("Container logs consumer shutting down");
915 return Ok(());
916 }
917
918 msg = log_stream.next() => {
919 match msg {
920 Some(Ok(log_output)) => {
921 let (stream_type, content) = match log_output {
922 bollard::container::LogOutput::StdOut { message } => {
923 ("stdout", String::from_utf8_lossy(&message).into_owned())
924 }
925 bollard::container::LogOutput::StdErr { message } => {
926 ("stderr", String::from_utf8_lossy(&message).into_owned())
927 }
928 bollard::container::LogOutput::Console { message } => {
929 ("console", String::from_utf8_lossy(&message).into_owned())
930 }
931 bollard::container::LogOutput::StdIn { message } => {
932 ("stdin", String::from_utf8_lossy(&message).into_owned())
933 }
934 };
935
936 let content = content.trim_end();
937 if content.is_empty() {
938 continue;
939 }
940
941 let mut message = Message::new(Body::Text(content.to_string()));
942 message.set_header(
943 HEADER_CONTAINER_ID,
944 serde_json::Value::String(container_id_header.clone()),
945 );
946 message.set_header(
947 HEADER_LOG_STREAM,
948 serde_json::Value::String(stream_type.to_string()),
949 );
950
951 if self.config.timestamps
952 && let Some(ts) = extract_timestamp(content) {
953 message.set_header(
954 HEADER_LOG_TIMESTAMP,
955 serde_json::Value::String(ts),
956 );
957 }
958
959 let exchange = Exchange::new(message);
960
961 if let Err(e) = context.send(exchange).await {
962 tracing::error!("Failed to send log exchange: {:?}", e);
963 break;
964 }
965 }
966 Some(Err(e)) => {
967 tracing::error!("Docker log stream error: {}. Reconnecting...", e);
968 break;
969 }
970 None => {
971 if self.config.follow {
972 tracing::info!("Docker log stream ended. Reconnecting...");
973 break;
974 } else {
975 tracing::info!("Container logs consumer finished (follow=false)");
976 return Ok(());
977 }
978 }
979 }
980 }
981 }
982 }
983
984 tokio::select! {
985 _ = context.cancelled() => {
986 tracing::info!("Container logs consumer shutting down");
987 return Ok(());
988 }
989 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
990 }
991 }
992 }
993}
994
995fn extract_timestamp(log_line: &str) -> Option<String> {
996 let parts: Vec<&str> = log_line.splitn(2, ' ').collect();
997 if parts.len() > 1 && parts[0].contains('T') {
998 Some(parts[0].to_string())
999 } else {
1000 None
1001 }
1002}
1003
1004pub struct ContainerComponent;
1012
1013impl ContainerComponent {
1014 pub fn new() -> Self {
1016 Self
1017 }
1018}
1019
1020impl Default for ContainerComponent {
1021 fn default() -> Self {
1022 Self::new()
1023 }
1024}
1025
1026impl Component for ContainerComponent {
1027 fn scheme(&self) -> &str {
1028 "container"
1029 }
1030
1031 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1032 let config = ContainerConfig::from_uri(uri)?;
1033 Ok(Box::new(ContainerEndpoint {
1034 uri: uri.to_string(),
1035 config,
1036 }))
1037 }
1038}
1039
1040pub struct ContainerEndpoint {
1045 uri: String,
1046 config: ContainerConfig,
1047}
1048
1049impl Endpoint for ContainerEndpoint {
1050 fn uri(&self) -> &str {
1051 &self.uri
1052 }
1053
1054 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1055 Ok(Box::new(ContainerConsumer {
1056 config: self.config.clone(),
1057 }))
1058 }
1059
1060 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1061 let docker = self.config.connect_docker_client()?;
1062 Ok(BoxProcessor::new(ContainerProducer {
1063 config: self.config.clone(),
1064 docker,
1065 }))
1066 }
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071 use super::*;
1072
1073 #[test]
1074 fn test_container_config() {
1075 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1076 assert_eq!(config.operation, "run");
1077 assert_eq!(config.image.as_deref(), Some("alpine"));
1078 assert_eq!(config.host.as_deref(), Some("unix:///var/run/docker.sock"));
1079 }
1080
1081 #[test]
1082 fn test_container_config_parses_name() {
1083 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1084 assert_eq!(config.name.as_deref(), Some("my-container"));
1085 }
1086
1087 #[test]
1088 fn test_parse_producer_operation_known() {
1089 assert_eq!(
1090 parse_producer_operation("list").unwrap(),
1091 ProducerOperation::List
1092 );
1093 assert_eq!(
1094 parse_producer_operation("run").unwrap(),
1095 ProducerOperation::Run
1096 );
1097 assert_eq!(
1098 parse_producer_operation("start").unwrap(),
1099 ProducerOperation::Start
1100 );
1101 assert_eq!(
1102 parse_producer_operation("stop").unwrap(),
1103 ProducerOperation::Stop
1104 );
1105 assert_eq!(
1106 parse_producer_operation("remove").unwrap(),
1107 ProducerOperation::Remove
1108 );
1109 }
1110
1111 #[test]
1112 fn test_parse_producer_operation_unknown() {
1113 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1114 match err {
1115 CamelError::ProcessorError(msg) => {
1116 assert!(
1117 msg.contains("Unknown container operation"),
1118 "Unexpected error message: {}",
1119 msg
1120 );
1121 }
1122 _ => panic!("Expected ProcessorError for unknown operation"),
1123 }
1124 }
1125
1126 #[test]
1127 fn test_resolve_container_name_header_overrides_config() {
1128 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1129 let mut exchange = Exchange::new(Message::new(""));
1130 exchange.input.set_header(
1131 HEADER_CONTAINER_NAME,
1132 serde_json::Value::String("header-name".to_string()),
1133 );
1134
1135 let resolved = resolve_container_name(&exchange, &config);
1136 assert_eq!(resolved.as_deref(), Some("header-name"));
1137 }
1138
1139 #[test]
1140 fn test_container_config_rejects_tcp_host() {
1141 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1142 let err = config.connect_docker_client().unwrap_err();
1143 match err {
1144 CamelError::ProcessorError(msg) => {
1145 assert!(
1146 msg.to_lowercase().contains("tcp"),
1147 "Expected TCP scheme error, got: {}",
1148 msg
1149 );
1150 }
1151 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1152 }
1153 }
1154
1155 #[tokio::test]
1156 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1157 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1158 let remove_called_clone = remove_called.clone();
1159
1160 let result = run_container_with_cleanup(
1161 || async { Ok("container-123".to_string()) },
1162 |_id| async move {
1163 Err(CamelError::ProcessorError(
1164 "Failed to start container".to_string(),
1165 ))
1166 },
1167 move |_id| {
1168 let remove_called_inner = remove_called_clone.clone();
1169 async move {
1170 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1171 Ok(())
1172 }
1173 },
1174 )
1175 .await;
1176
1177 assert!(result.is_err(), "Expected start failure to bubble up");
1178 assert!(
1179 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1180 "Expected cleanup to remove container"
1181 );
1182 }
1183
1184 #[test]
1185 fn test_container_component_creates_endpoint() {
1186 let component = ContainerComponent::new();
1187 assert_eq!(component.scheme(), "container");
1188 let endpoint = component
1189 .create_endpoint("container:run?image=alpine")
1190 .unwrap();
1191 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1192 }
1193
1194 #[test]
1195 fn test_container_config_parses_ports() {
1196 let config =
1197 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1198 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1199 }
1200
1201 #[test]
1202 fn test_container_config_parses_env() {
1203 let config =
1204 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1205 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1206 }
1207
1208 #[test]
1209 fn test_container_config_parses_logs_options() {
1210 let config = ContainerConfig::from_uri(
1211 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1212 )
1213 .unwrap();
1214 assert_eq!(config.operation, "logs");
1215 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1216 assert!(config.follow);
1217 assert!(config.timestamps);
1218 assert_eq!(config.tail.as_deref(), Some("100"));
1219 }
1220
1221 #[test]
1222 fn test_container_config_logs_defaults() {
1223 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1224 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1228
1229 #[test]
1230 fn test_parse_ports_single() {
1231 let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();
1232 let (exposed, bindings) = config.parse_ports().unwrap();
1233
1234 assert!(exposed.contains_key("80/tcp"));
1235 assert!(bindings.contains_key("80/tcp"));
1236
1237 let binding = bindings.get("80/tcp").unwrap().as_ref().unwrap();
1238 assert_eq!(binding.len(), 1);
1239 assert_eq!(binding[0].host_port, Some("8080".to_string()));
1240 }
1241
1242 #[test]
1243 fn test_parse_ports_multiple() {
1244 let config =
1245 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1246 let (exposed, bindings) = config.parse_ports().unwrap();
1247
1248 assert!(exposed.contains_key("80/tcp"));
1249 assert!(exposed.contains_key("443/tcp"));
1250 assert_eq!(bindings.len(), 2);
1251 }
1252
1253 #[test]
1254 fn test_parse_ports_with_protocol() {
1255 let config =
1256 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
1257 .unwrap();
1258 let (exposed, bindings) = config.parse_ports().unwrap();
1259
1260 assert!(exposed.contains_key("80/tcp"));
1261 assert!(exposed.contains_key("53/udp"));
1262 }
1263
1264 #[test]
1265 fn test_parse_ports_none() {
1266 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1267 assert!(config.parse_ports().is_none());
1268 }
1269
1270 #[test]
1271 fn test_parse_env_single() {
1272 let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();
1273 let env = config.parse_env().unwrap();
1274
1275 assert_eq!(env.len(), 1);
1276 assert_eq!(env[0], "FOO=bar");
1277 }
1278
1279 #[test]
1280 fn test_parse_env_multiple() {
1281 let config =
1282 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123")
1283 .unwrap();
1284 let env = config.parse_env().unwrap();
1285
1286 assert_eq!(env.len(), 3);
1287 assert!(env.contains(&"FOO=bar".to_string()));
1288 assert!(env.contains(&"BAZ=qux".to_string()));
1289 assert!(env.contains(&"NUM=123".to_string()));
1290 }
1291
1292 #[test]
1293 fn test_parse_env_none() {
1294 let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();
1295 assert!(config.parse_env().is_none());
1296 }
1297
1298 mod test_helpers {
1299 use async_trait::async_trait;
1300 use camel_api::CamelError;
1301
1302 pub struct NullRouteController;
1304
1305 #[async_trait]
1306 impl camel_api::RouteController for NullRouteController {
1307 async fn start_route(&mut self, _: &str) -> Result<(), CamelError> {
1308 Ok(())
1309 }
1310 async fn stop_route(&mut self, _: &str) -> Result<(), CamelError> {
1311 Ok(())
1312 }
1313 async fn restart_route(&mut self, _: &str) -> Result<(), CamelError> {
1314 Ok(())
1315 }
1316 async fn suspend_route(&mut self, _: &str) -> Result<(), CamelError> {
1317 Ok(())
1318 }
1319 async fn resume_route(&mut self, _: &str) -> Result<(), CamelError> {
1320 Ok(())
1321 }
1322 fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1323 None
1324 }
1325 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1326 Ok(())
1327 }
1328 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1329 Ok(())
1330 }
1331 }
1332 }
1333
1334 use camel_api::Message;
1335 use std::sync::Arc;
1336 use test_helpers::NullRouteController;
1337 use tokio::sync::Mutex;
1338
1339 #[tokio::test]
1340 async fn test_container_producer_resolves_operation_from_header() {
1341 let component = ContainerComponent::new();
1342 let endpoint = component.create_endpoint("container:run").unwrap();
1343
1344 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1345 let mut producer = endpoint.create_producer(&ctx).unwrap();
1346
1347 let mut exchange = Exchange::new(Message::new(""));
1348 exchange
1349 .input
1350 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1351
1352 use tower::ServiceExt;
1353 let result = producer
1354 .ready()
1355 .await
1356 .unwrap()
1357 .call(exchange)
1358 .await
1359 .unwrap();
1360
1361 assert_eq!(
1363 result
1364 .input
1365 .header(HEADER_ACTION_RESULT)
1366 .map(|v| v.as_str().unwrap()),
1367 Some("success")
1368 );
1369 }
1370
1371 #[tokio::test]
1372 async fn test_container_producer_connection_error_on_invalid_host() {
1373 let component = ContainerComponent::new();
1375 let endpoint = component
1376 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock")
1377 .unwrap();
1378
1379 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1380 let result = endpoint.create_producer(&ctx);
1381
1382 assert!(
1384 result.is_err(),
1385 "Expected error when connecting to invalid host"
1386 );
1387 let err = result.unwrap_err();
1388 match &err {
1389 CamelError::ProcessorError(msg) => {
1390 assert!(
1391 msg.to_lowercase().contains("connection")
1392 || msg.to_lowercase().contains("connect")
1393 || msg.to_lowercase().contains("socket")
1394 || msg.contains("docker"),
1395 "Error message should indicate connection failure, got: {}",
1396 msg
1397 );
1398 }
1399 _ => panic!("Expected ProcessorError, got: {:?}", err),
1400 }
1401 }
1402
1403 #[tokio::test]
1405 async fn test_container_producer_lifecycle_operations_missing_id() {
1406 let docker = match Docker::connect_with_unix_defaults() {
1408 Ok(d) => d,
1409 Err(_) => {
1410 eprintln!("Skipping test: Could not connect to Docker daemon");
1411 return;
1412 }
1413 };
1414
1415 if docker.ping().await.is_err() {
1416 eprintln!("Skipping test: Docker daemon not responding to ping");
1417 return;
1418 }
1419
1420 let component = ContainerComponent::new();
1421 let endpoint = component.create_endpoint("container:start").unwrap();
1422 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1423 let mut producer = endpoint.create_producer(&ctx).unwrap();
1424
1425 for operation in ["start", "stop", "remove"] {
1427 let mut exchange = Exchange::new(Message::new(""));
1428 exchange.input.set_header(
1429 HEADER_ACTION,
1430 serde_json::Value::String(operation.to_string()),
1431 );
1432 use tower::ServiceExt;
1435 let result = producer.ready().await.unwrap().call(exchange).await;
1436
1437 assert!(
1438 result.is_err(),
1439 "Expected error for {} operation without CamelContainerId",
1440 operation
1441 );
1442 let err = result.unwrap_err();
1443 match &err {
1444 CamelError::ProcessorError(msg) => {
1445 assert!(
1446 msg.contains(HEADER_CONTAINER_ID),
1447 "Error message should mention {}, got: {}",
1448 HEADER_CONTAINER_ID,
1449 msg
1450 );
1451 }
1452 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1453 }
1454 }
1455 }
1456
1457 #[tokio::test]
1459 async fn test_container_producer_stop_nonexistent() {
1460 let docker = match Docker::connect_with_unix_defaults() {
1462 Ok(d) => d,
1463 Err(_) => {
1464 eprintln!("Skipping test: Could not connect to Docker daemon");
1465 return;
1466 }
1467 };
1468
1469 if docker.ping().await.is_err() {
1470 eprintln!("Skipping test: Docker daemon not responding to ping");
1471 return;
1472 }
1473
1474 let component = ContainerComponent::new();
1475 let endpoint = component.create_endpoint("container:stop").unwrap();
1476 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1477 let mut producer = endpoint.create_producer(&ctx).unwrap();
1478
1479 let mut exchange = Exchange::new(Message::new(""));
1480 exchange
1481 .input
1482 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1483 exchange.input.set_header(
1484 HEADER_CONTAINER_ID,
1485 serde_json::Value::String("nonexistent-container-123".into()),
1486 );
1487
1488 use tower::ServiceExt;
1489 let result = producer.ready().await.unwrap().call(exchange).await;
1490
1491 assert!(
1492 result.is_err(),
1493 "Expected error when stopping nonexistent container"
1494 );
1495 let err = result.unwrap_err();
1496 match &err {
1497 CamelError::ProcessorError(msg) => {
1498 assert!(
1500 msg.to_lowercase().contains("no such container")
1501 || msg.to_lowercase().contains("not found")
1502 || msg.contains("404"),
1503 "Error message should indicate container not found, got: {}",
1504 msg
1505 );
1506 }
1507 _ => panic!("Expected ProcessorError, got: {:?}", err),
1508 }
1509 }
1510
1511 #[tokio::test]
1513 async fn test_container_producer_run_missing_image() {
1514 let docker = match Docker::connect_with_unix_defaults() {
1516 Ok(d) => d,
1517 Err(_) => {
1518 eprintln!("Skipping test: Could not connect to Docker daemon");
1519 return;
1520 }
1521 };
1522
1523 if docker.ping().await.is_err() {
1524 eprintln!("Skipping test: Docker daemon not responding to ping");
1525 return;
1526 }
1527
1528 let component = ContainerComponent::new();
1530 let endpoint = component.create_endpoint("container:run").unwrap();
1531 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1532 let mut producer = endpoint.create_producer(&ctx).unwrap();
1533
1534 let mut exchange = Exchange::new(Message::new(""));
1535 exchange
1536 .input
1537 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1538 use tower::ServiceExt;
1541 let result = producer.ready().await.unwrap().call(exchange).await;
1542
1543 assert!(
1544 result.is_err(),
1545 "Expected error for run operation without image"
1546 );
1547 let err = result.unwrap_err();
1548 match &err {
1549 CamelError::ProcessorError(msg) => {
1550 assert!(
1551 msg.to_lowercase().contains("image"),
1552 "Error message should mention 'image', got: {}",
1553 msg
1554 );
1555 }
1556 _ => panic!("Expected ProcessorError, got: {:?}", err),
1557 }
1558 }
1559
1560 #[tokio::test]
1562 async fn test_container_producer_run_image_from_header() {
1563 let docker = match Docker::connect_with_unix_defaults() {
1565 Ok(d) => d,
1566 Err(_) => {
1567 eprintln!("Skipping test: Could not connect to Docker daemon");
1568 return;
1569 }
1570 };
1571
1572 if docker.ping().await.is_err() {
1573 eprintln!("Skipping test: Docker daemon not responding to ping");
1574 return;
1575 }
1576
1577 let component = ContainerComponent::new();
1579 let endpoint = component.create_endpoint("container:run").unwrap();
1580 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1581 let mut producer = endpoint.create_producer(&ctx).unwrap();
1582
1583 let mut exchange = Exchange::new(Message::new(""));
1584 exchange
1585 .input
1586 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1587 exchange.input.set_header(
1589 HEADER_IMAGE,
1590 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1591 );
1592
1593 use tower::ServiceExt;
1594 let result = producer.ready().await.unwrap().call(exchange).await;
1595
1596 assert!(
1598 result.is_err(),
1599 "Expected error when running container with nonexistent image"
1600 );
1601 let err = result.unwrap_err();
1602 match &err {
1603 CamelError::ProcessorError(msg) => {
1604 assert!(
1606 msg.to_lowercase().contains("no such image")
1607 || msg.to_lowercase().contains("not found")
1608 || msg.to_lowercase().contains("image")
1609 || msg.to_lowercase().contains("pull")
1610 || msg.contains("404"),
1611 "Error message should indicate image issue, got: {}",
1612 msg
1613 );
1614 }
1615 _ => panic!("Expected ProcessorError, got: {:?}", err),
1616 }
1617 }
1618
1619 #[tokio::test]
1622 async fn test_container_producer_run_alpine_container() {
1623 let docker = match Docker::connect_with_unix_defaults() {
1624 Ok(d) => d,
1625 Err(_) => {
1626 eprintln!("Skipping test: Could not connect to Docker daemon");
1627 return;
1628 }
1629 };
1630
1631 if docker.ping().await.is_err() {
1632 eprintln!("Skipping test: Docker daemon not responding to ping");
1633 return;
1634 }
1635
1636 let images = docker.list_images::<&str>(None).await.unwrap();
1638 let has_alpine = images
1639 .iter()
1640 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1641
1642 if !has_alpine {
1643 eprintln!("Pulling alpine:latest image...");
1644 let mut stream = docker.create_image(
1645 Some(bollard::image::CreateImageOptions {
1646 from_image: "alpine:latest",
1647 ..Default::default()
1648 }),
1649 None,
1650 None,
1651 );
1652
1653 use futures::StreamExt;
1654 while let Some(_item) = stream.next().await {
1655 }
1657 eprintln!("Image pulled successfully");
1658 }
1659
1660 let component = ContainerComponent::new();
1662 let endpoint = component.create_endpoint("container:run").unwrap();
1663 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1664 let mut producer = endpoint.create_producer(&ctx).unwrap();
1665
1666 let timestamp = std::time::SystemTime::now()
1668 .duration_since(std::time::UNIX_EPOCH)
1669 .unwrap()
1670 .as_millis();
1671 let container_name = format!("test-rust-camel-{}", timestamp);
1672 let mut exchange = Exchange::new(Message::new(""));
1673 exchange.input.set_header(
1674 HEADER_IMAGE,
1675 serde_json::Value::String("alpine:latest".into()),
1676 );
1677 exchange.input.set_header(
1678 HEADER_CONTAINER_NAME,
1679 serde_json::Value::String(container_name.clone()),
1680 );
1681
1682 use tower::ServiceExt;
1683 let result = producer
1684 .ready()
1685 .await
1686 .unwrap()
1687 .call(exchange)
1688 .await
1689 .expect("Container run should succeed");
1690
1691 let container_id = result
1693 .input
1694 .header(HEADER_CONTAINER_ID)
1695 .and_then(|v| v.as_str().map(|s| s.to_string()))
1696 .expect("Expected container ID header");
1697 assert!(!container_id.is_empty(), "Container ID should not be empty");
1698
1699 assert_eq!(
1701 result
1702 .input
1703 .header(HEADER_ACTION_RESULT)
1704 .and_then(|v| v.as_str()),
1705 Some("success")
1706 );
1707
1708 let inspect = docker
1710 .inspect_container(&container_id, None)
1711 .await
1712 .expect("Container should exist");
1713 assert_eq!(
1714 inspect.id.as_ref().map(|s| s.as_str()),
1715 Some(container_id.as_str())
1716 );
1717
1718 docker
1720 .remove_container(
1721 &container_id,
1722 Some(bollard::container::RemoveContainerOptions {
1723 force: true,
1724 ..Default::default()
1725 }),
1726 )
1727 .await
1728 .ok();
1729
1730 eprintln!("✅ Container {} created and cleaned up", container_id);
1731 }
1732
1733 #[tokio::test]
1735 async fn test_container_consumer_unsupported_operation() {
1736 use tokio::sync::mpsc;
1737
1738 let component = ContainerComponent::new();
1739 let endpoint = component.create_endpoint("container:run").unwrap();
1740 let mut consumer = endpoint.create_consumer().unwrap();
1741
1742 let (tx, _rx) = mpsc::channel(16);
1744 let cancel_token = tokio_util::sync::CancellationToken::new();
1745 let context = ConsumerContext::new(tx, cancel_token);
1746
1747 let result = consumer.start(context).await;
1748
1749 assert!(
1751 result.is_err(),
1752 "Expected error for unsupported consumer operation"
1753 );
1754 let err = result.unwrap_err();
1755 match &err {
1756 CamelError::EndpointCreationFailed(msg) => {
1757 assert!(
1758 msg.contains("Consumer only supports 'events' or 'logs'"),
1759 "Error message should mention events or logs support, got: {}",
1760 msg
1761 );
1762 }
1763 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1764 }
1765 }
1766
1767 #[test]
1768 fn test_container_consumer_concurrency_model_is_concurrent() {
1769 let consumer = ContainerConsumer {
1770 config: ContainerConfig::from_uri("container:events").unwrap(),
1771 };
1772
1773 assert_eq!(
1774 consumer.concurrency_model(),
1775 camel_component::ConcurrencyModel::Concurrent { max: None }
1776 );
1777 }
1778
1779 #[tokio::test]
1783 async fn test_container_consumer_cancellation() {
1784 use std::sync::atomic::{AtomicBool, Ordering};
1785 use tokio::sync::mpsc;
1786
1787 let docker = match Docker::connect_with_unix_defaults() {
1789 Ok(d) => d,
1790 Err(_) => {
1791 eprintln!("Skipping test: Could not connect to Docker daemon");
1792 return;
1793 }
1794 };
1795
1796 if docker.ping().await.is_err() {
1797 eprintln!("Skipping test: Docker daemon not responding to ping");
1798 return;
1799 }
1800
1801 let component = ContainerComponent::new();
1802 let endpoint = component.create_endpoint("container:events").unwrap();
1803 let mut consumer = endpoint.create_consumer().unwrap();
1804
1805 let (tx, _rx) = mpsc::channel(16);
1807 let cancel_token = tokio_util::sync::CancellationToken::new();
1808 let context = ConsumerContext::new(tx, cancel_token.clone());
1809
1810 let completed = Arc::new(AtomicBool::new(false));
1812 let completed_clone = completed.clone();
1813
1814 let handle = tokio::spawn(async move {
1816 let result = consumer.start(context).await;
1817 completed_clone.store(true, Ordering::SeqCst);
1819 result
1820 });
1821
1822 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1824
1825 assert!(
1827 !completed.load(Ordering::SeqCst),
1828 "Consumer should still be running before cancellation"
1829 );
1830
1831 cancel_token.cancel();
1833
1834 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1836
1837 assert!(
1839 result.is_ok(),
1840 "Consumer should gracefully shut down after cancellation"
1841 );
1842
1843 assert!(
1845 completed.load(Ordering::SeqCst),
1846 "Consumer should have completed after cancellation"
1847 );
1848 }
1849
1850 #[tokio::test]
1854 async fn test_container_producer_list_containers() {
1855 let docker = match Docker::connect_with_unix_defaults() {
1858 Ok(d) => d,
1859 Err(_) => {
1860 eprintln!("Skipping test: Could not connect to Docker daemon");
1861 return;
1862 }
1863 };
1864
1865 if docker.ping().await.is_err() {
1866 eprintln!("Skipping test: Docker daemon not responding to ping");
1867 return;
1868 }
1869
1870 let component = ContainerComponent::new();
1872 let endpoint = component.create_endpoint("container:list").unwrap();
1873
1874 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1875 let mut producer = endpoint.create_producer(&ctx).unwrap();
1876
1877 let mut exchange = Exchange::new(Message::new(""));
1879 exchange
1880 .input
1881 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1882
1883 use tower::ServiceExt;
1885 let result = producer
1886 .ready()
1887 .await
1888 .unwrap()
1889 .call(exchange)
1890 .await
1891 .expect("Producer should succeed when Docker is available");
1892
1893 match &result.input.body {
1896 camel_api::Body::Json(json_value) => {
1897 assert!(
1898 json_value.is_array(),
1899 "Expected input body to be a JSON array, got: {:?}",
1900 json_value
1901 );
1902 }
1903 other => panic!("Expected Body::Json with array, got: {:?}", other),
1904 }
1905 }
1906}