1use std::collections::{HashMap, HashSet};
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use bollard::Docker;
14use bollard::service::{HostConfig, PortBinding};
15use camel_api::{Body, BoxProcessor, CamelError, Exchange, Message};
16use camel_component::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
17use camel_endpoint::parse_uri;
18use tower::Service;
19
/// Process-wide set of container IDs created by the producer's `run` operation.
///
/// IDs are inserted by `track_container`, dropped by `untrack_container`, and
/// force-removed in bulk by `cleanup_tracked_containers`.
static CONTAINER_TRACKER: once_cell::sync::Lazy<Arc<Mutex<HashSet<String>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
24
25fn track_container(id: String) {
27 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
28 tracker.insert(id);
29 }
30}
31
32fn untrack_container(id: &str) {
34 if let Ok(mut tracker) = CONTAINER_TRACKER.lock() {
35 tracker.remove(id);
36 }
37}
38
39pub async fn cleanup_tracked_containers() {
41 let ids: Vec<String> = {
42 match CONTAINER_TRACKER.lock() {
43 Ok(tracker) => tracker.iter().cloned().collect(),
44 Err(_) => return,
45 }
46 };
47
48 if ids.is_empty() {
49 return;
50 }
51
52 tracing::info!("Cleaning up {} tracked container(s)", ids.len());
53
54 let docker = match Docker::connect_with_local_defaults() {
55 Ok(d) => d,
56 Err(e) => {
57 tracing::error!("Failed to connect to Docker for cleanup: {}", e);
58 return;
59 }
60 };
61
62 for id in ids {
63 match docker
64 .remove_container(
65 &id,
66 Some(bollard::container::RemoveContainerOptions {
67 force: true,
68 ..Default::default()
69 }),
70 )
71 .await
72 {
73 Ok(_) => {
74 tracing::debug!("Cleaned up container {}", id);
75 untrack_container(&id);
76 }
77 Err(e) => {
78 tracing::warn!("Failed to cleanup container {}: {}", id, e);
79 }
80 }
81 }
82}
83
/// Timeout, in seconds, handed to `Docker::connect_with_socket` when a client is created.
const DOCKER_CONNECT_TIMEOUT_SECS: u64 = 120;
88
/// Header read by the producer to override the operation given in the endpoint URI.
pub const HEADER_ACTION: &str = "CamelContainerAction";

/// Header read by the `run` operation to override the `image` URI parameter.
pub const HEADER_IMAGE: &str = "CamelContainerImage";

/// Header carrying a container ID: set by `run` on success, required by
/// `start`/`stop`/`remove`, and set on every message emitted by the logs consumer.
pub const HEADER_CONTAINER_ID: &str = "CamelContainerId";

/// Header set by the logs consumer to the source stream of a log line
/// (`stdout`, `stderr`, `console` or `stdin`).
pub const HEADER_LOG_STREAM: &str = "CamelContainerLogStream";

/// Header set by the logs consumer when `timestamps` is enabled and a
/// timestamp prefix could be extracted from the log line.
pub const HEADER_LOG_TIMESTAMP: &str = "CamelContainerLogTimestamp";

/// Header read by the `run` operation to override the `name` URI parameter.
pub const HEADER_CONTAINER_NAME: &str = "CamelContainerName";

/// Header set by the producer to `"success"` after an operation completes.
pub const HEADER_ACTION_RESULT: &str = "CamelContainerActionResult";
109
/// Endpoint configuration parsed from a `container:<operation>?<params>` URI
/// by `ContainerConfig::from_uri`.
#[derive(Debug, Clone)]
pub struct ContainerConfig {
    /// Operation name, taken from the URI path (e.g. `run`, `list`, `logs`).
    pub operation: String,
    /// `image` parameter: image used by the `run` operation.
    pub image: Option<String>,
    /// `name` parameter: name given to the container created by `run`.
    pub name: Option<String>,
    /// `host` parameter: Docker daemon address. `from_uri` fills in the
    /// platform default (Unix socket, or named pipe on Windows) when absent.
    pub host: Option<String>,
    /// `cmd` parameter: command line for `run`, split on whitespace.
    pub cmd: Option<String>,
    /// `ports` parameter: comma-separated `hostPort:containerPort[/protocol]` mappings.
    pub ports: Option<String>,
    /// `env` parameter: comma-separated environment entries passed to the container.
    pub env: Option<String>,
    /// `network` parameter: used as the container's network mode.
    pub network: Option<String>,
    /// `containerId` parameter: required by the logs consumer.
    pub container_id: Option<String>,
    /// `follow` parameter (default `true`): keep streaming logs.
    pub follow: bool,
    /// `timestamps` parameter (default `false`): request timestamps on log lines.
    pub timestamps: bool,
    /// `tail` parameter: passed to the logs request; the logs consumer uses `"all"` when unset.
    pub tail: Option<String>,
    /// `autoPull` parameter (default `true`): pull the image when it is not available locally.
    pub auto_pull: bool,
    /// `autoRemove` parameter (default `true`): set as the container's auto-remove flag.
    pub auto_remove: bool,
}
145
146impl ContainerConfig {
147 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
155 let parts = parse_uri(uri)?;
156 if parts.scheme != "container" {
157 return Err(CamelError::InvalidUri(format!(
158 "expected scheme 'container', got '{}'",
159 parts.scheme
160 )));
161 }
162
163 let image = parts.params.get("image").cloned();
164 let name = parts.params.get("name").cloned();
165 let cmd = parts.params.get("cmd").cloned();
166 let ports = parts.params.get("ports").cloned();
167 let env = parts.params.get("env").cloned();
168 let network = parts.params.get("network").cloned();
169 let container_id = parts.params.get("containerId").cloned();
170 let follow = parts
171 .params
172 .get("follow")
173 .map(|v| v.eq_ignore_ascii_case("true"))
174 .unwrap_or(true);
175 let timestamps = parts
176 .params
177 .get("timestamps")
178 .map(|v| v.eq_ignore_ascii_case("true"))
179 .unwrap_or(false);
180 let tail = parts.params.get("tail").cloned();
181 let auto_pull = parts
182 .params
183 .get("autoPull")
184 .map(|v| v.eq_ignore_ascii_case("true"))
185 .unwrap_or(true);
186 let auto_remove = parts
187 .params
188 .get("autoRemove")
189 .map(|v| v.eq_ignore_ascii_case("true"))
190 .unwrap_or(true);
191 let host = parts
192 .params
193 .get("host")
194 .cloned()
195 .or_else(|| {
196 Some(if cfg!(windows) {
197 "npipe:////./pipe/docker_engine".to_string()
198 } else {
199 "unix:///var/run/docker.sock".to_string()
200 })
201 });
202
203 Ok(Self {
204 operation: parts.path,
205 image,
206 name,
207 host,
208 cmd,
209 ports,
210 env,
211 network,
212 container_id,
213 follow,
214 timestamps,
215 tail,
216 auto_pull,
217 auto_remove,
218 })
219 }
220
221 fn docker_socket_path(&self) -> Result<&str, CamelError> {
222 let host = self
223 .host
224 .as_deref()
225 .unwrap_or(if cfg!(windows) {
226 "npipe:////./pipe/docker_engine"
227 } else {
228 "unix:///var/run/docker.sock"
229 });
230
231 if host.starts_with("unix://") || host.starts_with("npipe://") {
232 return Ok(host);
233 }
234
235 if host.contains("://") {
236 return Err(CamelError::ProcessorError(format!(
237 "Unsupported Docker host scheme: {} (only unix:// and npipe:// are supported)",
238 host
239 )));
240 }
241
242 Ok(host)
243 }
244
245 pub fn connect_docker_client(&self) -> Result<Docker, CamelError> {
246 let socket_path = self.docker_socket_path()?;
247 Docker::connect_with_socket(
248 socket_path,
249 DOCKER_CONNECT_TIMEOUT_SECS,
250 bollard::API_DEFAULT_VERSION,
251 )
252 .map_err(|e| {
253 CamelError::ProcessorError(format!("Failed to connect to docker daemon: {}", e))
254 })
255 }
256
257 pub async fn connect_docker(&self) -> Result<Docker, CamelError> {
265 let docker = self.connect_docker_client()?;
266 docker
267 .ping()
268 .await
269 .map_err(|e| CamelError::ProcessorError(format!("Docker ping failed: {}", e)))?;
270 Ok(docker)
271 }
272
273 #[allow(clippy::type_complexity)]
274 fn parse_ports(
275 &self,
276 ) -> Option<(
277 HashMap<String, HashMap<(), ()>>,
278 HashMap<String, Option<Vec<PortBinding>>>,
279 )> {
280 let ports_str = self.ports.as_ref()?;
281
282 let mut exposed_ports: HashMap<String, HashMap<(), ()>> = HashMap::new();
283 let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
284
285 for mapping in ports_str.split(',') {
286 let mapping = mapping.trim();
287 if mapping.is_empty() {
288 continue;
289 }
290
291 let (host_port, container_spec) = mapping.split_once(':')?;
292
293 let (container_port, protocol) = if container_spec.contains('/') {
294 let parts: Vec<&str> = container_spec.split('/').collect();
295 (parts[0], parts[1])
296 } else {
297 (container_spec, "tcp")
298 };
299
300 let container_key = format!("{}/{}", container_port, protocol);
301
302 exposed_ports.insert(container_key.clone(), HashMap::new());
303
304 port_bindings.insert(
305 container_key,
306 Some(vec![PortBinding {
307 host_ip: None,
308 host_port: Some(host_port.to_string()),
309 }]),
310 );
311 }
312
313 if exposed_ports.is_empty() {
314 None
315 } else {
316 Some((exposed_ports, port_bindings))
317 }
318 }
319
320 fn parse_env(&self) -> Option<Vec<String>> {
321 let env_str = self.env.as_ref()?;
322
323 let env_vars: Vec<String> = env_str
324 .split(',')
325 .map(|s| s.trim().to_string())
326 .filter(|s| !s.is_empty())
327 .collect();
328
329 if env_vars.is_empty() {
330 None
331 } else {
332 Some(env_vars)
333 }
334 }
335}
336
/// Operations the producer can execute, as parsed by `parse_producer_operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProducerOperation {
    /// `list`: list containers and put the result in the exchange body as JSON.
    List,
    /// `run`: create and start a new container from an image.
    Run,
    /// `start`: start the container named by the container-ID header.
    Start,
    /// `stop`: stop the container named by the container-ID header.
    Stop,
    /// `remove`: remove the container named by the container-ID header.
    Remove,
}
345
346fn parse_producer_operation(operation: &str) -> Result<ProducerOperation, CamelError> {
347 match operation {
348 "list" => Ok(ProducerOperation::List),
349 "run" => Ok(ProducerOperation::Run),
350 "start" => Ok(ProducerOperation::Start),
351 "stop" => Ok(ProducerOperation::Stop),
352 "remove" => Ok(ProducerOperation::Remove),
353 _ => Err(CamelError::ProcessorError(format!(
354 "Unknown container operation: {}",
355 operation
356 ))),
357 }
358}
359
360fn resolve_container_name(exchange: &Exchange, config: &ContainerConfig) -> Option<String> {
361 exchange
362 .input
363 .header(HEADER_CONTAINER_NAME)
364 .and_then(|v| v.as_str().map(|s| s.to_string()))
365 .or_else(|| config.name.clone())
366}
367
368async fn image_exists_locally(docker: &Docker, image: &str) -> Result<bool, CamelError> {
369 let images = docker
370 .list_images::<&str>(None)
371 .await
372 .map_err(|e| CamelError::ProcessorError(format!("Failed to list images: {}", e)))?;
373
374 Ok(images.iter().any(|img| {
375 img.repo_tags
376 .iter()
377 .any(|tag| tag == image || tag.starts_with(&format!("{}:", image)))
378 }))
379}
380
381async fn pull_image_with_progress(
382 docker: &Docker,
383 image: &str,
384 timeout_secs: u64,
385) -> Result<(), CamelError> {
386 use futures::StreamExt;
387
388 tracing::info!("Pulling image: {}", image);
389
390 let mut stream = docker.create_image(
391 Some(bollard::image::CreateImageOptions {
392 from_image: image,
393 ..Default::default()
394 }),
395 None,
396 None,
397 );
398
399 let start = std::time::Instant::now();
400 let mut last_progress = std::time::Instant::now();
401
402 while let Some(item) = stream.next().await {
403 if start.elapsed().as_secs() > timeout_secs {
404 return Err(CamelError::ProcessorError(format!(
405 "Image pull timeout after {}s. Try manually: docker pull {}",
406 timeout_secs, image
407 )));
408 }
409
410 match item {
411 Ok(update) => {
412 if last_progress.elapsed().as_secs() >= 2 {
414 if let Some(status) = update.status {
415 tracing::debug!("Pull progress: {}", status);
416 }
417 last_progress = std::time::Instant::now();
418 }
419 }
420 Err(e) => {
421 let err_str = e.to_string().to_lowercase();
422 if err_str.contains("unauthorized") || err_str.contains("401") {
423 return Err(CamelError::ProcessorError(format!(
424 "Authentication required for image '{}'. Configure Docker credentials: docker login",
425 image
426 )));
427 }
428 if err_str.contains("not found") || err_str.contains("404") {
429 return Err(CamelError::ProcessorError(format!(
430 "Image '{}' not found in registry. Check the image name and tag",
431 image
432 )));
433 }
434 return Err(CamelError::ProcessorError(format!(
435 "Failed to pull image '{}': {}",
436 image, e
437 )));
438 }
439 }
440 }
441
442 tracing::info!("Successfully pulled image: {}", image);
443 Ok(())
444}
445
446async fn ensure_image_available(
447 docker: &Docker,
448 image: &str,
449 auto_pull: bool,
450 timeout_secs: u64,
451) -> Result<(), CamelError> {
452 if image_exists_locally(docker, image).await? {
453 tracing::debug!("Image '{}' already available locally", image);
454 return Ok(());
455 }
456
457 if !auto_pull {
458 return Err(CamelError::ProcessorError(format!(
459 "Image '{}' not found locally. Set autoPull=true to pull automatically, or run: docker pull {}",
460 image, image
461 )));
462 }
463
464 pull_image_with_progress(docker, image, timeout_secs).await
465}
466
467fn format_docker_event(event: &bollard::models::EventMessage) -> String {
468 let action = event.action.as_deref().unwrap_or("unknown");
469 let actor = event.actor.as_ref();
470
471 let container_name = actor
472 .and_then(|a| a.attributes.as_ref())
473 .and_then(|attrs| attrs.get("name"))
474 .map(|s| s.as_str())
475 .unwrap_or("unknown");
476
477 let image = actor
478 .and_then(|a| a.attributes.as_ref())
479 .and_then(|attrs| attrs.get("image"))
480 .map(|s| s.as_str())
481 .unwrap_or("");
482
483 let exit_code = actor
484 .and_then(|a| a.attributes.as_ref())
485 .and_then(|attrs| attrs.get("exitCode"))
486 .map(|s| s.as_str());
487
488 match action {
489 "create" => {
490 if image.is_empty() {
491 format!("[CREATE] Container {}", container_name)
492 } else {
493 format!("[CREATE] Container {} ({})", container_name, image)
494 }
495 }
496 "start" => format!("[START] Container {}", container_name),
497 "die" => {
498 if let Some(code) = exit_code {
499 format!("[DIE] Container {} (exit: {})", container_name, code)
500 } else {
501 format!("[DIE] Container {}", container_name)
502 }
503 }
504 "destroy" => format!("[DESTROY] Container {}", container_name),
505 "stop" => format!("[STOP] Container {}", container_name),
506 "pause" => format!("[PAUSE] Container {}", container_name),
507 "unpause" => format!("[UNPAUSE] Container {}", container_name),
508 "restart" => format!("[RESTART] Container {}", container_name),
509 _ => format!("[{}] Container {}", action.to_uppercase(), container_name),
510 }
511}
512
513async fn run_container_with_cleanup<CreateFn, CreateFut, StartFn, StartFut, RemoveFn, RemoveFut>(
514 create: CreateFn,
515 start: StartFn,
516 remove: RemoveFn,
517) -> Result<String, CamelError>
518where
519 CreateFn: FnOnce() -> CreateFut,
520 CreateFut: Future<Output = Result<String, CamelError>>,
521 StartFn: FnOnce(String) -> StartFut,
522 StartFut: Future<Output = Result<(), CamelError>>,
523 RemoveFn: FnOnce(String) -> RemoveFut,
524 RemoveFut: Future<Output = Result<(), CamelError>>,
525{
526 let container_id = create().await?;
527 if let Err(start_err) = start(container_id.clone()).await {
528 if let Err(remove_err) = remove(container_id.clone()).await {
529 return Err(CamelError::ProcessorError(format!(
530 "Failed to start container: {}. Cleanup failed: {}",
531 start_err, remove_err
532 )));
533 }
534 return Err(start_err);
535 }
536
537 Ok(container_id)
538}
539
/// Producer that executes a container operation (`list`, `run`, `start`,
/// `stop` or `remove`) for each exchange it receives.
#[derive(Clone)]
pub struct ContainerProducer {
    /// Endpoint configuration supplying defaults for values not given in headers.
    config: ContainerConfig,
    /// Docker client used for every operation.
    docker: Docker,
}
549
550impl Service<Exchange> for ContainerProducer {
551 type Response = Exchange;
552 type Error = CamelError;
553 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
554
555 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
556 Poll::Ready(Ok(()))
557 }
558
559 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
560 let config = self.config.clone();
561 let docker = self.docker.clone();
562 Box::pin(async move {
563 let operation_name = exchange
565 .input
566 .header(HEADER_ACTION)
567 .and_then(|v| v.as_str().map(|s| s.to_string()))
568 .unwrap_or_else(|| config.operation.clone());
569
570 let operation = parse_producer_operation(&operation_name)?;
571
572 match operation {
574 ProducerOperation::List => {
575 let containers = docker.list_containers::<String>(None).await.map_err(|e| {
576 CamelError::ProcessorError(format!("Failed to list containers: {}", e))
577 })?;
578
579 let json_value = serde_json::to_value(&containers).map_err(|e| {
580 CamelError::ProcessorError(format!("Failed to serialize containers: {}", e))
581 })?;
582
583 exchange.input.body = Body::Json(json_value);
584 exchange.input.set_header(
585 HEADER_ACTION_RESULT,
586 serde_json::Value::String("success".to_string()),
587 );
588 }
589 ProducerOperation::Run => {
590 let image = exchange
592 .input
593 .header(HEADER_IMAGE)
594 .and_then(|v| v.as_str().map(|s| s.to_string()))
595 .or(config.image.clone())
596 .ok_or_else(|| {
597 CamelError::ProcessorError(
598 "missing image for run operation. Specify in URI (image=alpine) or header (CamelContainerImage)".to_string(),
599 )
600 })?;
601
602 let pull_timeout = 300; ensure_image_available(&docker, &image, config.auto_pull, pull_timeout)
605 .await
606 .map_err(|e| {
607 CamelError::ProcessorError(format!(
608 "Image '{}' not available: {}",
609 image, e
610 ))
611 })?;
612
613 let container_name = resolve_container_name(&exchange, &config);
614 let container_name_ref = container_name.as_deref().unwrap_or("");
615 let cmd_parts: Option<Vec<String>> = config
616 .cmd
617 .as_ref()
618 .map(|c| c.split_whitespace().map(|s| s.to_string()).collect());
619 let auto_remove = config.auto_remove;
620 let (exposed_ports, port_bindings) = config.parse_ports().unwrap_or_default();
621 let env_vars = config.parse_env();
622 let network_mode = config.network.clone();
623
624 let docker_create = docker.clone();
625 let docker_start = docker.clone();
626 let docker_remove = docker.clone();
627
628 let container_id = run_container_with_cleanup(
629 move || async move {
630 let create_options = bollard::container::CreateContainerOptions {
631 name: container_name_ref,
632 ..Default::default()
633 };
634 let container_config = bollard::container::Config::<String> {
635 image: Some(image.clone()),
636 cmd: cmd_parts,
637 env: env_vars,
638 exposed_ports: if exposed_ports.is_empty() { None } else { Some(exposed_ports) },
639 host_config: Some(HostConfig {
640 auto_remove: Some(auto_remove),
641 port_bindings: if port_bindings.is_empty() { None } else { Some(port_bindings) },
642 network_mode,
643 ..Default::default()
644 }),
645 ..Default::default()
646 };
647
648 let create_response = docker_create
649 .create_container(Some(create_options), container_config)
650 .await
651 .map_err(|e| {
652 let err_str = e.to_string().to_lowercase();
653 if err_str.contains("409") || err_str.contains("conflict") {
654 CamelError::ProcessorError(format!(
655 "Container name '{}' already exists. Use a unique name or remove the existing container first",
656 container_name_ref
657 ))
658 } else {
659 CamelError::ProcessorError(format!(
660 "Failed to create container: {}",
661 e
662 ))
663 }
664 })?;
665
666 Ok(create_response.id)
667 },
668 move |container_id| async move {
669 docker_start
670 .start_container::<String>(&container_id, None)
671 .await
672 .map_err(|e| {
673 CamelError::ProcessorError(format!(
674 "Failed to start container: {}",
675 e
676 ))
677 })
678 },
679 move |container_id| async move {
680 docker_remove
681 .remove_container(&container_id, None)
682 .await
683 .map_err(|e| {
684 CamelError::ProcessorError(format!(
685 "Failed to remove container after start failure: {}",
686 e
687 ))
688 })
689 },
690 )
691 .await?;
692
693 track_container(container_id.clone());
694
695 exchange
696 .input
697 .set_header(HEADER_CONTAINER_ID, serde_json::Value::String(container_id));
698 exchange.input.set_header(
699 HEADER_ACTION_RESULT,
700 serde_json::Value::String("success".to_string()),
701 );
702 }
703 ProducerOperation::Start | ProducerOperation::Stop | ProducerOperation::Remove => {
704 let container_id = exchange
706 .input
707 .header(HEADER_CONTAINER_ID)
708 .and_then(|v| v.as_str().map(|s| s.to_string()))
709 .ok_or_else(|| {
710 CamelError::ProcessorError(format!(
711 "{} header is required for {} operation",
712 HEADER_CONTAINER_ID, operation_name
713 ))
714 })?;
715
716 match operation {
717 ProducerOperation::Start => {
718 docker
719 .start_container::<String>(&container_id, None)
720 .await
721 .map_err(|e| {
722 CamelError::ProcessorError(format!(
723 "Failed to start container: {}",
724 e
725 ))
726 })?;
727 }
728 ProducerOperation::Stop => {
729 docker
730 .stop_container(&container_id, None)
731 .await
732 .map_err(|e| {
733 CamelError::ProcessorError(format!(
734 "Failed to stop container: {}",
735 e
736 ))
737 })?;
738 }
739 ProducerOperation::Remove => {
740 docker
741 .remove_container(&container_id, None)
742 .await
743 .map_err(|e| {
744 CamelError::ProcessorError(format!(
745 "Failed to remove container: {}",
746 e
747 ))
748 })?;
749 untrack_container(&container_id);
750 }
751 _ => {}
752 }
753
754 exchange.input.set_header(
756 HEADER_ACTION_RESULT,
757 serde_json::Value::String("success".to_string()),
758 );
759 }
760 }
761
762 Ok(exchange)
763 })
764 }
765}
766
/// Consumer that turns Docker events (`events` operation) or container log
/// lines (`logs` operation) into exchanges.
pub struct ContainerConsumer {
    /// Endpoint configuration; `operation` selects which stream is consumed.
    config: ContainerConfig,
}
774
775#[async_trait]
776impl Consumer for ContainerConsumer {
777 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
778 match self.config.operation.as_str() {
779 "events" => self.start_events_consumer(context).await,
780 "logs" => self.start_logs_consumer(context).await,
781 _ => Err(CamelError::EndpointCreationFailed(format!(
782 "Consumer only supports 'events' or 'logs' operations, got '{}'",
783 self.config.operation
784 ))),
785 }
786 }
787
788 async fn stop(&mut self) -> Result<(), CamelError> {
789 Ok(())
790 }
791
792 fn concurrency_model(&self) -> camel_component::ConcurrencyModel {
793 camel_component::ConcurrencyModel::Concurrent { max: None }
794 }
795}
796
797impl ContainerConsumer {
798 async fn start_events_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
799 use futures::StreamExt;
800
801 loop {
802 if context.is_cancelled() {
803 tracing::info!("Container events consumer shutting down");
804 return Ok(());
805 }
806
807 let docker = match self.config.connect_docker().await {
808 Ok(d) => d,
809 Err(e) => {
810 tracing::error!(
811 "Consumer failed to connect to docker: {}. Retrying in 5s...",
812 e
813 );
814 tokio::select! {
815 _ = context.cancelled() => {
816 tracing::info!("Container events consumer shutting down");
817 return Ok(());
818 }
819 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
820 }
821 continue;
822 }
823 };
824
825 let mut event_stream = docker.events::<String>(None);
826
827 loop {
828 tokio::select! {
829 _ = context.cancelled() => {
830 tracing::info!("Container events consumer shutting down");
831 return Ok(());
832 }
833
834 msg = event_stream.next() => {
835 match msg {
836 Some(Ok(event)) => {
837 let formatted = format_docker_event(&event);
838 let message = Message::new(Body::Text(formatted));
839 let exchange = Exchange::new(message);
840
841 if let Err(e) = context.send(exchange).await {
842 tracing::error!("Failed to send exchange: {:?}", e);
843 break;
844 }
845 }
846 Some(Err(e)) => {
847 tracing::error!("Docker event stream error: {}. Reconnecting...", e);
848 break;
849 }
850 None => {
851 tracing::info!("Docker event stream ended. Reconnecting...");
852 break;
853 }
854 }
855 }
856 }
857 }
858
859 tokio::select! {
860 _ = context.cancelled() => {
861 tracing::info!("Container events consumer shutting down");
862 return Ok(());
863 }
864 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
865 }
866 }
867 }
868
869 async fn start_logs_consumer(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
870 use futures::StreamExt;
871
872 let container_id = self.config.container_id.clone().ok_or_else(|| {
873 CamelError::EndpointCreationFailed(
874 "containerId is required for logs consumer. Use container:logs?containerId=xxx"
875 .to_string(),
876 )
877 })?;
878
879 loop {
880 if context.is_cancelled() {
881 tracing::info!("Container logs consumer shutting down");
882 return Ok(());
883 }
884
885 let docker = match self.config.connect_docker().await {
886 Ok(d) => d,
887 Err(e) => {
888 tracing::error!(
889 "Logs consumer failed to connect to docker: {}. Retrying in 5s...",
890 e
891 );
892 tokio::select! {
893 _ = context.cancelled() => {
894 tracing::info!("Container logs consumer shutting down");
895 return Ok(());
896 }
897 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
898 }
899 continue;
900 }
901 };
902
903 let tail = self
904 .config
905 .tail
906 .clone()
907 .unwrap_or_else(|| "all".to_string());
908
909 let options = bollard::container::LogsOptions::<String> {
910 follow: self.config.follow,
911 stdout: true,
912 stderr: true,
913 timestamps: self.config.timestamps,
914 tail,
915 ..Default::default()
916 };
917
918 let mut log_stream = docker.logs(&container_id, Some(options));
919 let container_id_header = container_id.clone();
920
921 loop {
922 tokio::select! {
923 _ = context.cancelled() => {
924 tracing::info!("Container logs consumer shutting down");
925 return Ok(());
926 }
927
928 msg = log_stream.next() => {
929 match msg {
930 Some(Ok(log_output)) => {
931 let (stream_type, content) = match log_output {
932 bollard::container::LogOutput::StdOut { message } => {
933 ("stdout", String::from_utf8_lossy(&message).into_owned())
934 }
935 bollard::container::LogOutput::StdErr { message } => {
936 ("stderr", String::from_utf8_lossy(&message).into_owned())
937 }
938 bollard::container::LogOutput::Console { message } => {
939 ("console", String::from_utf8_lossy(&message).into_owned())
940 }
941 bollard::container::LogOutput::StdIn { message } => {
942 ("stdin", String::from_utf8_lossy(&message).into_owned())
943 }
944 };
945
946 let content = content.trim_end();
947 if content.is_empty() {
948 continue;
949 }
950
951 let mut message = Message::new(Body::Text(content.to_string()));
952 message.set_header(
953 HEADER_CONTAINER_ID,
954 serde_json::Value::String(container_id_header.clone()),
955 );
956 message.set_header(
957 HEADER_LOG_STREAM,
958 serde_json::Value::String(stream_type.to_string()),
959 );
960
961 if self.config.timestamps
962 && let Some(ts) = extract_timestamp(content) {
963 message.set_header(
964 HEADER_LOG_TIMESTAMP,
965 serde_json::Value::String(ts),
966 );
967 }
968
969 let exchange = Exchange::new(message);
970
971 if let Err(e) = context.send(exchange).await {
972 tracing::error!("Failed to send log exchange: {:?}", e);
973 break;
974 }
975 }
976 Some(Err(e)) => {
977 tracing::error!("Docker log stream error: {}. Reconnecting...", e);
978 break;
979 }
980 None => {
981 if self.config.follow {
982 tracing::info!("Docker log stream ended. Reconnecting...");
983 break;
984 } else {
985 tracing::info!("Container logs consumer finished (follow=false)");
986 return Ok(());
987 }
988 }
989 }
990 }
991 }
992 }
993
994 tokio::select! {
995 _ = context.cancelled() => {
996 tracing::info!("Container logs consumer shutting down");
997 return Ok(());
998 }
999 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
1000 }
1001 }
1002 }
1003}
1004
/// Returns the leading token of `log_line` when it looks like a timestamp.
///
/// The token is everything before the first space. It is returned only when
/// the line contains a space and the token contains a `T`; otherwise the
/// result is `None`.
fn extract_timestamp(log_line: &str) -> Option<String> {
    log_line
        .split_once(' ')
        .map(|(first_token, _rest)| first_token)
        .filter(|token| token.contains('T'))
        .map(str::to_owned)
}
1013
/// Component that creates endpoints for `container:` URIs.
pub struct ContainerComponent;
1022
1023impl ContainerComponent {
1024 pub fn new() -> Self {
1026 Self
1027 }
1028}
1029
1030impl Default for ContainerComponent {
1031 fn default() -> Self {
1032 Self::new()
1033 }
1034}
1035
1036impl Component for ContainerComponent {
1037 fn scheme(&self) -> &str {
1038 "container"
1039 }
1040
1041 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1042 let config = ContainerConfig::from_uri(uri)?;
1043 Ok(Box::new(ContainerEndpoint {
1044 uri: uri.to_string(),
1045 config,
1046 }))
1047 }
1048}
1049
/// Endpoint for one `container:` URI; creates the matching consumer or producer.
pub struct ContainerEndpoint {
    /// The URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Configuration parsed from `uri`.
    config: ContainerConfig,
}
1058
1059impl Endpoint for ContainerEndpoint {
1060 fn uri(&self) -> &str {
1061 &self.uri
1062 }
1063
1064 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1065 Ok(Box::new(ContainerConsumer {
1066 config: self.config.clone(),
1067 }))
1068 }
1069
1070 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1071 let docker = self.config.connect_docker_client()?;
1072 Ok(BoxProcessor::new(ContainerProducer {
1073 config: self.config.clone(),
1074 docker,
1075 }))
1076 }
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081 use super::*;
1082
1083 #[test]
1084 fn test_container_config() {
1085 let config = ContainerConfig::from_uri("container:run?image=alpine").unwrap();
1086 assert_eq!(config.operation, "run");
1087 assert_eq!(config.image.as_deref(), Some("alpine"));
1088 assert_eq!(config.host.as_deref(), Some("unix:///var/run/docker.sock"));
1089 }
1090
1091 #[test]
1092 fn test_container_config_parses_name() {
1093 let config = ContainerConfig::from_uri("container:run?name=my-container").unwrap();
1094 assert_eq!(config.name.as_deref(), Some("my-container"));
1095 }
1096
1097 #[test]
1098 fn test_parse_producer_operation_known() {
1099 assert_eq!(
1100 parse_producer_operation("list").unwrap(),
1101 ProducerOperation::List
1102 );
1103 assert_eq!(
1104 parse_producer_operation("run").unwrap(),
1105 ProducerOperation::Run
1106 );
1107 assert_eq!(
1108 parse_producer_operation("start").unwrap(),
1109 ProducerOperation::Start
1110 );
1111 assert_eq!(
1112 parse_producer_operation("stop").unwrap(),
1113 ProducerOperation::Stop
1114 );
1115 assert_eq!(
1116 parse_producer_operation("remove").unwrap(),
1117 ProducerOperation::Remove
1118 );
1119 }
1120
1121 #[test]
1122 fn test_parse_producer_operation_unknown() {
1123 let err = parse_producer_operation("destruir_mundo").unwrap_err();
1124 match err {
1125 CamelError::ProcessorError(msg) => {
1126 assert!(
1127 msg.contains("Unknown container operation"),
1128 "Unexpected error message: {}",
1129 msg
1130 );
1131 }
1132 _ => panic!("Expected ProcessorError for unknown operation"),
1133 }
1134 }
1135
1136 #[test]
1137 fn test_resolve_container_name_header_overrides_config() {
1138 let config = ContainerConfig::from_uri("container:run?name=config-name").unwrap();
1139 let mut exchange = Exchange::new(Message::new(""));
1140 exchange.input.set_header(
1141 HEADER_CONTAINER_NAME,
1142 serde_json::Value::String("header-name".to_string()),
1143 );
1144
1145 let resolved = resolve_container_name(&exchange, &config);
1146 assert_eq!(resolved.as_deref(), Some("header-name"));
1147 }
1148
1149 #[test]
1150 fn test_container_config_rejects_tcp_host() {
1151 let config = ContainerConfig::from_uri("container:list?host=tcp://localhost:2375").unwrap();
1152 let err = config.connect_docker_client().unwrap_err();
1153 match err {
1154 CamelError::ProcessorError(msg) => {
1155 assert!(
1156 msg.to_lowercase().contains("tcp"),
1157 "Expected TCP scheme error, got: {}",
1158 msg
1159 );
1160 }
1161 _ => panic!("Expected ProcessorError for unsupported tcp host"),
1162 }
1163 }
1164
1165 #[tokio::test]
1166 async fn test_run_container_with_cleanup_removes_on_start_failure() {
1167 let remove_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1168 let remove_called_clone = remove_called.clone();
1169
1170 let result = run_container_with_cleanup(
1171 || async { Ok("container-123".to_string()) },
1172 |_id| async move {
1173 Err(CamelError::ProcessorError(
1174 "Failed to start container".to_string(),
1175 ))
1176 },
1177 move |_id| {
1178 let remove_called_inner = remove_called_clone.clone();
1179 async move {
1180 remove_called_inner.store(true, std::sync::atomic::Ordering::SeqCst);
1181 Ok(())
1182 }
1183 },
1184 )
1185 .await;
1186
1187 assert!(result.is_err(), "Expected start failure to bubble up");
1188 assert!(
1189 remove_called.load(std::sync::atomic::Ordering::SeqCst),
1190 "Expected cleanup to remove container"
1191 );
1192 }
1193
1194 #[test]
1195 fn test_container_component_creates_endpoint() {
1196 let component = ContainerComponent::new();
1197 assert_eq!(component.scheme(), "container");
1198 let endpoint = component
1199 .create_endpoint("container:run?image=alpine")
1200 .unwrap();
1201 assert_eq!(endpoint.uri(), "container:run?image=alpine");
1202 }
1203
1204 #[test]
1205 fn test_container_config_parses_ports() {
1206 let config =
1207 ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80,8443:443").unwrap();
1208 assert_eq!(config.ports.as_deref(), Some("8080:80,8443:443"));
1209 }
1210
1211 #[test]
1212 fn test_container_config_parses_env() {
1213 let config =
1214 ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar,BAZ=qux").unwrap();
1215 assert_eq!(config.env.as_deref(), Some("FOO=bar,BAZ=qux"));
1216 }
1217
1218 #[test]
1219 fn test_container_config_parses_logs_options() {
1220 let config = ContainerConfig::from_uri(
1221 "container:logs?containerId=my-app&follow=true×tamps=true&tail=100",
1222 )
1223 .unwrap();
1224 assert_eq!(config.operation, "logs");
1225 assert_eq!(config.container_id.as_deref(), Some("my-app"));
1226 assert!(config.follow);
1227 assert!(config.timestamps);
1228 assert_eq!(config.tail.as_deref(), Some("100"));
1229 }
1230
1231 #[test]
1232 fn test_container_config_logs_defaults() {
1233 let config = ContainerConfig::from_uri("container:logs?containerId=test").unwrap();
1234 assert!(config.follow); assert!(!config.timestamps); assert!(config.tail.is_none()); }
1238
/// A single `host:container` mapping yields one exposed port and one host
/// binding, both keyed by `<container-port>/tcp`.
#[test]
fn test_parse_ports_single() {
    let container_port = "80/tcp";
    let config = ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80").unwrap();

    let (exposed, bindings) = config.parse_ports().unwrap();
    assert!(exposed.contains_key(container_port));
    assert!(bindings.contains_key(container_port));

    // The map value is optional, hence the second unwrap.
    let host_bindings = bindings.get(container_port).unwrap().as_ref().unwrap();
    assert_eq!(host_bindings.len(), 1);
    assert_eq!(host_bindings[0].host_port.as_deref(), Some("8080"));
}
1251
/// Two comma-separated mappings produce two exposed ports and two bindings.
#[test]
fn test_parse_ports_multiple() {
    let uri = "container:run?image=nginx&ports=8080:80,8443:443";
    let config = ContainerConfig::from_uri(uri).unwrap();

    let (exposed, bindings) = config.parse_ports().unwrap();
    for container_port in ["80/tcp", "443/tcp"] {
        assert!(exposed.contains_key(container_port));
    }
    assert_eq!(bindings.len(), 2);
}
1262
/// An explicit `/tcp` or `/udp` suffix on the container port is kept in the
/// exposed-port key.
#[test]
fn test_parse_ports_with_protocol() {
    let config =
        ContainerConfig::from_uri("container:run?image=nginx&ports=8080:80/tcp,5353:53/udp")
            .unwrap();

    // Only the exposed-port map is inspected here, so the binding map is
    // ignored explicitly instead of being left as an unused variable.
    let (exposed, _bindings) = config.parse_ports().unwrap();

    assert!(exposed.contains_key("80/tcp"));
    assert!(exposed.contains_key("53/udp"));
}
1273
/// Without a `ports` parameter there is nothing to parse.
#[test]
fn test_parse_ports_none() {
    let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();

    let ports = config.parse_ports();
    assert!(ports.is_none());
}
1279
/// A single `KEY=value` pair is returned as a one-element list.
#[test]
fn test_parse_env_single() {
    let config = ContainerConfig::from_uri("container:run?image=nginx&env=FOO=bar").unwrap();

    let env = config.parse_env().unwrap();
    assert_eq!(env, vec![String::from("FOO=bar")]);
}
1288
/// Comma-separated pairs are split into individual `KEY=value` entries.
#[test]
fn test_parse_env_multiple() {
    let uri = "container:run?image=nginx&env=FOO=bar,BAZ=qux,NUM=123";
    let config = ContainerConfig::from_uri(uri).unwrap();

    let env = config.parse_env().unwrap();
    assert_eq!(env.len(), 3);

    // Order is not asserted; each pair only has to be present.
    for expected in ["FOO=bar", "BAZ=qux", "NUM=123"] {
        assert!(env.iter().any(|entry| entry.as_str() == expected));
    }
}
1301
/// Without an `env` parameter there is nothing to parse.
#[test]
fn test_parse_env_none() {
    let config = ContainerConfig::from_uri("container:run?image=nginx").unwrap();

    let env = config.parse_env();
    assert!(env.is_none());
}
1307
1308 mod test_helpers {
1309 use async_trait::async_trait;
1310 use camel_api::CamelError;
1311
1312 pub struct NullRouteController;
1314
1315 #[async_trait]
1316 impl camel_api::RouteController for NullRouteController {
1317 async fn start_route(&mut self, _: &str) -> Result<(), CamelError> {
1318 Ok(())
1319 }
1320 async fn stop_route(&mut self, _: &str) -> Result<(), CamelError> {
1321 Ok(())
1322 }
1323 async fn restart_route(&mut self, _: &str) -> Result<(), CamelError> {
1324 Ok(())
1325 }
1326 async fn suspend_route(&mut self, _: &str) -> Result<(), CamelError> {
1327 Ok(())
1328 }
1329 async fn resume_route(&mut self, _: &str) -> Result<(), CamelError> {
1330 Ok(())
1331 }
1332 fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
1333 None
1334 }
1335 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
1336 Ok(())
1337 }
1338 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
1339 Ok(())
1340 }
1341 }
1342 }
1343
1344 use camel_api::Message;
1345 use std::sync::Arc;
1346 use test_helpers::NullRouteController;
1347 use tokio::sync::Mutex;
1348
1349 #[tokio::test]
1350 async fn test_container_producer_resolves_operation_from_header() {
1351 let docker = match Docker::connect_with_local_defaults() {
1353 Ok(d) => d,
1354 Err(_) => {
1355 eprintln!("Skipping test: Could not connect to Docker daemon");
1356 return;
1357 }
1358 };
1359
1360 if docker.ping().await.is_err() {
1361 eprintln!("Skipping test: Docker daemon not responding to ping");
1362 return;
1363 }
1364
1365 let component = ContainerComponent::new();
1366 let endpoint = component.create_endpoint("container:run").unwrap();
1367
1368 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1369 let mut producer = endpoint.create_producer(&ctx).unwrap();
1370
1371 let mut exchange = Exchange::new(Message::new(""));
1372 exchange
1373 .input
1374 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1375
1376 use tower::ServiceExt;
1377 let result = producer
1378 .ready()
1379 .await
1380 .unwrap()
1381 .call(exchange)
1382 .await
1383 .unwrap();
1384
1385 assert_eq!(
1387 result
1388 .input
1389 .header(HEADER_ACTION_RESULT)
1390 .map(|v| v.as_str().unwrap()),
1391 Some("success")
1392 );
1393 }
1394
1395 #[tokio::test]
1396 async fn test_container_producer_connection_error_on_invalid_host() {
1397 let component = ContainerComponent::new();
1399 let endpoint = component
1400 .create_endpoint("container:list?host=unix:///nonexistent/docker.sock")
1401 .unwrap();
1402
1403 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1404 let result = endpoint.create_producer(&ctx);
1405
1406 assert!(
1408 result.is_err(),
1409 "Expected error when connecting to invalid host"
1410 );
1411 let err = result.unwrap_err();
1412 match &err {
1413 CamelError::ProcessorError(msg) => {
1414 assert!(
1415 msg.to_lowercase().contains("connection")
1416 || msg.to_lowercase().contains("connect")
1417 || msg.to_lowercase().contains("socket")
1418 || msg.contains("docker"),
1419 "Error message should indicate connection failure, got: {}",
1420 msg
1421 );
1422 }
1423 _ => panic!("Expected ProcessorError, got: {:?}", err),
1424 }
1425 }
1426
1427 #[tokio::test]
1429 async fn test_container_producer_lifecycle_operations_missing_id() {
1430 let docker = match Docker::connect_with_local_defaults() {
1432 Ok(d) => d,
1433 Err(_) => {
1434 eprintln!("Skipping test: Could not connect to Docker daemon");
1435 return;
1436 }
1437 };
1438
1439 if docker.ping().await.is_err() {
1440 eprintln!("Skipping test: Docker daemon not responding to ping");
1441 return;
1442 }
1443
1444 let component = ContainerComponent::new();
1445 let endpoint = component.create_endpoint("container:start").unwrap();
1446 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1447 let mut producer = endpoint.create_producer(&ctx).unwrap();
1448
1449 for operation in ["start", "stop", "remove"] {
1451 let mut exchange = Exchange::new(Message::new(""));
1452 exchange.input.set_header(
1453 HEADER_ACTION,
1454 serde_json::Value::String(operation.to_string()),
1455 );
1456 use tower::ServiceExt;
1459 let result = producer.ready().await.unwrap().call(exchange).await;
1460
1461 assert!(
1462 result.is_err(),
1463 "Expected error for {} operation without CamelContainerId",
1464 operation
1465 );
1466 let err = result.unwrap_err();
1467 match &err {
1468 CamelError::ProcessorError(msg) => {
1469 assert!(
1470 msg.contains(HEADER_CONTAINER_ID),
1471 "Error message should mention {}, got: {}",
1472 HEADER_CONTAINER_ID,
1473 msg
1474 );
1475 }
1476 _ => panic!("Expected ProcessorError for {}, got: {:?}", operation, err),
1477 }
1478 }
1479 }
1480
1481 #[tokio::test]
1483 async fn test_container_producer_stop_nonexistent() {
1484 let docker = match Docker::connect_with_local_defaults() {
1486 Ok(d) => d,
1487 Err(_) => {
1488 eprintln!("Skipping test: Could not connect to Docker daemon");
1489 return;
1490 }
1491 };
1492
1493 if docker.ping().await.is_err() {
1494 eprintln!("Skipping test: Docker daemon not responding to ping");
1495 return;
1496 }
1497
1498 let component = ContainerComponent::new();
1499 let endpoint = component.create_endpoint("container:stop").unwrap();
1500 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1501 let mut producer = endpoint.create_producer(&ctx).unwrap();
1502
1503 let mut exchange = Exchange::new(Message::new(""));
1504 exchange
1505 .input
1506 .set_header(HEADER_ACTION, serde_json::Value::String("stop".into()));
1507 exchange.input.set_header(
1508 HEADER_CONTAINER_ID,
1509 serde_json::Value::String("nonexistent-container-123".into()),
1510 );
1511
1512 use tower::ServiceExt;
1513 let result = producer.ready().await.unwrap().call(exchange).await;
1514
1515 assert!(
1516 result.is_err(),
1517 "Expected error when stopping nonexistent container"
1518 );
1519 let err = result.unwrap_err();
1520 match &err {
1521 CamelError::ProcessorError(msg) => {
1522 assert!(
1524 msg.to_lowercase().contains("no such container")
1525 || msg.to_lowercase().contains("not found")
1526 || msg.contains("404"),
1527 "Error message should indicate container not found, got: {}",
1528 msg
1529 );
1530 }
1531 _ => panic!("Expected ProcessorError, got: {:?}", err),
1532 }
1533 }
1534
1535 #[tokio::test]
1537 async fn test_container_producer_run_missing_image() {
1538 let docker = match Docker::connect_with_local_defaults() {
1540 Ok(d) => d,
1541 Err(_) => {
1542 eprintln!("Skipping test: Could not connect to Docker daemon");
1543 return;
1544 }
1545 };
1546
1547 if docker.ping().await.is_err() {
1548 eprintln!("Skipping test: Docker daemon not responding to ping");
1549 return;
1550 }
1551
1552 let component = ContainerComponent::new();
1554 let endpoint = component.create_endpoint("container:run").unwrap();
1555 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1556 let mut producer = endpoint.create_producer(&ctx).unwrap();
1557
1558 let mut exchange = Exchange::new(Message::new(""));
1559 exchange
1560 .input
1561 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1562 use tower::ServiceExt;
1565 let result = producer.ready().await.unwrap().call(exchange).await;
1566
1567 assert!(
1568 result.is_err(),
1569 "Expected error for run operation without image"
1570 );
1571 let err = result.unwrap_err();
1572 match &err {
1573 CamelError::ProcessorError(msg) => {
1574 assert!(
1575 msg.to_lowercase().contains("image"),
1576 "Error message should mention 'image', got: {}",
1577 msg
1578 );
1579 }
1580 _ => panic!("Expected ProcessorError, got: {:?}", err),
1581 }
1582 }
1583
1584 #[tokio::test]
1586 async fn test_container_producer_run_image_from_header() {
1587 let docker = match Docker::connect_with_local_defaults() {
1589 Ok(d) => d,
1590 Err(_) => {
1591 eprintln!("Skipping test: Could not connect to Docker daemon");
1592 return;
1593 }
1594 };
1595
1596 if docker.ping().await.is_err() {
1597 eprintln!("Skipping test: Docker daemon not responding to ping");
1598 return;
1599 }
1600
1601 let component = ContainerComponent::new();
1603 let endpoint = component.create_endpoint("container:run").unwrap();
1604 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1605 let mut producer = endpoint.create_producer(&ctx).unwrap();
1606
1607 let mut exchange = Exchange::new(Message::new(""));
1608 exchange
1609 .input
1610 .set_header(HEADER_ACTION, serde_json::Value::String("run".into()));
1611 exchange.input.set_header(
1613 HEADER_IMAGE,
1614 serde_json::Value::String("nonexistent-image-xyz-12345:latest".into()),
1615 );
1616
1617 use tower::ServiceExt;
1618 let result = producer.ready().await.unwrap().call(exchange).await;
1619
1620 assert!(
1622 result.is_err(),
1623 "Expected error when running container with nonexistent image"
1624 );
1625 let err = result.unwrap_err();
1626 match &err {
1627 CamelError::ProcessorError(msg) => {
1628 assert!(
1630 msg.to_lowercase().contains("no such image")
1631 || msg.to_lowercase().contains("not found")
1632 || msg.to_lowercase().contains("image")
1633 || msg.to_lowercase().contains("pull")
1634 || msg.contains("404"),
1635 "Error message should indicate image issue, got: {}",
1636 msg
1637 );
1638 }
1639 _ => panic!("Expected ProcessorError, got: {:?}", err),
1640 }
1641 }
1642
1643 #[tokio::test]
1646 async fn test_container_producer_run_alpine_container() {
1647 let docker = match Docker::connect_with_local_defaults() {
1648 Ok(d) => d,
1649 Err(_) => {
1650 eprintln!("Skipping test: Could not connect to Docker daemon");
1651 return;
1652 }
1653 };
1654
1655 if docker.ping().await.is_err() {
1656 eprintln!("Skipping test: Docker daemon not responding to ping");
1657 return;
1658 }
1659
1660 let images = docker.list_images::<&str>(None).await.unwrap();
1662 let has_alpine = images
1663 .iter()
1664 .any(|img| img.repo_tags.iter().any(|t| t.starts_with("alpine")));
1665
1666 if !has_alpine {
1667 eprintln!("Pulling alpine:latest image...");
1668 let mut stream = docker.create_image(
1669 Some(bollard::image::CreateImageOptions {
1670 from_image: "alpine:latest",
1671 ..Default::default()
1672 }),
1673 None,
1674 None,
1675 );
1676
1677 use futures::StreamExt;
1678 while let Some(_item) = stream.next().await {
1679 }
1681 eprintln!("Image pulled successfully");
1682 }
1683
1684 let component = ContainerComponent::new();
1686 let endpoint = component.create_endpoint("container:run").unwrap();
1687 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1688 let mut producer = endpoint.create_producer(&ctx).unwrap();
1689
1690 let timestamp = std::time::SystemTime::now()
1692 .duration_since(std::time::UNIX_EPOCH)
1693 .unwrap()
1694 .as_millis();
1695 let container_name = format!("test-rust-camel-{}", timestamp);
1696 let mut exchange = Exchange::new(Message::new(""));
1697 exchange.input.set_header(
1698 HEADER_IMAGE,
1699 serde_json::Value::String("alpine:latest".into()),
1700 );
1701 exchange.input.set_header(
1702 HEADER_CONTAINER_NAME,
1703 serde_json::Value::String(container_name.clone()),
1704 );
1705
1706 use tower::ServiceExt;
1707 let result = producer
1708 .ready()
1709 .await
1710 .unwrap()
1711 .call(exchange)
1712 .await
1713 .expect("Container run should succeed");
1714
1715 let container_id = result
1717 .input
1718 .header(HEADER_CONTAINER_ID)
1719 .and_then(|v| v.as_str().map(|s| s.to_string()))
1720 .expect("Expected container ID header");
1721 assert!(!container_id.is_empty(), "Container ID should not be empty");
1722
1723 assert_eq!(
1725 result
1726 .input
1727 .header(HEADER_ACTION_RESULT)
1728 .and_then(|v| v.as_str()),
1729 Some("success")
1730 );
1731
1732 let inspect = docker
1734 .inspect_container(&container_id, None)
1735 .await
1736 .expect("Container should exist");
1737 assert_eq!(
1738 inspect.id.as_ref().map(|s| s.as_str()),
1739 Some(container_id.as_str())
1740 );
1741
1742 docker
1744 .remove_container(
1745 &container_id,
1746 Some(bollard::container::RemoveContainerOptions {
1747 force: true,
1748 ..Default::default()
1749 }),
1750 )
1751 .await
1752 .ok();
1753
1754 eprintln!("✅ Container {} created and cleaned up", container_id);
1755 }
1756
1757 #[tokio::test]
1759 async fn test_container_consumer_unsupported_operation() {
1760 use tokio::sync::mpsc;
1761
1762 let component = ContainerComponent::new();
1763 let endpoint = component.create_endpoint("container:run").unwrap();
1764 let mut consumer = endpoint.create_consumer().unwrap();
1765
1766 let (tx, _rx) = mpsc::channel(16);
1768 let cancel_token = tokio_util::sync::CancellationToken::new();
1769 let context = ConsumerContext::new(tx, cancel_token);
1770
1771 let result = consumer.start(context).await;
1772
1773 assert!(
1775 result.is_err(),
1776 "Expected error for unsupported consumer operation"
1777 );
1778 let err = result.unwrap_err();
1779 match &err {
1780 CamelError::EndpointCreationFailed(msg) => {
1781 assert!(
1782 msg.contains("Consumer only supports 'events' or 'logs'"),
1783 "Error message should mention events or logs support, got: {}",
1784 msg
1785 );
1786 }
1787 _ => panic!("Expected EndpointCreationFailed error, got: {:?}", err),
1788 }
1789 }
1790
/// The container consumer reports an unbounded concurrent concurrency model.
#[test]
fn test_container_consumer_concurrency_model_is_concurrent() {
    let config = ContainerConfig::from_uri("container:events").unwrap();
    let consumer = ContainerConsumer { config };

    let expected = camel_component::ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
1802
1803 #[tokio::test]
1807 async fn test_container_consumer_cancellation() {
1808 use std::sync::atomic::{AtomicBool, Ordering};
1809 use tokio::sync::mpsc;
1810
1811 let docker = match Docker::connect_with_local_defaults() {
1813 Ok(d) => d,
1814 Err(_) => {
1815 eprintln!("Skipping test: Could not connect to Docker daemon");
1816 return;
1817 }
1818 };
1819
1820 if docker.ping().await.is_err() {
1821 eprintln!("Skipping test: Docker daemon not responding to ping");
1822 return;
1823 }
1824
1825 let component = ContainerComponent::new();
1826 let endpoint = component.create_endpoint("container:events").unwrap();
1827 let mut consumer = endpoint.create_consumer().unwrap();
1828
1829 let (tx, _rx) = mpsc::channel(16);
1831 let cancel_token = tokio_util::sync::CancellationToken::new();
1832 let context = ConsumerContext::new(tx, cancel_token.clone());
1833
1834 let completed = Arc::new(AtomicBool::new(false));
1836 let completed_clone = completed.clone();
1837
1838 let handle = tokio::spawn(async move {
1840 let result = consumer.start(context).await;
1841 completed_clone.store(true, Ordering::SeqCst);
1843 result
1844 });
1845
1846 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1848
1849 assert!(
1851 !completed.load(Ordering::SeqCst),
1852 "Consumer should still be running before cancellation"
1853 );
1854
1855 cancel_token.cancel();
1857
1858 let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), handle).await;
1860
1861 assert!(
1863 result.is_ok(),
1864 "Consumer should gracefully shut down after cancellation"
1865 );
1866
1867 assert!(
1869 completed.load(Ordering::SeqCst),
1870 "Consumer should have completed after cancellation"
1871 );
1872 }
1873
1874 #[tokio::test]
1878 async fn test_container_producer_list_containers() {
1879 let docker = match Docker::connect_with_local_defaults() {
1882 Ok(d) => d,
1883 Err(_) => {
1884 eprintln!("Skipping test: Could not connect to Docker daemon");
1885 return;
1886 }
1887 };
1888
1889 if docker.ping().await.is_err() {
1890 eprintln!("Skipping test: Docker daemon not responding to ping");
1891 return;
1892 }
1893
1894 let component = ContainerComponent::new();
1896 let endpoint = component.create_endpoint("container:list").unwrap();
1897
1898 let ctx = ProducerContext::new(Arc::new(Mutex::new(NullRouteController)));
1899 let mut producer = endpoint.create_producer(&ctx).unwrap();
1900
1901 let mut exchange = Exchange::new(Message::new(""));
1903 exchange
1904 .input
1905 .set_header(HEADER_ACTION, serde_json::Value::String("list".into()));
1906
1907 use tower::ServiceExt;
1909 let result = producer
1910 .ready()
1911 .await
1912 .unwrap()
1913 .call(exchange)
1914 .await
1915 .expect("Producer should succeed when Docker is available");
1916
1917 match &result.input.body {
1920 camel_api::Body::Json(json_value) => {
1921 assert!(
1922 json_value.is_array(),
1923 "Expected input body to be a JSON array, got: {:?}",
1924 json_value
1925 );
1926 }
1927 other => panic!("Expected Body::Json with array, got: {:?}", other),
1928 }
1929 }
1930}